리눅스/kafka
오늘 작업 기록( 피자 프로듀서에서 mysql로 적재)
정지홍
2024. 11. 20. 13:17
1. file-topic , file-topic-A 토픽 초기화 및 console-consumer 실행
2. 우선 fileProducer.java에서 txt파일을 읽는다.
3. streams로 a001가게에 대해서 필터링 하게 실행한다.
4. 우선 kafka_demo데이터베이스에 대해서 first이름으로 간단한 table을 만들었다.
- A001,ord6741, A001, 페퍼퍼로니 피자, 최 윤서, 07-00298-0721, 279 장유면, 2024-11-14 12:51:34
내가 보낼 데이터 형식이다.
CREATE TABLE first (
## keystore VARCHAR(10),
orderid VARCHAR(20),
store VARCHAR(10),
menu VARCHAR(50),
name VARCHAR(50),
cellphone VARCHAR(20),
address VARCHAR(100),
timestamp VARCHAR(30)
);
5. 커넥터를 수정한다. 그리고 커넥터를 등록...
curl -X POST -H "Content-Type: application/json" --data '{
"name": "mysql-sink-connector-pizza",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "file-topic-A",
"connection.url": "jdbc:mysql://localhost:3306/kafka_demo",
"connection.user": "root",
"connection.password": "root123",
"auto.create": "true",
"auto.evolve": "true",
"insert.mode": "insert",
"table.name.format": "first",
"pk.mode": "none",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true",
"delete.enabled": "false"
}
}' http://localhost:8083/connectors

6. stream에서 필터링 되는 부분을 수정한다...
package com.practice.kafka.filter;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import java.util.Properties;
public class StreamFilter {
private static String applicationName = "stream-test5";
private static String source = "file-topic";
private static String sinkA = "file-topic-A";
private static String sinkB = "file-topic-B";
public static void main(String[] args) {
Properties properties = new Properties();
properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG , "localhost:9092" );
properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG , applicationName);
properties.put( StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG , Serdes.String().getClass());
properties.put( StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG , Serdes.String().getClass());
StreamsBuilder sb = new StreamsBuilder();
KStream< String , String > streamLog = sb.stream(source);
//KStream< String , String > filteredA = streamLog.filter( (key , value ) -> key.equals("A001"));
KStream<String, String> filteredA = streamLog.filter((key, value) -> key.equals("AAA")).mapValues(value -> {
// 데이터를 ',' 구분자로 나누기
String[] parts = value.split(",");
// 데이터가 예상하는 부분 개수를 갖는지 확인 (예: 8개의 필드)
if (parts.length != 7) {
// 예상하지 않는 형식이면 원본 데이터를 반환
System.out.println("parts의 길이는 " + parts.length);
return value;
}
// 필드 할당
String orderId = parts[0].trim();
String store = parts[1].trim();
String menu = parts[2].trim();
String name = parts[3].trim();
String cellphone = parts[4].trim();
String address = parts[5].trim();
String timestamp = parts[6].trim();
// JSON 문자열 생성
String jsonResult = String.format(
"{\"schema\":{\"type\":\"struct\",\"fields\":[{\"field\":\"orderid\",\"type\":\"string\"}," +
"{\"field\":\"store\",\"type\":\"string\"},{\"field\":\"menu\",\"type\":\"string\"}," +
"{\"field\":\"name\",\"type\":\"string\"},{\"field\":\"cellphone\",\"type\":\"string\"}," +
"{\"field\":\"address\",\"type\":\"string\"},{\"field\":\"timestamp\",\"type\":\"string\"}]," +
"\"optional\":false,\"name\":\"com.example.Order\"}," +
"\"payload\":{\"orderid\":\"%s\",\"store\":\"%s\",\"menu\":\"%s\"," +
"\"name\":\"%s\",\"cellphone\":\"%s\",\"address\":\"%s\",\"timestamp\":\"%s\"}}",
orderId, store, menu, name, cellphone, address, timestamp
);
System.out.println(jsonResult);
return jsonResult;
}).selectKey( (key,value) -> null);
KStream< String , String > filteredB = streamLog.filter( (key , value ) -> key.equals("B001"));
filteredA.to(sinkA);
filteredB.to(sinkB);
KafkaStreams streams = new KafkaStreams(sb.build(), properties);
streams.start();
}
}
7.FileProducer에서 보내고 streams에서 필터링해서 mysql에 저장함.


