리눅스/kafka

kafka 배치 단위 message 전송

정지홍 2024. 10. 16. 11:09

send() -> Serializer -> partitioner -> 압축( 선택 ) -> record Accumulator 저장 -> sender에서 별도의 스레드로 전송

 

 

ProducerRecord

  • String topic
  • Integer patition
  • Headers headers
  • Key
  • Value
  • Long timestamp
  • 이는 send호출시마다 입력하지만, 바로 전송하는 것이 아닌 내부 메모리에 저장시킨다. -> 그리고 topic partition에 따라서 record batch단위로 묶어서 전송. -> 각각은 내부 메모리에 여러 batch들로 buffer.memory 설정만큼 보관 가능. ( 여러개의 batch를 한번에 전송도 가능)

 

Record Accumulator

  • partitioner에 의해서 토픽에 전송될 메시지 배치를 저장하는 메모리 영역.( partitioner는 append()사용 )
  • sender thread는 record accumulator에 누적 저장된 배치 메시지를 꺼내서 broker에게 전송
    • linger.ms를 이용하여 최대로 저장될 수 있는 시간을 지정.
      • 이를 이용해서 보가 많이 쌓이게 기다리다가 가져감.
    • sender thread가 전송 준비가 되어있다면 batch를 0~여러개 가져가는 것 가능
  • 즉, kafkaProducer의 main 스레드는...
    • 1. send메서드 호출
    • 2. record accumulator에 저장
    • 3. sender thread가 이 데이터를 broker에게 전송
  • buffer.memory는 record accumulator의 전체 메모리 사이즈
  • batch.size는 단일 배치의 사이즈
props.setProperty(ProducerConfig.BATCH_SIZE_CONFIG , "32000"); //한 번에 보낼 수 있는 메시지 배치의 최대 크기가 32KB로 설정
props.setProperty(ProducerConfig.LINGER_MS_CONFIG , "30"); // Producer가 메시지를 보낼 때 30밀리초 동안 새로운 메시지가 추가되기를 기다림

 

 

- kafka에서 send()는 기본적으로는 비동기async이며, send().get()형식으로 동기형태로 받을 수 있다.

- async가 콜백기반이면 RecordMedaData를 받아올수 있다.