__consumer_offsets
- commit으로 메시지를 가져오면 다음에 읽을 offset을 기록
- 즉, consumer그룹의 offset정보를 저장 및 관리하는 역할
- consumer rebalancing
- consumer 그룹에 변경(에러,제거, 새로운 consumer추가)가 이루어지며 리벨런스가 이루어지면, kafka는 __consumer_offsets에 저장된 offset정보를 이용해서, 어디에서 시작할지 알수있다.
- consumer가 어떤 partition에서 어디 offset까지 사용했는지 알수있다.
- offsets_retention.minutes
- topic에 저장된 offset정보의 유지기간을 지정
- default = 7days
- offsets.commit.required.acks
- offset커밋시 kafka broker에서 몇개의 복제본이 커밋을 확인해야하는지 설정
auto.offset.reset
- consumer가 topic에서 message를 가져올때, 처음부터 가져올까? 최근부터 가져올까? 정하는 파리미터
- auto.offset.reset = earliest 처음부터.....
- auto.offset.rest = latest 마지막부터....
- 한 consumer group에서 다른 컨슈머가 같이 참여하면 earliest로 설정해도 적용 안됨

1.우선 simple-topic에 a,b,c,d메시지를 순서대로 날린다.

2. 4개를 보냈으니 아래 로그를 보면 offset=5라고 출력된다. ( 5번째 기다린다.)
[main] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-group_01-1, groupId=group_01] Resetting offset for partition simple-topic-0 to position FetchPosition{offset=5, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[jjh:9092 (id: 0 rack: null)], epoch=0}}.
3. 다음과 같이 확인 가능...

kafka-console-consumer --consumer.config ~/consumer_temp.config --bootstrap-server localhost:9092 --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" | grep simple-topic
4. 메시지를 더 보내면 다음과 같이 변경됨.

'리눅스 > kafka' 카테고리의 다른 글
| group coordinator , static group membership (0) | 2024.10.20 |
|---|---|
| kafka에서 새로운 consumer가 들어오면 rebalance 되는 과정에 대해서 (7) | 2024.10.20 |
| kafka consumer fetcher (0) | 2024.10.18 |
| kafka partitioner 구현 ( custom partitioner ) (0) | 2024.10.18 |
| idempotence (0) | 2024.10.17 |