리눅스/kafka

kafka ETL pipeline에서의 고려사항 및 sqlite3연동

정지홍 2024. 11. 14. 11:38

ETL

  • extracing 추출
    • source system에서 data를 모으고, 이후 처리를 위해서 준비하는 과정.
    • 사전에 제작된 도구가 source system에서 data를 추출하는데 사용 가능
      • ex) db에서 추출 or log데이터를 추출하기 위한 아파치 플룸
    • 카프카에서 추출?
      • kafka connect의 source connector를 이용한다.
  • transforming 변환
    • 추출한 데이터를 참조해 처리하며, 데이터를 의미있게 가공하는 것
    • 데이터 사용시 두가지 접근 방법이 존재...
      • 1. 어떠한 저장소에서 데이터를 추출해오는 방법
      • 2. 직접 데이터를 추출과정에서 변환과정에 주는 것
    • 카프카에서 변환?
      • kafka streams를 이용한다.
  • loading 적재
    • data가 가공디면, 이후 사용을 위해서 특정한 시스템에 저장
    • 카프카에서 적재?
      • kafka connect의 export connector 사용

kafka connect sqlite3 예제

// -----------db설치 및 데이터 삽입----------------
// sqlite 설치
sudo apt-get install sqlite3
// console start
sqlite packt.db
// create table
create table authors(id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL , name VARCHAR(255));
// insert record
sqlite> insert into authors(name) values('jihong');
sqlite> insert into authors(name) values('jeongjihong');


// source-quickstartsqlite.properties라는 파일을 만들어 줌
// 이는 연결 설정 파일이다.
name=jdbc-test
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
# connection.url=jdbc:sqlite:packt.db
connection.url=jdbc:sqlite:/home/jeongjihong/shell_scripts/packt.db
mode=incrementing
incrementing.column.name=id
topic.prefix=test-
table.whitelist=authors


// 다음 명령어로 실행 
connect-standalone /home/jeongjihong/confluent/etc/kafka/connect-standalone.properties source-quickstartsqlite.properties

위의 과정을 거쳐서 console-consumer로 데이터를 가져올수있다.

위에서의 과정은 sqlite를 설치하고, db에 데이터를 삽입하고, sink connector로 데이터를 보내고 콘솔로 읽어 들였다.