send() -> Serializer -> partitioner -> 압축( 선택 ) -> record Accumulator 저장 -> sender에서 별도의 스레드로 전송
ProducerRecord
- String topic
- Integer patition
- Headers headers
- Key
- Value
- Long timestamp
- 이는 send호출시마다 입력하지만, 바로 전송하는 것이 아닌 내부 메모리에 저장시킨다. -> 그리고 topic partition에 따라서 record batch단위로 묶어서 전송. -> 각각은 내부 메모리에 여러 batch들로 buffer.memory 설정만큼 보관 가능. ( 여러개의 batch를 한번에 전송도 가능)
Record Accumulator
- partitioner에 의해서 토픽에 전송될 메시지 배치를 저장하는 메모리 영역.( partitioner는 append()사용 )
- sender thread는 record accumulator에 누적 저장된 배치 메시지를 꺼내서 broker에게 전송
- linger.ms를 이용하여 최대로 저장될 수 있는 시간을 지정.
- 이를 이용해서 보가 많이 쌓이게 기다리다가 가져감.
- sender thread가 전송 준비가 되어있다면 batch를 0~여러개 가져가는 것 가능
- linger.ms를 이용하여 최대로 저장될 수 있는 시간을 지정.
- 즉, kafkaProducer의 main 스레드는...
- 1. send메서드 호출
- 2. record accumulator에 저장
- 3. sender thread가 이 데이터를 broker에게 전송
- buffer.memory는 record accumulator의 전체 메모리 사이즈
- batch.size는 단일 배치의 사이즈
props.setProperty(ProducerConfig.BATCH_SIZE_CONFIG , "32000"); //한 번에 보낼 수 있는 메시지 배치의 최대 크기가 32KB로 설정
props.setProperty(ProducerConfig.LINGER_MS_CONFIG , "30"); // Producer가 메시지를 보낼 때 30밀리초 동안 새로운 메시지가 추가되기를 기다림
- kafka에서 send()는 기본적으로는 비동기async이며, send().get()형식으로 동기형태로 받을 수 있다.
- async가 콜백기반이면 RecordMedaData를 받아올수 있다.
'리눅스 > kafka' 카테고리의 다른 글
| kafka max.block.ms | delivery.timeout.ms | retry.backoff.ms | max.in.flight.requests.per.connection (0) | 2024.10.17 |
|---|---|
| kafka consumer 구현 - 01 (0) | 2024.10.16 |
| kafka에서 ack에 따른 전송 방식 ( acks 0 1 all ) (0) | 2024.10.16 |
| kafka producer 구현 - 06 ( pizza message 전송 ) (0) | 2024.10.15 |
| kafka producer 구현 - 05 ( callback 구현) (0) | 2024.10.15 |