리눅스/kafka

kafka connect mysql - distributed모드

정지홍 2024. 11. 19. 11:52

1. 데이터베이스

  • mysql -u root -p 를 사용하여 mysql에 진입한다.
  • show databases; 입력하면 어떠한 databases가 존재하는지 볼수있다.
    우리는 kafka_demo를 사용할거다.
  • use kafka_demo; 를 입력한다.
  • DESCRIBE users; 를 입력하여 우리가 사용할 테이블에 대해서 조회해보자.
  • select * from users; 로 내부 확인 가능

만약 Access denied for user 'root'@'localhost'가 난다면....

MySQL 5.7 이상에서는 기본적으로 root 사용자의 인증 방식이 auth_socket로 설정되어 있을 수 있음
이 방식은 root 사용자가 시스템 계정으로 로그인한 경우에만 MySQL에 접근할 수 있도록 제한한다.
이를 해결하려면, root 사용자의 인증 방식을 mysql_native_password로 변경해야 함.

// root 사용자 인증 방식 변경
ALTER USER 'root'@'localhost' IDENTIFIED WITH mysql_native_password BY 'your_password';
FLUSH PRIVILEGES;


// MySQL 종료 후 다시 로그인
exit
mysql -u root -p

// MySQL 사용자 권한 확인
GRANT ALL PRIVILEGES ON *.* TO 'root'@'localhost' WITH GRANT OPTION;
FLUSH PRIVILEGES;

==> 위의 명령어 뒤에 restart 해줌

 

 

2. 사용할 connect-distributed.properties에 대한 파일이다. 그리고 이를 실행한다...

  • connect-distributed connect-distributed.properties 를 입력하여 실행
  • https://jihong.tistory.com/396 에 들어가면 api목록들이 아래쪽에 작성하였다.
bootstrap.servers=localhost:9092

group.id=connect-cluster

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter


key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1


config.storage.topic=connect-configs
config.storage.replication.factor=1

status.storage.topic=connect-status
status.storage.replication.factor=1

config.storage.topic=connect-configs
config.storage.replication.factor=1

status.storage.topic=connect-status
status.storage.replication.factor=1

plugin.path=/usr/local/share/java,/home/jeongjihong/confluent/etc/kafka/plugins

 

 

3. 그리고 커넥터에 대한 설정을 다음과 같이 설정하였다.

  • 그리고 입력하여 등록을 한다.
curl -X POST -H "Content-Type: application/json" --data '{
    "name": "mysql-sink-connector",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "test-topic",
        "connection.url": "jdbc:mysql://localhost:3306/kafka_demo",
        "connection.user": "root",
        "connection.password": "root123",
        "auto.create": "true",
        "auto.evolve": "true",
        "insert.mode": "insert",
        "table.name.format": "users",
        "pk.mode": "none",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "true",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "true",
        "delete.enabled": "false"
    }
}' http://localhost:8083/connectors

그냥 이렇게 입력하면 됨
확인 가능

 

4. producer로 데이터를 보내준다.

  • {"schema":{"type":"struct","fields":[{"field":"id","type":"int64"},{"field":"name","type":"string"},{"field":"email","type":"string"}],"optional":false,"name":"cohttp://m.example.User"},"payload":{"id":8,"name":"Joe","email":"jowegcom"}}
  • 위와 같은 형식으로 보냈다.
  • 그리고 sql에서 확인 가능
{"schema":{"type":"struct","fields":[{"field":"id","type":"int64"},{"field":"name","type":"string"},{"field":"email","type":"string"}],"optional":false,"name":"com.example.User"},"payload":{"id":8,"name":"Joe","email":"jowegcom"}}

producer
sql 조회
log 조회