이번 과제는 Mock을 사용해서 실시간 예약정보를 데이터 플랫폼에 전송한다고 치고, 트랜잭션 내의 로직을 이벤트로 분리하는 것이다
또한 MSA 설계로 전환할 때, 어떤 문제가 예상되고 해결할 수 있을지 보고서도 써야함
과제
일단 이 다음 과제가 kafka 사용이니까, 그걸 고려해서 구현하자
현재 구현체에서는 ApplicationEventPublisher 를 사용해서 하면 될 것 같다
일단 추상화를 위한 인터페이스부터 만들자
interface DomainEventPublisher {
fun publish(event: DomainEvent)
fun publishAll(events: List<DomainEvent>)
}
abstract class DomainEvent (
val eventId: String = UUID.randomUUID().toString(),
val createdAt: Instant = Instant.now(),
val aggregateType: AggregateType,
val aggregateId: String
)각 도메인에서 발행되는 이벤트는 DomainEvent를 상속해서 발행되도록 할 것이다
추후 kafka로 변경할 때 aggregateType과 aggregateId를 사용해 kafka의 토픽으로 사용할 것이다
위 인터페이스를 구현하는 구현체를 만들자
@Component
class SpringDomainEventPublisher(
private val applicationEventPublisher: ApplicationEventPublisher,
) : DomainEventPublisher {
override fun publish(event: DomainEvent) {
applicationEventPublisher.publishEvent(event)
}
override fun publishAll(events: List<DomainEvent>) {
events.forEach { event -> applicationEventPublisher.publishEvent(event) }
}
}
ApplicationEventPublisher에서 이벤트를 발행하게 되면 등록된 리스너 리스트를 찾아서 사용하게 된다
이는 트랜잭션 이벤트 리스너가 될수도 있고, 이벤트 리스너가 될수도 있다
이벤트 발행을 시작한 곳이 트랜잭션 내부인지 아닌지에 따라 바뀐다
서비스에서 이 인터페이스의 publish를 사용하게 되면 리스너의 답변을 동기식으로 기다리게 된다
따라서 리스너가 트랜잭션에 포함될수 있다는 뜻이다
일단 이벤트를 적용할 코드를 미리 보자
fun make(dto: ReservationRequest): Reservation {
if (!seatFinder.getAvailableSeats(dto.concertId).contains(dto.seatNumber)) {
throw DuplicateResourceException("이미 예약되어있는 좌석입니다.")
}
val concert = concertRepository.findById(dto.concertId)
val reservation = Reservation(
concert = concert,
seatNumber = dto.seatNumber,
status = ReservationStatus.PENDING,
reserver = memberRepository.findById(dto.memberId)
)
val saved: Reservation = try {
reservationRepository.save(reservation)
} catch (e: DataIntegrityViolationException) {
throw DuplicateResourceException("이미 예약되어있는 좌석입니다.")
}
val outboxMessage = OutboxMessage(
aggregateType = AggregateType.TEMP_RESERVATION,
eventType = EventType.INSERT,
payload = TempReservationPayload(saved.id!!, saved.concert.id!!, saved.seatNumber).toMap(),
status = OutboxStatus.PENDING
)
outboxRepository.save(outboxMessage)
concertRankingPort.checkAndMarkSoldOut(concert.id!!)
return saved
}
위 코드에서는 직접 아웃박스 패턴을 사용해 redis에 임시 예약 정보를 저장하도록 하고 있다
또한 checkAndMarkSoldOut을 호출해 콘서트가 매진되었는지 확인하고 마킹하도록 한다 (부가 로직)
이 두개를 이벤트로 뺄 것이다
먼저 리스너를 작성해주자
@Component
class ReservationOutboxEventListener(
private val outboxRepository: OutboxRepository,
) {
@TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT)
fun reservationCreatedEvent(event: ReservationCreatedEvent) {
outboxRepository.save(OutboxMessage(
aggregateType = AggregateType.TEMP_RESERVATION,
eventType = EventType.INSERT,
payload = TempReservationPayload(event.reservationId, event.concertId, event.seatNumber).toMap(),
status = OutboxStatus.PENDING
))
}
}
redis에 임시 예약 정보를 저장하는 것은 반드시 정합성이 보장되어야 하는 로직이다@TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT)을 사용해서 이 이벤트를 발생시킨 곳에서 시작된 트랜잭션에 포함되도록 한다
따라서 reservationCreatedEvent가 실패하면 트랜잭션이 모두 롤백되어 정합성이 보장될 것이다
@Component
class ReservationAfterCommitEventListener(
private val concertRankingPort: ConcertRankingPort,
private val dataFlatformPort: DataFlatformPort
) {
private val log = KotlinLogging.logger { }
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
fun handleReservationCreate(event: ReservationCreatedEvent) {
dataFlatformPort.transferData(CreateReservationInfo(
event.reservationId,
event.concertId,
event.memberId
))
concertRankingPort.checkAndMarkSoldOut(event.concertId)
}
}
그리고 과제 목표였던 데이터 플랫폼에 데이터 전송하기와 아까 봤던 부가 로직인 매진 랭킹 마킹하기를 @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)에 두자.
AFTER_COMMIT을 사용하면 트랜잭션이 커밋된 뒤에 리스너가 동작하기에 이 부가 로직이 실패하더라도 별개의 트랜잭션이기 때문에 롤백되지 않는다
여기서 @Async까지 쓰게 되면 별개의 스레드에서 동작하게 되므로 더 빠르게 처리가 가능하다
만약 위의 부가 로직이 시간이 오래걸리는 작업이라면 @Async를 쓰는게 더 좋을것이다
이렇게 리스너를 작성해두고 make함수에서 이벤트 퍼블리셔를 의존한 다음 이벤트를 발행하자
val outboxMessage = OutboxMessage(
aggregateType = AggregateType.TEMP_RESERVATION,
eentType = EventType.INSERT,
payload = TempReservationPayload(saved.id!!, saved.concert.id!!, saved.seatNumber).toMap(),
status = OutboxStatus.PENDING
)
outboxRepository.save(outboxMessage)
concertRankingPort.checkAndMarkSoldOut(concert.id!!)
...// 다음으로 변경
@Service
class ReservationService(
...
private val eventPublisher: DomainEventPublisher // 새로 의존
) {
eventPublisher.publish(ReservationCreatedEvent(
saved.id!!,
concert.id!!,
saved.seatNumber,
reserver.id!!
))
이렇게 부가 로직을 밖으로 빼놓았다
그런데 아웃박스 로직까지 굳이 뺄 필요가 있었을까... 싶기도 하다
뭐 make 함수 관점에서 보면 자신이 신경쓰지 않아도 되는 로직이긴 하니까
이제 다른 함수도 바꿔보자
@Transactional
fun payReservation(reservationId: Long, userId: String): Reservation {
...
reservation.status = ReservationStatus.RESERVE
reservation = reservationRepository.save(reservation)
tempReservationService.delete(reservationId) // 이 부분을
/**
eventPublisher.publish(ReservationPaidEvent(
reservation.id!!,
reservation.concert.id!!,
reserver.id!!
)) // 이렇게 바꾸자
**/
return reservation
}
위 함수는 임시 예약을 결제 후 확정하는 로직이다
여기서도 역시 임시 예약을 지우는 부가 로직이 있는데, 이를 이벤트로 바꾸자
아까 봤던 리스너 클래스에 함수를 추가하면 된다
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
fun handleReservationPaid(event: ReservationPaidEvent) {
tempReservationPort.delete(event.reservationId)
}
레디스에 있는 임시 예약 삭제하는게 왜 부가 로직이냐 !! 라고 할수도 있는데, 정합성이 필요한 부분은 아닌거 같다. 왜냐하면 애초에 확정된 예약이 있으면 거기서부터 예약이 막힐 것이기 때문에 레디스에 임시 예약이 살아있어도 크게 상관없다. TTL이 있기도 하고... 근데 이런 마인드가 나중에 큰 위험을 불러올수도 있을것 같다는... 이 막연한 불안감은 뭘까
다음으로 넘어가서~
@Transactional
override fun cleanupExpiredReservation(reservationId: Long) {
val reservation = reservationJpaRepository.findById(reservationId).orElseThrow {
throw EntityNotFoundException("${reservationId}를 가진 예약 정보를 조회할 수 없습니다.")
}
val concertId = reservation.concert.id!!
val reserveKey = getReservedKey(concertId)
redisOperations.removeFromReserveSet(reserveKey, reservation.seatNumber)
val seatListKey = "${TempReservationConstant.TEMP_RESERVATIONS}$reservationId"
redisOperations.deleteReservation(seatListKey)
reservation.status = ReservationStatus.CANCEL
reservationJpaRepository.save(reservation)
cacheManager.getCache("availableSeats")?.evict(concertId)
}
이 함수는 TTL이 만료되이서 삭제된 임시 예약을 처리하는 로직이다
그런데 여기서 캐시도 같이 삭제되고 있다. 이 캐시는 예약 가능한 좌석을 조회할 때 사용되는 캐시인데, 이 함수의 주요 로직과는 동떨어진 부가 로직이다.
이것도 이벤트로 빼보자
사실 별거없다 있는거 그대로 리스너로 빼고 기존의 부가 로직을 이벤트 퍼블리셔로 바꾸면 된다
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
fun handleReservationExpired(event: ReservationExpiredEvent) {
cacheManager.getCache("availableSeats")?.evict(event.concertId)
}
...
@Transactional
override fun cleanupExpiredReservation(reservationId: Long) {
...
eventPublisher.publish(ReservationExpiredEvent(
reservation.id!!,
reservation.concert.id
))
}
이렇게 만들어뒀는데... 나중에 MSA로 전환할때는 어떻게 해야할까?? @TrasactionalEventListener를 싹다 바꿔줘야 할 것 같은데...
아무튼 이렇게 사용하면 부가로직을 정말 깔끔하게 뺄 수 있어서 좋다
사실 이벤트를 발행하는 코드가 들어가있어서 살짝 거슬리기는 한데.. 이걸 AOP로 뺄수도 있나?
'항해 Lite' 카테고리의 다른 글
| Kafka로 대규모 확장 설계 (1) (0) | 2026.02.02 |
|---|---|
| 이벤트 기반 아키텍처 (1) (1) | 2026.01.30 |
| Redis를 활용한 캐싱 - 과제 리뷰 (0) | 2026.01.23 |
| 분산락과 캐싱으로 트래픽 대응(2) - 과제 리뷰 (0) | 2026.01.06 |
| 분산락과 캐싱으로 트래픽 대응 (1) (0) | 2026.01.01 |