리눅스/kafka

kafka connect 분산모드를 위한 설정파일

정지홍 2024. 11. 4. 16:59

분산모드 connect는 단일모드와 다르게 2개이상의 process가 1개의 그룹으로 묶여서 운영됨

=> 1개가 오류로 인하여 종료되어도, 지속적으로 서비스 가능

 

~/confluent/etc/kafka의 connect-distributed.properties파일

bootstrap.servers=localhost:9092
  • connect와 연동할 kafka 클러스터의 호스트 이름과 포트를 작성
group.id=connect-cluster
  • 다수의 connect 프로세스들을 묶을 이름을 지정
  • 동일한 group id로 지정된 커넥트들은 같은 그룹으로 인식함
    • 같은 그룹으로 지정된 커넥트들에서 커넥터가 실행되면 커넥트들에 분산되어 실행함
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

key.converter.schemas.enable=true
value.converter.schemas.enable=true
  • data를 kafka에 저장할때 or 가져올때 변환하는데 사용한다
  • Json String ByteArray 컨버터들을 제공하며, 이를 사용하기 싫으면 enable옵션을 false로 바꾼다.
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1

config.storage.topic=connect-configs
config.storage.replication.factor=1

status.storage.topic=connect-status
status.storage.replication.factor=1
  • 분산 모드 커넥트는 kafka 내부 토픽에 offset정보를 저장함.
  • offset정보는 source connector or sink connector가 데이터 처리 시점을 저장하기 위해서 사용
  • 이는 데이터  처리시에 중요하니 복제 개수를 3보다 크게 하면 좋다.
offset.flush.interval.ms=10000
  •   task가 처리 완료한 offset을 commit하는 interval주기를 설정
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=/usr/share/java
  • 플러그인 형태로 추가할 커넥터의 dir path를 입력
  • 해보니까 이게 제일 중요함..... 여기에 경로 잘못주면 아무것도 못읽어줌....

 


connect-distributed /home/jeongjihong/confluent/etc/kafka/connect-distributed.properties

위의 명령어를 입력하면 분산모드 커넥트를 실행함

 

잘 됨


curl -X GET http://localhost:8083/connector-plugins

위의 명령어들로 어떠한 plugin들이 있는지 확인가능

 


curl -X GET http://localhost:8083/connectors/local-file-source/status

커넥터가 잘 돌아가는 지 확인


curl -X DELETE http://localhost:8083/connectors/local-file-source

커넥터 종료


 curl -X GET http://localhost:8083/connectors

커넥터 목록들 조회