Spring/기술 레시피

[Spring] 카프카 프로듀서 트랜잭션 동작 방식과 스프링 카프카 트랜잭션 실무 사례 (1)

Chipmunks 2025. 5. 7.
728x90

🙋‍♂️ 카프카 프로듀서에서 카프카 트랜잭션이 필요한 이유는 무엇인가요?

트랜잭션은 논리적으로 나눌 수 없는 작업 단위를 뜻해요.

트랜잭션에 있는 작업은 모두 성공하거나 실패해야 해요.

다르게 말하면 일부분만 완료해서는 안되는 작업 단위예요.

 

한 요구사항을 들어볼까요?

 

 


요구사항

당신은 주문 시스템을 만드는 메이커예요!

주문이 발생하면 다음 시스템에 이벤트를 전송 해야해요.

 

1. billing-events : 💵 결제 시스템

2. inventory-events : 📦 재고 시스템

3. notification-events : 🔔 알림 시스템

 

시스템 배치도

 

여러 토픽에 동시에 메시지를 전송해야 해요.

이 중 하나라도 전송에 실패하면 업무의 의미가 사라져요!

최종적 일관성을 달성하는 것보다 실패를 빠르게 알리는 게 중요해요.

소비한 메시지 실패 처리는 각 시스템팀에서 관리하기로 합의했어요.

 


 

카프카 트랜잭션 기능을 사용하지 않는다면, 위 요구사항을 충족할 수가 없어요!

카프카 트랜잭션이 없다면 무슨 일이 일어날까요?

 

세 시스템 중 재고 시스템만 이벤트 전송에 실패한다면 큰 일이 일어날 수 있어요.

결제는 되었지만 재고가 차감되지 않았어요!

방금 보낸 이벤트로 모든 재고가 없어졌어야 했어요.

 

큰 문제가 발생했어요.

여전히 상품 재고가 남아있다고 표시되는 거예요! 😱

미처 복구하기 전에 다른 주문 요청이 발생해버렸어요. 😱😱

주문 요청이 실패했어야 했지만, 결제 / 재고 / 알림 시스템이 이를 처리해 버리는 사태가 발생했어요.

( 🐿️ : 주문 시 먼저 재고를 잠가두거나 후처리로 환불/알림 취소 등 보상 트랜잭션으로 수습할 순 있어요. 다만 결제까지 되었기에 서비스의 신뢰도가 하락하고 유저 경험이 좋지 않을 거예요. 유저와의 신뢰도가 최우선이라고 가정해요! )

 

주문 / 결제 / 재고 시스템의 담당자와 고객 관리를 담당하는 CS 운영팀의 담당자끼리 머리를 맞대게 되었어요.

누구의 주문을 취소 시키고 이를 알릴 것인가 토론이 벌여지는 상황이 발생했죠.

또한 누락한 재고 데이터도 수정해야겠죠.

이는 곧 많은 사람의 업무에 병목을 일으키고 고객의 신뢰를 잃을 거예요.

 

당신이 해야할 건 한 가지밖에 없어요.

세 시스템이 구독 중인 토픽에 무사히 모든 메시지를 발행하게끔 보장해야 해요!

이럴때 필요한 게 '카프카 트랜잭션' 입니다!

 

🤔 카프카 브로커 내에서 카프카 트랜잭션은 어떻게 동작하나요?

카프카 브로커 내에서 카프카 트랜잭션은 아래 단계로 이루어져요!

 

1. 프로듀서 클라이언트와 트랜잭션 코디네이터와 통신

트랜잭션 코디네이터는 모든 카프카 브로커 내에서 실행되는 모듈이에요.

트랜잭션 로그는 카프카 내부 토픽인 '__transaction_state' 으로 클러스터 전체에서 관리해요.

이 토픽은 일반 토픽처럼 여러 파티션과 복제본으로 구성돼요.

트랜잭션 로그 파티션에 프로듀서 클라이언트의 트랜잭션 상태를 기록해요.

 

참고로 트랜잭션 로그의 전체 클러스터 파티션 수 기본값은 50개, 복제 수는 3개로 설정되어 있어요.

트랜잭션 로그의 특정 파티션의 리더가 트랜잭션 코디네이터가 되어요.

 

 

카프카 프로듀서 클라이언트가 initTransaction API를 호출해 transactional.id 값을 전달해요.

transational.id 값은 해싱되어 특정 파티션에 매핑되어요.

이 파티션의 리더 브로커가 코디네이터가 돼요.

여기선 1번 브로커의 파티션 3번이 매핑되었다고 가정할게요!

 

위 transactional.id 값을 지닌 프로듀서 클라이언트에게

1번 브로커가 트랜잭션 코디네이터가 되었어요.

앞으로 트랜잭션 상태 관리는 1번 브로커가 맡을 거예요!

코디네이터에게 transacitonal.id 를 전달했어요.

코디네이터는 이전에 등록한 transactional.id 를 찾아 종료(Abort)시켜요.

epoch 수를 증가시켜 이전 세션의 모든 트랜잭션 시도를 무효화 시켜요.

즉 현재 요청한 프로듀서만 코디네이터와 트랜잭션을 진행시킬 수 있어요.

 

같은 프로듀서끼리 중복으로 트랜잭션을 실행하는 걸 브로커 단에서 막고 있어요!

메시지 순서가 꼬이거나, 중복 전송 / 중복 커밋을 막기 위함이에요.

같은 transactional.id 를 가진 한 프로듀서가 메시지를 아직 보내고 있는데, 다른 프로듀서가 커밋을 해버리면 곤란하겠죠?

( 🐿️ : 이전 epoch 값을 가진 프로듀서가 메시지를 보내거나 커밋하면 자바 클라이언트 기준 ProducerFencedException 예외를 발생시켜요. )

 

이제 트랜잭션 로그 토픽에 새로운 트랜잭션 메타데이터가 Empty 상태로 메시지가 기록되어요.

트랜잭션 상태는 아래와 같아요. (.../kafka/coordinator/transaction/TransactionMetadata.scala)

object TransactionState {
  val AllStates = Set(
    Empty,
    Ongoing,
    PrepareCommit,
    PrepareAbort,
    CompleteCommit,
    CompleteAbort,
    Dead,
    PrepareEpochFence
  )
  ...
}

 

저장되는 데이터는 아래와 같아요.

__transaction_state
└─ Partition 3 (Leader: Broker 1)
   └─ offset 101: {
        transactionalId: "9b79c8b6-6e38-498d-84f0-1130d2aa9223",
        producerId: 123456789,
        producerEpoch: 1,
        state: "EMPTY", // 아직 beginTransaction() 호출 안 했음
        txnTimeout: 60000
      }

 

 

beginTransaction API 호출로 트랜잭션 시작 상태를 기록해요.

ONGOING 상태로 전환돼요.

 

 

마지막으로 프로듀서 클라이언트가 메시지를 전송해요.

프로듀서에서 '.send()' 호출로 메시지를 전송하는 동작은 트랜잭션의 유무와 상관 없이 똑같아요!

( 🐿️ : 브로커는 transacitonal.id 의 현재 epoch 값을 비교해 좀비 프로듀서(Fenced Producer)를 차단하는 로직이 추가돼요. )

 

단, 파티션에 기록할 때 트랜잭션 커밋 전까지는

아직 커밋이 되지 않은 'Uncommitted' 상태로 마킹돼요!

⚠️ 컨슈머가 READ_UNCOMMITTED (기본값) 격리 수준(Isolation Level)으로 설정되어 있다면, 소비합니다.
프로듀서 트랜잭션을 사용한다면 컨슈머 또한 READ_COMMITTED 격리 수준으로 맞추는 걸 권장해요!

 

모든 메시지를 전송하면 commitTransaction / abortTransaction 요청을 보내요.

코디네이터는 2PC(Two Phase Commit) 프로토콜을 준비해 최종 커밋 여부를 결정해요.

 

 

2. 코디네이터와 트랜잭션 로그 동기화

2PC 설명 전에, 하나만 짚고 넘어가 볼게요!

 

트랜잭션 코디네이터는 각 트랜잭션의 상태를 브로커의 메모리 내에 관리하고 있어요.

하지만 메모리만으로는 장애 복구가 불가능하죠.

앞서 살펴본 '__transaction_state' 라는 내부 토픽을 통해 상태를 로그 형태로 영속화하고 있어요.

 

  • 메모리에 저장된 트랜잭션 상태는 비동기로 트랜잭션 로그에 기록돼요.
  • 이 로그는 카프카의 일반 토픽처럼 복제되어 장애 복구를 보장하죠.

 

중요한 사실은 해당 트랜잭션 로그 파티션은 오직 리더 브로커만 읽고 쓰죠.

즉, 해당 'transactional.id' 가 매핑된 파티션의 리더 브로커가 코디네이터로서 트랜잭션 상태를 관리해요.

만약 해당 브로커가 다운되면?

  • 파티션의 새로운 리더 브로커가 선출되고,
  • 이 브로커는 복제된 트랜잭션 로그를 읽어, 자신의 메모리에 상태를 복원하고,
  • 다시 트랜잭션 코디네이터 역할을 수행하게 돼요.

노란색 : 트랜잭션 상태 / 파란색 : 메시지 로그 형태의 트랜잭션 상태

 

3. 코디네이터 2PC 프로토콜 실행

프로듀서가 커밋 또는 취소(Abort) 요청을 보내면, 2PC(2단계 커밋, Two-phase commit) 프로토콜을 실행해요.

 

첫 번째 단계는 트랜잭션 상태를 "PREPARE_COMMIT" 으로 변경해요!

메모리 상태에 따라 트랜잭션 로그에도 해당 상태 메시지를 기록해요.

트랜잭션 로그에 메시지를 발행하면, 커밋 준비를 마쳐요.

 

 

두 번째 단계는 재고 토픽 파티션에 기록한 메시지의 상태 마커를 'COMMITTED' 으로 변경해요.

'READ_COMMITTED' 격리 수준인 컨슈머에게 해당 메시지가 노출되어요.

각 시스템은 이제 메시지 소비를 진행해요.

 

이후 트랜잭션 로그에 COMPLETE 상태를 기록해 트랜잭션을 마쳐요.

프로듀서는 다음 트랜잭션을 시작할 수 있게 됩니다.

 

 

( 🐿️ : 지면상 파티션 하나에만 여러 메시지를 발행했는데요~ 요구사항과 같이 여러 토픽에 메시지를 발행하는 것과 동일합니다. )

 

🤔 스프링 카프카 프로듀서에서 동시에 카프카 트랜잭션을 사용하지 않게 조심해야겠는데요?

맞아요! 이외에도 프로듀서와 브로커에 설정한 트랜잭션 타임아웃을 넘기지 않도록 조심해야 해요.

다음 포스팅에서 스프링 프레임워크에서 카프카 프로듀서가 어떻게 동작하는지, 트랜잭션은 어떻게 동작하는지 알아보고

실무 사례를 알아보도록 해요!

댓글