본문 바로가기

데이터 정합성 확보(2) - 아웃박스 패턴 구현

@정소민fan2025. 12. 8. 20:11

이 프로젝트에서는 진작에 Redis를 쓰고 있었다.

그래서 당연히 생각해야 하는 것이 분산 트랜잭션이었다. 나는 분산 트랜잭션을 구현하기 위해 아웃박스 패턴을 사용하기로 했다.

아웃박스 패턴이 무엇인지는 이전 포스팅에서 확인

 

일단 아웃박스 패턴을 적용할 flow를 살펴보자.

fun make(dto: ReservationRequest) {
    if (!getAvailableSeat(dto.date).contains(dto.seatNumber)) {
        throw RuntimeException("이미 예약되어있는 좌석입니다.")
    }
    val reservation = Reservation(
        date = dto.date,
        seatNumber = dto.seatNumber,
        status = ReservationStatus.PENDING,
        reserver = memberRepository.findById(dto.memberId)
    )
    val save = ReservationEntity.from(reservationRepository.save(reservation))
    tempReservationService.save(dto.date, save.id!!, reservation.seatNumber)
}

임시 예약을 생성하는 이 함수에 적용될 것이다.

여기서 사용되는 reservationRepository는 RDB에 예약 정보를 저장해둔다. 아직 결제가 안됐으니 status는 PENDING이다.

저장된 RDB의 id와 좌석 정보, 날짜 정보 등을 가지고 tempReservationService를 통해 Redis에 임시 예약 정보를 저장한다.

 

그런데 사실... 여기서 굳이 아웃박스 패턴이 가장 적절한가?? 하는 생각이 들긴 한다. 트랜잭션을 걸어두면 RDB에 저장이 실패해도 롤백될거고, 레디스에서 저장하다가 예외가 터져도 다 같이 롤백될거 같은데... 물론 커밋이 실패하면 이를 감지하지는 못할 것이다. 하지만 아웃박스 패턴은 메세지 브로커에 "레디스에 이 정보를 저장해라" 라는 메세지만 전달해두고 끝나는 건데 예약 좌석과 같이 즉각적인 자원 점유가 필요한 상황에서는 메세지가 전달되는 오버헤드가 있어서 적절한지는... 잘 모르겠다. 좀 더 공부해봐야겠다. 어쨌든 구현해뒀으니 리뷰하기는 해야지

 

우선 아웃박스 메세지를 작성해뒀다. 지금은 클린 아키텍처를 저장해뒀기에 도메인 모델과 엔티티를 분리해뒀다.

class OutboxMessage(
    val id: Long? = null,
    val aggregateType: AggregateType,
    val eventType: EventType,
    val payload: Map<String, Any>,
    var status: OutboxStatus,
    var processedAt: LocalDateTime? = null,
)

enum class AggregateType{
    TEMP_RESERVATION,
}

enum class EventType{
    INSERT
}

enum class OutboxStatus{
    PENDING, CANCELLED, DONE
}

@Entity
@Table(name = "outbox")
class OutboxEntity(
    @Id @GeneratedValue(strategy = GenerationType.IDENTITY)
    val id: Long? = null,

    @Column(nullable = false, name = "aggregate_type")
    @Enumerated(EnumType.STRING)
    val aggregateType: AggregateType,

    @Column(nullable = false, name = "envent_type")
    @Enumerated(EnumType.STRING)
    val eventType: EventType,

    @Column(columnDefinition = "json")
    val payload: String,

    @Column(nullable = false, name = "status")
    @Enumerated(EnumType.STRING)
    val status: OutboxStatus,

    @Column(nullable = true, name = "processed_at")
    val processedAt: LocalDateTime? = null

)

 

payload는 외부 모듈에 저장할 정보

aggregateType은 무엇을 저장할 것인지, 지금은 임시 예약밖에 없으므로 enum에는 TEMP_RESERVATION하나만

eventType은 어떤 행동을 하라는 메세지인지 타입을 지정하는 것인데, 지금 임시 예약 정보는 redis에서 TTL을 가지고 있기에 DELETE가 따로 필요하지 않다. 그래서 INSERT만 만들어둠

status는 지금 이 메세지의 상태가 어떤지 확인하기 위해 만들었다.

나중에는 실패 시 재시도를 위한 필드도 있어야 할 것 같다.

interface OutboxHandler {
    fun canHandle(aggregateType: AggregateType): Boolean
    fun handle(message: OutboxMessage) : OutboxMessage
}

@Component
class TempReservationOutboxHandlerImpl(
    private val tempReservationService: TempReservationPort,
    private val outboxRepository: OutboxRepository,
    private val timeUtil: TimeUtil
) : OutboxHandler {

    private val log = KotlinLogging.logger { }

    override fun canHandle(aggregateType: AggregateType): Boolean {
        return aggregateType == AggregateType.TEMP_RESERVATION
    }

    override fun handle(message: OutboxMessage): OutboxMessage {
        val payload = message.payload
        try {
            val dateString = payload["date"] as String
            val localDate = LocalDate.parse(dateString, DateTimeFormatter.ISO_LOCAL_DATE)

            tempReservationService.save(
                localDate,
                (payload["id"] as Number).toLong(),
                payload["seatNumber"] as Int)
        } catch (e: Exception) {
            log.error { "Redis 처리 중 예외가 발생하였습니다." }
            throw OutboxException("임시 대기열 저장 중 에러 발생 : ${e.message}")
        }
        message.status = OutboxStatus.DONE
        message.processedAt = timeUtil.nowDateTime()
        return outboxRepository.save(message)
    }

}

그리고 임시 예약 전용 핸들러. 아웃박스 메세지는 임시 예약 외에 다른 기능이 추가되면 생길 수 있으므로 인터페이스를 만들어서 확장성을 고려했다. payload에서 map으로 전달받은 것을 캐스팅해서 사용하고 있는데, 이 부분에서 계속 에러가 나긴 했었다.

어떻게 제네릭을 잘 쓰면 데이터 클래스를 사용해서 해결될것 같기도한데...?

 

canHandle 함수는 이 핸들러가 핸들링해야하는 메세지를 찾기 위해 만들어졌다. 추후에 분산 트랜잭션을 사용해야할 다른 모듈을 위해 만들어두었다.

handle은 말 그대로 메세지를 가지고 핸들러가 해야 할 일을 하는 것. 임시 예약 핸들러는 tempReservationService를 통해 redis에 임시 예약을 저장하고, 아웃박스 메세지의 상태를 DONE으로 변경한다.

 

그러면 이 핸들러에게 메세지를 전달해줘야 할 메세지 브로커도 있어야 한다. 보통 RabbitMQ나 Kafka를 쓴다고 하는데, 지금은 아직 이런 메세지 브로커를 쓸 단계가 아니기도 해서 그냥 스케줄링으로 했다.

@Component
class OutboxScheduler(
    private val outboxRepository: OutboxRepository,
    private val handlers: List<OutboxHandler>
) {

    private val log = KotlinLogging.logger { }

    @Transactional
    @Scheduled(fixedRate = 1000)
    fun schedule() {
        val pendingList = outboxRepository.getPendingList()
        pendingList.forEach { outbox ->
            try {
                val handler = handlers.find { it.canHandle(outbox.aggregateType) }
                handler?.handle(outbox) ?:
                run {
                    log.error { "아웃박스 스케줄러에서 적절한 핸들러를 찾지 못했습니다." }
//                    throw OutboxException("no handler found for aggregateType = ${outbox.aggregateType}")
                }
            } catch (e: Exception) {
                log.error { "메세지 처리 중 오류 발생 : id = ${outbox.id}, message = ${e.message}" }
            }
        }
    }
}

이렇게 outboxRepository에서 PENDING 상태의 메세지를 찾아서 가져온 후, 각 여러개의 핸들러 (지금은 하나밖에 없지만) 중 적절한 핸들러를 찾아 핸들링을 맡긴다. 1000ms, 1초마다 반복된다.

 

throw를 주석처리해둔 이유는 getPendingList()를 통해 10개의 메세지를 한꺼번에 가져오는데 이 중 하나의 메세지가 예외를 일으킨다면, 그 뒤의 다른 메세지들은 핸들링되지 않고 그대로 멈추기에 없애놨다. 이 부분도 나중에 보완이 필요할거 같다. 예외가 발생하면 해당 메세지의 상태를 Retry로 만들어두고, 재시도 한계횟수도 지정해두면 좋을것 같다.

 

그럼 이제 아웃박스를 위한 기능은 다 만들었으니, 기존의 make() 함수를 바꿔주자. 간단하다.

@Transactional
fun make(dto: ReservationRequest) : Reservation {

    if (!getAvailableSeat(dto.date).contains(dto.seatNumber)) {
        throw DuplicateResourceException("이미 예약되어있는 좌석입니다.")
    }

    val reservation = Reservation(
        date = dto.date,
        seatNumber = dto.seatNumber,
        status = ReservationStatus.PENDING,
        reserver = memberRepository.findById(dto.memberId)
    )

    val save : Reservation = reservationRepository.save(reservation)

    val outboxMessage = OutboxMessage(
        aggregateType = AggregateType.TEMP_RESERVATION,
        eventType = EventType.INSERT,
        payload = TempReservationPayload(save.id!!, save.date, save.seatNumber).toMap(),
        status = OutboxStatus.PENDING
    )

    outboxRepository.save(outboxMessage)
    return save
}

이렇게 아웃박스 레포지토리에 메세지만 저장해두면 끝이다. 이 함수에서는 redis를 사용하는 코드가 전혀 없다. reservationRepository.save도 RDB outboxRepository.save도 RDB 전용이다. 따라서 둘 중 하나라도 실패한다면 다같이 롤백될 것이다.

 

이렇게 기능은 잘 구현해뒀지만 이외에도 생각해야할 부분이 많았다. 테스트나 락 같은 부분. 이 부분은 다음 포스팅에서 쓰겠다.

정소민fan
@정소민fan :: 코딩은 관성이야

코딩은 관성적으로 해야합니다 즐거운 코딩 되세요

목차