리눅스/kafka

오늘 작업 기록( 피자 프로듀서에서 mysql로 적재)

정지홍 2024. 11. 20. 13:17

1. file-topic , file-topic-A 토픽 초기화 및 console-consumer 실행

2. 우선 fileProducer.java에서 txt파일을 읽는다.

3. streams로 a001가게에 대해서 필터링 하게 실행한다.

4. 우선 kafka_demo데이터베이스에 대해서 first이름으로 간단한 table을 만들었다.

  • A001,ord6741, A001, 페퍼퍼로니 피자, 최 윤서, 07-00298-0721, 279 장유면, 2024-11-14 12:51:34
    내가 보낼 데이터 형식이다.
CREATE TABLE first (
  ##  keystore VARCHAR(10),
    orderid VARCHAR(20),
    store VARCHAR(10),
    menu VARCHAR(50),
    name VARCHAR(50),
    cellphone VARCHAR(20),
    address VARCHAR(100),
    timestamp VARCHAR(30)
);

 

5. 커넥터를 수정한다. 그리고 커넥터를 등록...

curl -X POST -H "Content-Type: application/json" --data '{
    "name": "mysql-sink-connector-pizza",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "file-topic-A",
        "connection.url": "jdbc:mysql://localhost:3306/kafka_demo",
        "connection.user": "root",
        "connection.password": "root123",
        "auto.create": "true",
        "auto.evolve": "true",
        "insert.mode": "insert",
        "table.name.format": "first",
        "pk.mode": "none",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "true",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "true",
        "delete.enabled": "false"
    }
}' http://localhost:8083/connectors

 

 

6. stream에서 필터링 되는 부분을 수정한다...

package com.practice.kafka.filter;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Properties;

public class StreamFilter {
    private static String applicationName = "stream-test5";
    private static String source = "file-topic";
    private static String sinkA = "file-topic-A";
    private static String sinkB = "file-topic-B";
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG , "localhost:9092" );
        properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG , applicationName);
        properties.put( StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG , Serdes.String().getClass());
        properties.put( StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG , Serdes.String().getClass());

        StreamsBuilder sb = new StreamsBuilder();
        KStream< String , String > streamLog = sb.stream(source);
        //KStream< String , String > filteredA = streamLog.filter( (key , value ) -> key.equals("A001"));
        KStream<String, String> filteredA = streamLog.filter((key, value) -> key.equals("AAA")).mapValues(value -> {
                    // 데이터를 ',' 구분자로 나누기
                    String[] parts = value.split(",");
                    // 데이터가 예상하는 부분 개수를 갖는지 확인 (예: 8개의 필드)
                    if (parts.length != 7) {
                        // 예상하지 않는 형식이면 원본 데이터를 반환
                        System.out.println("parts의 길이는 " + parts.length);
                        return value;
                    }
                    // 필드 할당
                    String orderId = parts[0].trim();
                    String store = parts[1].trim();
                    String menu = parts[2].trim();
                    String name = parts[3].trim();
                    String cellphone = parts[4].trim();
                    String address = parts[5].trim();
                    String timestamp = parts[6].trim();

                    // JSON 문자열 생성
                    String jsonResult = String.format(
                            "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"field\":\"orderid\",\"type\":\"string\"}," +
                                    "{\"field\":\"store\",\"type\":\"string\"},{\"field\":\"menu\",\"type\":\"string\"}," +
                                    "{\"field\":\"name\",\"type\":\"string\"},{\"field\":\"cellphone\",\"type\":\"string\"}," +
                                    "{\"field\":\"address\",\"type\":\"string\"},{\"field\":\"timestamp\",\"type\":\"string\"}]," +
                                    "\"optional\":false,\"name\":\"com.example.Order\"}," +
                                    "\"payload\":{\"orderid\":\"%s\",\"store\":\"%s\",\"menu\":\"%s\"," +
                                    "\"name\":\"%s\",\"cellphone\":\"%s\",\"address\":\"%s\",\"timestamp\":\"%s\"}}",
                            orderId, store, menu, name, cellphone, address, timestamp
                    );
                    System.out.println(jsonResult);
                    return jsonResult;
                }).selectKey( (key,value) -> null);
        KStream< String , String > filteredB = streamLog.filter( (key , value ) -> key.equals("B001"));
        filteredA.to(sinkA);
        filteredB.to(sinkB);
        KafkaStreams streams = new KafkaStreams(sb.build(), properties);
        streams.start();
    }
}

 

7.FileProducer에서 보내고 streams에서 필터링해서 mysql에 저장함.

insert에 성공하여, 나온 로그
sql에 insert됨을 확인함.
consumer로 확인해봄