리눅스/kafka
kafka connect mysql - distributed모드
정지홍
2024. 11. 19. 11:52
1. 데이터베이스
- mysql -u root -p 를 사용하여 mysql에 진입한다.
- show databases; 입력하면 어떠한 databases가 존재하는지 볼수있다.
우리는 kafka_demo를 사용할거다. - use kafka_demo; 를 입력한다.
- DESCRIBE users; 를 입력하여 우리가 사용할 테이블에 대해서 조회해보자.
- select * from users; 로 내부 확인 가능


만약 Access denied for user 'root'@'localhost'가 난다면....
MySQL 5.7 이상에서는 기본적으로 root 사용자의 인증 방식이 auth_socket로 설정되어 있을 수 있음
이 방식은 root 사용자가 시스템 계정으로 로그인한 경우에만 MySQL에 접근할 수 있도록 제한한다.
이를 해결하려면, root 사용자의 인증 방식을 mysql_native_password로 변경해야 함.
// root 사용자 인증 방식 변경
ALTER USER 'root'@'localhost' IDENTIFIED WITH mysql_native_password BY 'your_password';
FLUSH PRIVILEGES;
// MySQL 종료 후 다시 로그인
exit
mysql -u root -p
// MySQL 사용자 권한 확인
GRANT ALL PRIVILEGES ON *.* TO 'root'@'localhost' WITH GRANT OPTION;
FLUSH PRIVILEGES;
==> 위의 명령어 뒤에 restart 해줌
2. 사용할 connect-distributed.properties에 대한 파일이다. 그리고 이를 실행한다...
- connect-distributed connect-distributed.properties 를 입력하여 실행
- https://jihong.tistory.com/396 에 들어가면 api목록들이 아래쪽에 작성하였다.
bootstrap.servers=localhost:9092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
plugin.path=/usr/local/share/java,/home/jeongjihong/confluent/etc/kafka/plugins
3. 그리고 커넥터에 대한 설정을 다음과 같이 설정하였다.
- 그리고 입력하여 등록을 한다.
curl -X POST -H "Content-Type: application/json" --data '{
"name": "mysql-sink-connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "test-topic",
"connection.url": "jdbc:mysql://localhost:3306/kafka_demo",
"connection.user": "root",
"connection.password": "root123",
"auto.create": "true",
"auto.evolve": "true",
"insert.mode": "insert",
"table.name.format": "users",
"pk.mode": "none",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true",
"delete.enabled": "false"
}
}' http://localhost:8083/connectors


4. producer로 데이터를 보내준다.
- {"schema":{"type":"struct","fields":[{"field":"id","type":"int64"},{"field":"name","type":"string"},{"field":"email","type":"string"}],"optional":false,"name":"cohttp://m.example.User"},"payload":{"id":8,"name":"Joe","email":"jowegcom"}}
- 위와 같은 형식으로 보냈다.
- 그리고 sql에서 확인 가능
{"schema":{"type":"struct","fields":[{"field":"id","type":"int64"},{"field":"name","type":"string"},{"field":"email","type":"string"}],"optional":false,"name":"com.example.User"},"payload":{"id":8,"name":"Joe","email":"jowegcom"}}


