리눅스/kafka

kafka connect란?

정지홍 2024. 11. 3. 14:23

kafka connect란

  • 데이터의 외부 시스템과 kafka 클러스터 간의 통합을 자동화하는 프레임워크
  • source connector , sink connector 두가지 유형의 커넥터로 데이터를 처리
  • 다양한 데이터 소스와 유연하게 연동해서 데이터 통합의 허브 역할 수행 가능
  • 즉, 데이터 파이프라인 생성 시 반복 작업을 줄여서, 효율적인 전송이 이루어지게 하기 위함
    • 많은 매번 producer,consumer를 반복적으로 만드는 것은 비효율적. 그래서 이를 특정한 작업을 하는 템플릿과 같은 형태인 connector를 실행해여 비효율적인 작업을 줗인다.
    • 그래서 streaming ETL , data pipeline 구축 , data 동기화 등에 유용
      • ex) 파이프라인을 생성한다면, 사용하는 파라미터(topic name , file name)등을 커넥터로 입력받게 만들면 됨. 그래서 각각의 토픽은 이러한 고유한 설정값을 받아서 처리함.
  • [source application] ==> [source connector] ==> [kafka topic] ==> [sink connector] ==> [sink application]
  • 위에서 connector들은 kafka connect 영역이다.
    • 사용자가 conector생성 명령을 내리면, coonect는 내부에 connector와 task를 생성.
    • task는 connector에 종속되는 개념이며, 실질적인 데이터 처리를 한다. 그래서 데이터 처리가 정상적인지 확인하기 위하여 각각의 task를 확인해야함.

connecct 내부
전체적 실행과정

  • 주요 특징
    • source connector: ( 프로듀서 역할 )
      • 외부에서 data를 읽어서 kafka의 특정한 topic으로 전송.
      • ex) mysql에서 실시간으로 data 변경 사항을 가져와서 kafka에 저장
    • sink connector: ( 컨슈머 역할 )
      • kafka topic의 data를 외부 시스템으로 전송
      • ex) kafka에 들어온 log data를 elestic search에 저장 or 특정 분석용 db로 보내서 실시간으로 data 처리
  • 장점
    • 확장성, 간편한 설치 및 관리
    • 데이터 내결함성 및 복구 기능
    • 재사용 가능한 커넥터
  • 구성요소
    • 1. connector
      • kafka connector의 핵심이며, data를 가져오거나 전송하는 실제 작업을 수행
      • connector는 설정을 통해 data source 및 대상과의 통신을 관리
      • JDBC , elasticseartch,HDFS 커넥터 등 여러가지 존재
    • 2. task
      • 하나의 connector는 여러 직업을 수행 가능.
        • 각 작업은 특정 데이터의 일부를 처리
      • 작업 단위로 병렬 처리가 가능해서 성능을 높힘. connector구성에서 작업수를 조정해서 병렬로 처리 가능
      • ex) 10개의 partition을 가진 topic에서 5개의 작업으로 분할 처리 가능
    • 3. 변환 ( transformations , SMT - single message transformation )
      • data가 kafka로 전송되기 전에, record를 변환하는데 사용
      • filed의 추가/삭제, 포맷변경, 데이터 필터링 등의 작업 수행 가능
    • 4. converter
      • data가 kafka와 외부 시스템 사이에서 직렬화되거 역직렬화 되는 방식을 정의함
      • JsonConverter , AvroConverter가 많이 쓰임
    • 5. rest API
      • kafka connect는 rest api를 제공해서, connector의 생성,삭제,모니터링,관리 등을 쉽게 가능
  • 작동 모드
    • standalone mode 및 distributed mode 두가지의 방식이 존재
      • standalone mode
        • 단일 서버에서 connect작업을 실행하는 모드이며, 소규모 작업에 적합
        • 단일 프로세스 내에서 실행 되며, 간단한 설정으로 빠르게 사용 가능
        • 주로 개발환경 or 테스트환경에서 사용
        • 당연히 process가 하나이니, single point of failure 존재
      • distibuted mode
        • 다수의 작업을 여러 노드에 분산해서 실항하는 모드
        • 주로, 데이터 파이프라인에서 사용
        • fault tolerance와 내결함성을 제공하며, 작업 중단시 다른 노드로 작업 재배치
        • 작업 확장성 뛰어남
  • JDBC source Connector를 사용한다면...
    • mysql 같은 db에 연결하여 데이터를 kafka로 가져옴
    • 실시간 변경 사항을 읽어 CDC(change data capture)기능으로 db와 kafka를 동기화 시킴
  • 활용 사례
    • 실시간 데이터 통함
      • 다양한 data source에서 kafka로 data를 실시간 전송하여 streaming pipeline 구축 가능
    • CDC 데이터베이스 변경 데이터 갭쳐
      • 관계형 db와 kafka를 연동해서, db에서 발생하는 변경사항을 kafka로 실시간 전송
  • kafka connect의 REST API
    • GET
      • / 실행중인 커넥트 정보 확인
      • /connectors 실행중인 커넥터 이름 확인
        • curl -X GET http://localhost:8083/connectors
      • /connectors/커넥터_이름  실행중인 커넥터 정보 확인
        • curl -X GET http://localhost:8083/connectors/mysql-sink-connector
      • /connectors/커넥터_이름/config  실행중인 커넥터의 설정값 확인
      • /connectors/커넥터_이름/status  실행중인 커넥터 상태 확인
      • /connectors/커넥터_이름/tasks  실행중인 커넥터의 테스크 정보 확인
      • /connectors/커넥터_이름/테스크_아이디/status  실행중인 테스크의 상태 확인
      • /coonectors/커넥터_이름/topics  커넥터별 연동된 토픽정보 확인
      • /connector-plugins/ 커넥트에 존재하는 커넥터 플러그인 확인
    • POST
      • /connectors 새로운 커넥터 생성 요청
      • /connectors/커넥터_이름/restart 실행중인 커넥터 재시작
        • curl -X POST http://localhost:8083/connectors/mysql-sink-connector/restart
      • /connectors/커넥터_이름/테스크_아이디/restart  실행중인 테스크의 재시작 요청
    • PUT
      • /connectors/커넥터_이름/config 실행중인 커넥터의 설정값 변경 요청
      • /connectors/커넥터_이름/pause 커넥터 일시 중지
      • /connectors/커넥터_이름/resume 일시중지 된 커넥터 실행 요청
      • /connectors-plugins/커넥터_플러그인_이름/config/validate 커넥터 생성시 설정값 유효 여부 확인
    • DELETE
      • /connectors/커넥터_이름/   실행중인 커넥터 종료
        • curl -X DELETE http://localhost:8083/connectors/mysql-sink-connector