리눅스/kafka
kafka connect 분산모드를 위한 설정파일
정지홍
2024. 11. 4. 16:59
분산모드 connect는 단일모드와 다르게 2개이상의 process가 1개의 그룹으로 묶여서 운영됨
=> 1개가 오류로 인하여 종료되어도, 지속적으로 서비스 가능
~/confluent/etc/kafka의 connect-distributed.properties파일

bootstrap.servers=localhost:9092
- connect와 연동할 kafka 클러스터의 호스트 이름과 포트를 작성
group.id=connect-cluster
- 다수의 connect 프로세스들을 묶을 이름을 지정
- 동일한 group id로 지정된 커넥트들은 같은 그룹으로 인식함
- 같은 그룹으로 지정된 커넥트들에서 커넥터가 실행되면 커넥트들에 분산되어 실행함
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
- data를 kafka에 저장할때 or 가져올때 변환하는데 사용한다
- Json String ByteArray 컨버터들을 제공하며, 이를 사용하기 싫으면 enable옵션을 false로 바꾼다.
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
- 분산 모드 커넥트는 kafka 내부 토픽에 offset정보를 저장함.
- offset정보는 source connector or sink connector가 데이터 처리 시점을 저장하기 위해서 사용
- 이는 데이터 처리시에 중요하니 복제 개수를 3보다 크게 하면 좋다.
offset.flush.interval.ms=10000
- task가 처리 완료한 offset을 commit하는 interval주기를 설정
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=/usr/share/java
- 플러그인 형태로 추가할 커넥터의 dir path를 입력
- 해보니까 이게 제일 중요함..... 여기에 경로 잘못주면 아무것도 못읽어줌....
connect-distributed /home/jeongjihong/confluent/etc/kafka/connect-distributed.properties
위의 명령어를 입력하면 분산모드 커넥트를 실행함

curl -X GET http://localhost:8083/connector-plugins
위의 명령어들로 어떠한 plugin들이 있는지 확인가능
curl -X GET http://localhost:8083/connectors/local-file-source/status
커넥터가 잘 돌아가는 지 확인
curl -X DELETE http://localhost:8083/connectors/local-file-source
커넥터 종료
curl -X GET http://localhost:8083/connectors
커넥터 목록들 조회