리눅스/kafka

kafka offset commit , 중복 읽기 , 읽기 누락

정지홍 2024. 10. 22. 13:02

offset commit이란?

  • consumer가 메시지를 어디까지 읽었는지, kafka에 저장하는 작업
  • 이는 메시지의 중복처리를 피하고, 재시작할때 consumer가 이전에 읽은 위치에서 다시 읽을 수 있게 해줌
  • offset은 각 파티션에서의 메시지 위치를 나타내는 숫자
  • commit에는 auto commit , manaul commit의 방식이 존재
  • auto commit 자동 커밋
    • kafka consumer가 주기적으로 자신이 읽은 마지막 offset을 자동으로 commit함
    • enable.auto.commit=true로 설정함.
    • 주기는 auto.commit.interval.ms=5000과 같이 설정. default=5000이다.
    • 장점: 사용이 쉬움
    • 단점: 메시지가 제대로 처리되기 전에 커밋이 될 수 있음. 이로인해 메시지 손실 or 중복처리 발생할 수 있음.
  • manaul commit 수동 커밋 
    • consumer가 메시지 처리 후 명시적으로 commit을 호출함.
    • 즉, 메시지가 성공적으로 처리되어야만 offset을 저장할 수 있음.
    • 주로 commitSync() or commitAsync() 사용
      • commitSync() : 동기적으로 커밋하여 성공 여부를 보장. 단, 시간 지연 문제
      • commitAsync() : 비동기적. 빠르지만 실패한 경우 재시도를 수동으로 처리
      • 정확한 commit 필요하면 동기적으로, 속도가 중요하면 비동기적으로 한다.

auto commit을 하겠습니다.

 

// auto commit을 비활성화. 즉, consumer가 message를 읽고 나서 kafka에 offset을 자동으로 commit하지 않으며, 소비자가 명시적으로 해줘야함.
props.setProperty( ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG , "false");

 


 

중복 읽기

  • consumer가 메시지를 읽고 처리하고, offset commit이 이루어지지 않거나, commit이 완료되기 전에 consumer가 실패하여 다시 시작되는 경우.
  • 예시...
    • 초기 상태에 partition에 메시지 A,B,C가 존재... consumer는 A메시지의 offset을 commit함.
    • consumer는 offset 1(B message)를 읽고 처리한 후, commitSync()호출하려 한다.
    • B메시지를 처리한 후, commit전에 error발생.
    • consumer가 이로 인해서 재시작하면 B message를 다시 읽는다.
    • 결과적으로는 B메시지를 중복으로 처리하게 됨
  • 중복 처리 방지 법
    • Idempotency
      • 처리하는 과정을 idempotency하게 해야한다.
      • 즉, 같은 메시지를 여러번 처리해도 결과가 동일해야 한다.
    • EOS ( exactly once semantics ) 정확히 한 번 처리
      • 트랜잭션을 사용해서 consumer-producer 사이에서 중복 처리를 방지한다.
    • msg를 batch로 처리하고 수동으로 commit 하기.

 


읽기 누락

  • consumer가 특정 message를 읽지 못하고 건너뛰는 상황
  • 예를 들면 메시지를 처리하는 중에 오류가 발생했으나 이미 commit이 된 경우를 의미.
  • 누락 방지 방법
    • 정확한 offset commit
    • commitSync() 사용
    • 트랜젝션을 사용한 메시지 처리
    • processing 실패시 재처리하는 로직