Saga framework를 만들어 봤습니다. (redis stream을 지원하는)
프로젝트에 Saga를 적용했는데, 중복되는 코드가 많아져서 유지보수가 힘들어 지더군요. 하나씩 수정하다가 너무 힘들어서 프레임워크로 만들기로 결심하고 만들어 봤습니다.
지원 하는 기능은 아래와 같습니다.
레디스 스트림 지원
reactor(webflux), blocking 지원
함수형 Orchestration과 이벤트기반 Choreograph 지원
처리되지 못한 event 자동 재실행
Transactional messaging pattern 자동 적용
backpressure 지원
At least once 메시지 전달 보장
(추가적인 서버 구성없이 라이트하게 사용하는것을 목표로 만들었습니다.)
실제 동작 gif등은 아래 링크에서 볼 수 있습니다.
https://github.com/devxb/Netx#test
피드백 주시면 정말 감사하겠습니다. 🙇
아래는 사용 예시 코드 입니다. (https://github.com/devxb/Netx#orchestrator-example <- 여기 들어가서 보시면 색깔있는버전으로 보실 수 있습니다.)
Orchestration 방식
// Orhcestrator 사용하기
@Service
class OrderService(private val orderOrchestrator: Orchestrator<Order, OrderResponse>) {
fun order(orderRequest: Order): OrderResult {
val result = orderOrchestrator.sagaSync(orderRequest)
// 성공하면 OrderResponse를 응답하고, 실패했으면 해당하는 예외를 던집니다.
result.decodeResultOrThrow(OrderResponse::class)
}
}
// Orchestrator 등록하기
@Configurer
class OrchestratorConfigurer {
// OrchestratorFactory.instance()로 Factory를 가져올 수 있습니다.
private val orchestratorFactory = OrchestratorFactory.instance()
@Bean
fun orderOrchestartor(): Orchestrator<Order, OrderResponse> { // <첫번째 요청 타입, 마지막 응답 타입>
return orchestratorFactory.create("orderOrchestrator")
.start(
orchestrate = { order -> // 첫번째 요청이 Order 였으므로, 여기서도 order가 들어옵니다.
return@start user // 여기서 User타입을 반환하면, 아래 orchestrate에 여기서 반환한 user가 전달됩니다.
}
)
.joinReactive( // ...Reactive 를 사용하면, webflux api를 사용할 수 있습니다.
orchestrate = { user ->
// ...
},
rollback = { user -> ... }
)
.joinWithContext(
// ...withContext를 사용하면, 하나의 saga요청 전체의 생명주기를 갖고있는 context를 사용할 수 있습니다.
contextOrchestrate = { context, request ->
// context에 데이터를 저장하면, 다음 orchestrate, rollback 에서 사용할 수 있습니다.
context.set("key", request)
context.decode("foo", Foo::class) // 이렇게, context에 값이 저장되어있다면, 값을 꺼내올수도 있습니다.
},
)
.commit( // 여기서 commit하면 saga생성을 종료하고 Orchestrator를 반환합니다. (바로 시작되는것이 아님)
orchestrate = { request ->
// 예외가 던져지면, 여기부터 시작해서 위로 거슬러 올라가며 rollback을 순차적으로 호출합니다.
throw IllegalArgumentException("Oops! Something went wrong..")
},
rollback = { request ->
...
}
)
}
}아래처럼 어노테이션 기반으로 사용할수도 있습니다.
@SagaHandler
class SagaHandler{
// Foo에 해당하는 Saga start event만 받습니다.
@SagaStartListener(event = Foo::class, successWith = SuccessWith.PUBLISH_JOIN)
fun handleSagaStartEvent(event: SagaStartEvent) {
event.setNextEvent(Helloworld(" ")) // successWith와 함께 publish 될 event를 지정할 수 있습니다.
}
// event가 설정되지 않으면 모든 event를 수신합니다. successWith의 상태에 따라서, Saga다음으로 pulbish할 event타입을 지정할 수 있습니다.
@SagaJoinListener(successWith = SuccessWith.PUBLISH_COMMIT)
fun handleSagaJoinEvent(event: SagaJoinEvent) ...
@SagaCommitListener(
event = Foo::class,
noRollbackFor = [IllegalArgumentException::class]
)
fun handleSagaCommitEvent(event: SagaCommitEvent): Mono<String>
@SagaRollbackListener(Foo::class)
fun handleSagaRollbackEvent(event: SagaRollbackEvent)
}