리눅스/kafka

kafka project v1 수정방향

정지홍 2024. 12. 4. 10:05

https://jihong.tistory.com/450

 

kafka project v1

보호되어 있는 글입니다. 내용을 보시려면 비밀번호를 입력하세요.

jihong.tistory.com

 

위의 프로젝트에서 개선할 점들 정리하자.....

 

 

토픽 이름에 관하여...

  • 1. 우선 main으로 보낼 topic이름을 item-log라고 정하자.    ===> 해결 
    • 그리고 key값을 가지는 상품들을 대략 10개 정도로 칭하자
  • 2. item-log-물건이름 형식으로 1차적으로 보내는 토픽 이름을 정하자. 이는 장바구니 여부와 상관없다.    ===> 해결 
    • ex) item-log-apple
  • 3.  item-log-물건이름-added 형식으로 장바구니에 담은 로그들만 보낸다.   ===> 해결 

    • ex) item-log-apple-added 형식

화면 관련

  • http://localhost:8080/api/v1/kafka/topics 접속하면 불필요 한것들이 출력된다. 이를 개선하자  ===> 해결  
    • db조회도 마찬가지...
  • http://localhost:8080/api/v1/kafka/topics/file-topic 접속하면 보기 불편함.  ===> 해결  
    • 보는 방식 개선 및 새로 고침을 하자.
  • 텍스트들 한국어로 바꾸자

기능관련

  • 커넥터 너무 치기 길어... 쉘로 짜보자
  • 토픽 재설정하는것도 짜자

 


 

 

 

1. 우선 KafkaTopic을 수정하였다.

  • why?
    • 이는 spring실행시, 해당 토픽이 존재하는지 보고, 없다면 생성한다.
    • bean메서드로 생성하려면, 개별적으로 적어줘야한다... (스프링부트2.6버전부터...)
package com.example.kafka_sw_project.config;

import com.example.kafka_sw_project.topology.FilterTest;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

import java.util.List;
import java.util.logging.Filter;
// configuration 어노테이션은 이 클래스가 하나 이상의 @bean 메서지를 정의하는 클래스임을 나타낸다.
// 즉, 이 클래스는 spring Ioc( inversion of control)컨테이너에서, bean으로 관리되며, 필요한 객체를 생성 및 주입하는데 사용
@Configuration
public class KafkaTopic {
    @Bean
    public NewTopic sourceTopic(){
        return TopicBuilder.name(FilterTest.sourceTopic).partitions(3).replicas(1).build();
    }


    @Bean
    public NewTopic itemLogApple() {
        return TopicBuilder.name("item-log-apple").partitions(3).replicas(1).build();
    }

    @Bean
    public NewTopic itemLogMelon() {
        return TopicBuilder.name("item-log-melon").partitions(3).replicas(1).build();
    }

    @Bean
    public NewTopic itemLogPeach() {
        return TopicBuilder.name("item-log-peach").partitions(3).replicas(1).build();
    }

    @Bean
    public NewTopic itemLogWater() {
        return TopicBuilder.name("item-log-water").partitions(3).replicas(1).build();
    }

    @Bean
    public NewTopic itemLogBanana() {
        return TopicBuilder.name("item-log-banana").partitions(3).replicas(1).build();
    }

    @Bean
    public NewTopic itemLogCandy() {
        return TopicBuilder.name("item-log-candy").partitions(3).replicas(1).build();
    }

    @Bean
    public NewTopic itemLogCoffee() {
        return TopicBuilder.name("item-log-coffee").partitions(3).replicas(1).build();
    }


    @Bean
    public NewTopic itemLogApple2() {
        return TopicBuilder.name("item-log-apple-added").partitions(3).replicas(1).build();
    }

    @Bean
    public NewTopic itemLogMelon2() {
        return TopicBuilder.name("item-log-melon-added").partitions(3).replicas(1).build();
    }

    @Bean
    public NewTopic itemLogPeach2() {
        return TopicBuilder.name("item-log-peach-added").partitions(3).replicas(1).build();
    }

    @Bean
    public NewTopic itemLogWater2() {
        return TopicBuilder.name("item-log-water-added").partitions(3).replicas(1).build();
    }

    @Bean
    public NewTopic itemLogBanana2() {
        return TopicBuilder.name("item-log-banana-added").partitions(3).replicas(1).build();
    }

    @Bean
    public NewTopic itemLogCandy2() {
        return TopicBuilder.name("item-log-candy-added").partitions(3).replicas(1).build();
    }

    @Bean
    public NewTopic itemLogCoffee2() {
        return TopicBuilder.name("item-log-coffee-added").partitions(3).replicas(1).build();
    }

}
// bean은 spring container에 의해 관리될 객체(bean)을 정의할때 사용
// 위의 경우, pizzaTopic매서드는 새로운 카프카 토픽을 생성하고, 이를 spring container에서 bean으로 관리하게 함.

// TopicBuilder는 kafka에서 사용할 새로운 topic을 구성하는데 사용하는 빌더 클래스
// 이클래스의 name메서드는 토픽의 이름을 지정.
// build매서드는 설정이 완료된 NewTopic객체를 반환한다.

package com.example.kafka_sw_project.controller;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import cohttp://m.example.kafka_sw_project.kafka.KafkaProducer;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import org.springframework.http.HttpStatus;
import java.io.BufferedReader;
import java.io.InputStreamReader; // 추가
import java.io.IOException;      // 추가
import java.util.ArrayList;
import java.util.List;

import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;


// RestController는 이 클래스가 spring의 restful 웹서비스의 컨트롤러임을 의미한다. 이 클래스는 http요청을 처리하고, json or xml 형식의 데이터를 반환함.
// RequestMapping은 이 클래스 내의 모든 http요청의 경로가 괄호안으로 시작함을 명시.
@RestController
@RequestMapping("/api/v1/kafka")
public class MessageController {
    // KafkaProcuder는 message를 kafka로 전송하는 class이다. 여기서는 MessageController클래스에서 이 producer를 사용해서 message를 전송한다.
    private KafkaProducer kafkaProducer;
    public MessageController(KafkaProducer kafkaProducer) {
        this.kafkaProducer = kafkaProducer;
    }


    // Kafka 토픽 목록과 메시지 소비
    @GetMapping("/topics")
    public String listTopics() {
        StringBuilder htmlResponse = new StringBuilder();
        htmlResponse.append("<html><body>");
        htmlResponse.append("<h1>Kafka project</h1>");

        // DB 조회 버튼 추가
        htmlResponse.append("<a href=\"/api/v1/kafka/topics/mydb\"><button>DB 조회</button></a>");
        htmlResponse.append("<hr>");  // 줄 추가

        // Kafka 토픽 목록을 출력하는 코드
        try {
            ProcessBuilder processBuilder = new ProcessBuilder(
                    "kafka-topics",
                    "--bootstrap-server", "localhost:9092",
                    "--list"
            );

            Process process = processBuilder.start();
            BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
            String line;

            while ((line = reader.readLine()) != null) {
                if (line.contains("item") && !line.contains("rst")) {
                    htmlResponse.append("<p><a href=\"/api/v1/kafka/topics/").append(line).append("\">").append(line).append("</a></p>");
                }
            }

            int exitCode = process.waitFor();
            if (exitCode != 0) {
                htmlResponse.append("<p>Error executing kafka-topics command.</p>");
            }

        } catch (IOException | InterruptedException e) {
            htmlResponse.append("<p>Error occurred: ").append(e.getMessage()).append("</p>");
        }

        htmlResponse.append("</body></html>");
        return htmlResponse.toString();
    }


    @GetMapping("/topics/{topic}")
    public String printTopicMessages(@PathVariable String topic) {
        StringBuilder htmlResponse = new StringBuilder();
        htmlResponse.append("<html><head>");
        htmlResponse.append("<title>Kafka Topic Messages</title>");

        // JavaScript로 새로고침 제어 및 버튼 전환 코드 추가
        htmlResponse.append("<script>");
        htmlResponse.append("let refreshInterval = setInterval(() => location.reload(), 5000);"); // 5초마다 새로고침
        htmlResponse.append("function toggleReloading() {");
        htmlResponse.append("  const button = document.getElementById('reloadButton');");
        htmlResponse.append("  if (refreshInterval) {");
        htmlResponse.append("    clearInterval(refreshInterval);");
        htmlResponse.append("    refreshInterval = null;");
        htmlResponse.append("    button.textContent = '다시 불러오기';");
        htmlResponse.append("  } else {");
        htmlResponse.append("    refreshInterval = setInterval(() => location.reload(), 5000);");
        htmlResponse.append("    button.textContent = '불러오기 중지';");
        htmlResponse.append("  }");
        htmlResponse.append("}");
        htmlResponse.append("</script>");

        htmlResponse.append("</head><body>");
        htmlResponse.append("<h1>Topic으로 부터 message를 consuming하는중.....").append("<br>선택한 topic은 ").append(topic).append("입니다.</h1>");

        // 새로고침 제어 버튼 추가
        htmlResponse.append("<button id='reloadButton' onclick=\"toggleReloading()\">불러오기 중지</button>");

        // 테이블 시작
        htmlResponse.append("<table border='1'>");
        htmlResponse.append("<tr>")
                .append("<th>행 번호</th>")
                .append("<th>Key값</th>")
                .append("<th>회원 아이디</th>")
                .append("<th>품목</th>")
                .append("<th>장바구니 담기 여부</th>")
                .append("<th>담은 시각</th>")
                .append("</tr>");

        // 메시지를 저장할 리스트
        List<String[]> messages = new ArrayList<>();

        try {
            ProcessBuilder consumerBuilder = new ProcessBuilder(
                    "kafka-console-consumer",
                    "--bootstrap-server", "localhost:9092",
                    "--topic", topic,
                    "--from-beginning",
                    "--timeout-ms", "1000", // 2초 제한
                    "--property", "print.key=true", // Key 출력 설정
                    "--property", "key.separator= : " // Key-Value 구분자 설정
            );

            Process consumerProcess = consumerBuilder.start();
            BufferedReader consumerReader = new BufferedReader(new InputStreamReader(consumerProcess.getInputStream()));
            String consumerLine;

            while ((consumerLine = consumerReader.readLine()) != null) {
                // Key와 Value 구분
                String[] keyValue = consumerLine.split(" : ", 2);
                String key = keyValue.length > 1 ? keyValue[0] : "(null)";
                String value = keyValue.length > 1 ? keyValue[1] : keyValue[0];

                // Value를 콤마로 구분하여 파싱
                String[] valueParts = value.split(",\\s*");
                String firstValue = valueParts.length > 0 ? valueParts[0] : "(null)";
                String secondValue = valueParts.length > 1 ? valueParts[1] : "(null)";
                String thirdValue = valueParts.length > 2 ? valueParts[2] : "(null)";
                String fourthValue = valueParts.length > 3 ? valueParts[3] : "(null)";

                // 메시지 리스트에 추가
                messages.add(new String[] { key, firstValue, secondValue, thirdValue, fourthValue });
            }

            int consumerExitCode = consumerProcess.waitFor();
            if (consumerExitCode != 0) {
                htmlResponse.append("<p>Error consuming messages from the topic.</p>");
            }

        } catch (IOException | InterruptedException e) {
            htmlResponse.append("<p>Error occurred: ").append(e.getMessage()).append("</p>");
        }

        // 메시지를 역순으로 테이블에 추가
        int rowNumber = messages.size();
        for (int i = messages.size() - 1; i >= 0; i--) {
            String[] message = messages.get(i);
            htmlResponse.append("<tr>")
                    .append("<td>").append(rowNumber).append("</td>")
                    .append("<td>").append(message[0]).append("</td>") // Key
                    .append("<td>").append(message[1]).append("</td>") // First Value
                    .append("<td>").append(message[2]).append("</td>") // Second Value
                    .append("<td>").append(message[3]).append("</td>") // Third Value
                    .append("<td>").append(message[4]).append("</td>") // Fourth Value
                    .append("</tr>");
            rowNumber--;
        }

        // 테이블 끝
        htmlResponse.append("</table>");
        htmlResponse.append("<a href=\"/api/v1/kafka/topics\"><button>Topic list로 돌아가기</button></a>");
        htmlResponse.append("</body></html>");

        return htmlResponse.toString();
    }


    @GetMapping("/topics/mydb")
    public String showDatabasesWithItemUsingJDBC() {
        StringBuilder htmlResponse = new StringBuilder();
        htmlResponse.append("<html><body>");
        htmlResponse.append("<h1>MySQL Databases 조회</h1>");

        String jdbcUrl = "jdbc:mysql://localhost:3306";
        String username = "root";
        String password = "root123";

        try (Connection connection = DriverManager.getConnection(jdbcUrl, username, password);
             Statement statement = connection.createStatement();
             ResultSet resultSet = statement.executeQuery("SHOW DATABASES")) {

            while (resultSet.next()) {
                String databaseName = resultSet.getString(1);
                // "item" 단어가 포함된 경우만 출력
                if (databaseName.contains("item")) {
                    htmlResponse.append("<p><a href=\"/api/v1/kafka/topics/mydb/").append(databaseName).append("\">").append(databaseName).append("</a></p>");
                }
            }

        } catch (Exception e) {
            htmlResponse.append("<p>Error occurred: ").append(e.getMessage()).append("</p>");
        }

        htmlResponse.append("<a href=\"/api/v1/kafka/topics\"><button>Topic 리스트로 돌아가기</button></a>");
        htmlResponse.append("</body></html>");

        return htmlResponse.toString();
    }


    @GetMapping("/topics/mydb/{database}")
    public String showTables(@PathVariable String database) {
        StringBuilder htmlResponse = new StringBuilder();
        htmlResponse.append("<html><body>");
        htmlResponse.append("<h1>선택한 Database에 존재하는 table들.....<br>선택힌 database는 ").append(database).append("</h1>");

        String username = "root";
        String password = "root123";

        try {
            // MySQL SHOW TABLES 명령어 실행
            ProcessBuilder processBuilder = new ProcessBuilder(
                    "mysql",
                    "-u" + username,
                    "-p" + password,
                    "-e", "SHOW TABLES IN " + database
            );
            Process process = processBuilder.start();
            BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
            String line;

            boolean isHeader = true; // 첫 번째 줄은 'Tables_in_database' 헤더이므로 건너뜀
            while ((line = reader.readLine()) != null) {
                if (isHeader) {
                    isHeader = false;
                    continue;
                }

                // 테이블 이름을 클릭하면 해당 테이블 데이터를 조회하는 URL로 이동
                htmlResponse.append("<p><a href=\"/api/v1/kafka/topics/mydb/")
                        .append(database).append("/").append(line)
                        .append("\">").append(line).append("</a></p>");
            }

            int exitCode = process.waitFor();
            if (exitCode != 0) {
                htmlResponse.append("<p>Error executing MySQL command.</p>");
            }

        } catch (IOException | InterruptedException e) {
            htmlResponse.append("<p>Error occurred: ").append(e.getMessage()).append("</p>");
        }

        htmlResponse.append("<a href=\"/api/v1/kafka/topics/mydb\"><button>  DB 리스트로 돌아가기  </button></a>");
        htmlResponse.append("</body></html>");

        return htmlResponse.toString();
    }


    @GetMapping("/topics/mydb/{database}/{table}")
    public String showTableData(@PathVariable String database, @PathVariable String table) {
        StringBuilder htmlResponse = new StringBuilder();
        htmlResponse.append("<html><body>");
        htmlResponse.append("<h1>Table의 data를 조회합니다....<br>").append(database).append(".").append(table).append("</h1>");

        String jdbcUrl = "jdbc:mysql://localhost:3306/" + database; // Database를 경로에 포함
        String username = "root";
        String password = "root123";

        try (Connection connection = DriverManager.getConnection(jdbcUrl, username, password);
             Statement statement = connection.createStatement()) {
            // SQL query to fetch all data from the specified table
            String query = "SELECT * FROM " + table;
            ResultSet resultSet = statement.executeQuery(query);

            // Get column names
            int columnCount = resultSet.getMetaData().getColumnCount();
            htmlResponse.append("<table border='1'><tr>");
            for (int i = 1; i <= columnCount; i++) {
                htmlResponse.append("<th>").append(resultSet.getMetaData().getColumnName(i)).append("</th>");
            }
            htmlResponse.append("</tr>");

            // Get row data
            while (resultSet.next()) {
                htmlResponse.append("<tr>");
                for (int i = 1; i <= columnCount; i++) {
                    htmlResponse.append("<td>").append(resultSet.getString(i)).append("</td>");
                }
                htmlResponse.append("</tr>");
            }
            htmlResponse.append("</table>");

        } catch (Exception e) {
            htmlResponse.append("<p>Error occurred: ").append(e.getMessage()).append("</p>");
        }

        htmlResponse.append("<a href=\"/api/v1/kafka/topics/mydb/").append(database).append("\"><button>  Table 리스트로 돌아가기  </button></a>");
        htmlResponse.append("</body></html>");

        return htmlResponse.toString();
    }


    // GetMappring은 http GET요청을 처리하는 핸들러 메서드를 정의함.
    // key와 value를 각각 요청 파라미터로 받아 처리
    @GetMapping("/publish")
    public ResponseEntity<String> publish(
            @RequestParam("key") String key,
            @RequestParam("value") String value) {
        // KafkaProducer의 sendMessage 메서드에 key와 value 전달
        kafkaProducer.sendMessage("item-log", key, value);
        return ResponseEntity.ok("from MessageController : Message를 topic으로 보냄.");
    }
}

package com.example.kafka_sw_project.controller;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import cohttp://m.example.kafka_sw_project.kafka.KafkaProducer;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import org.springframework.http.HttpStatus;
import java.io.BufferedReader;
import java.io.InputStreamReader; // 추가
import java.io.IOException;      // 추가
import java.util.ArrayList;
import java.util.List;

import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;


// RestController는 이 클래스가 spring의 restful 웹서비스의 컨트롤러임을 의미한다. 이 클래스는 http요청을 처리하고, json or xml 형식의 데이터를 반환함.
// RequestMapping은 이 클래스 내의 모든 http요청의 경로가 괄호안으로 시작함을 명시.
@RestController
@RequestMapping("/api/v1/kafka")
public class MessageController {
    // KafkaProcuder는 message를 kafka로 전송하는 class이다. 여기서는 MessageController클래스에서 이 producer를 사용해서 message를 전송한다.
    private KafkaProducer kafkaProducer;
    public MessageController(KafkaProducer kafkaProducer) {
        this.kafkaProducer = kafkaProducer;
    }


    // Kafka 토픽 목록과 메시지 소비
    @GetMapping("/topics")
    public String listTopics() {
        StringBuilder htmlResponse = new StringBuilder();
        htmlResponse.append("<html><body>");
        htmlResponse.append("<h1>Kafka project</h1>");

        // DB 조회 버튼 추가
        htmlResponse.append("<a href=\"/api/v1/kafka/topics/mydb\"><button>DB 조회</button></a>");
        htmlResponse.append("<hr>");  // 줄 추가

        // Kafka 토픽 목록을 출력하는 코드
        try {
            ProcessBuilder processBuilder = new ProcessBuilder(
                    "kafka-topics",
                    "--bootstrap-server", "localhost:9092",
                    "--list"
            );

            Process process = processBuilder.start();
            BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
            String line;

            while ((line = reader.readLine()) != null) {
                if (line.contains("item") && !line.contains("rst")) {
                    htmlResponse.append("<p><a href=\"/api/v1/kafka/topics/").append(line).append("\">").append(line).append("</a></p>");
                }
            }

            int exitCode = process.waitFor();
            if (exitCode != 0) {
                htmlResponse.append("<p>Error executing kafka-topics command.</p>");
            }

        } catch (IOException | InterruptedException e) {
            htmlResponse.append("<p>Error occurred: ").append(e.getMessage()).append("</p>");
        }

        htmlResponse.append("</body></html>");
        return htmlResponse.toString();
    }


    @GetMapping("/topics/{topic}")
    public String printTopicMessages(@PathVariable String topic) {
        StringBuilder htmlResponse = new StringBuilder();
        htmlResponse.append("<html><head>");
        htmlResponse.append("<title>Kafka Topic Messages</title>");

        // JavaScript로 새로고침 제어 및 버튼 전환 코드 추가
        htmlResponse.append("<script>");
        htmlResponse.append("let refreshInterval = setInterval(() => location.reload(), 5000);"); // 5초마다 새로고침
        htmlResponse.append("function toggleReloading() {");
        htmlResponse.append("  const button = document.getElementById('reloadButton');");
        htmlResponse.append("  if (refreshInterval) {");
        htmlResponse.append("    clearInterval(refreshInterval);");
        htmlResponse.append("    refreshInterval = null;");
        htmlResponse.append("    button.textContent = '다시 불러오기';");
        htmlResponse.append("  } else {");
        htmlResponse.append("    refreshInterval = setInterval(() => location.reload(), 5000);");
        htmlResponse.append("    button.textContent = '불러오기 중지';");
        htmlResponse.append("  }");
        htmlResponse.append("}");
        htmlResponse.append("</script>");

        htmlResponse.append("</head><body>");
        htmlResponse.append("<h1>Topic으로 부터 message를 consuming하는중.....").append("<br>선택한 topic은 ").append(topic).append("입니다.</h1>");

        // 새로고침 제어 버튼 추가
        htmlResponse.append("<button id='reloadButton' onclick=\"toggleReloading()\">불러오기 중지</button>");

        // 테이블 시작
        htmlResponse.append("<table border='1'>");
        htmlResponse.append("<tr>")
                .append("<th>행 번호</th>")
                .append("<th>Key값</th>")
                .append("<th>회원 아이디</th>")
                .append("<th>품목</th>")
                .append("<th>장바구니 담기 여부</th>")
                .append("<th>담은 시각</th>")
                .append("</tr>");

        // 메시지를 저장할 리스트
        List<String[]> messages = new ArrayList<>();

        try {
            ProcessBuilder consumerBuilder = new ProcessBuilder(
                    "kafka-console-consumer",
                    "--bootstrap-server", "localhost:9092",
                    "--topic", topic,
                    "--from-beginning",
                    "--timeout-ms", "1000", // 2초 제한
                    "--property", "print.key=true", // Key 출력 설정
                    "--property", "key.separator= : " // Key-Value 구분자 설정
            );

            Process consumerProcess = consumerBuilder.start();
            BufferedReader consumerReader = new BufferedReader(new InputStreamReader(consumerProcess.getInputStream()));
            String consumerLine;

            while ((consumerLine = consumerReader.readLine()) != null) {
                // Key와 Value 구분
                String[] keyValue = consumerLine.split(" : ", 2);
                String key = keyValue.length > 1 ? keyValue[0] : "(null)";
                String value = keyValue.length > 1 ? keyValue[1] : keyValue[0];

                // Value를 콤마로 구분하여 파싱
                String[] valueParts = value.split(",\\s*");
                String firstValue = valueParts.length > 0 ? valueParts[0] : "(null)";
                String secondValue = valueParts.length > 1 ? valueParts[1] : "(null)";
                String thirdValue = valueParts.length > 2 ? valueParts[2] : "(null)";
                String fourthValue = valueParts.length > 3 ? valueParts[3] : "(null)";

                // 메시지 리스트에 추가
                messages.add(new String[] { key, firstValue, secondValue, thirdValue, fourthValue });
            }

            int consumerExitCode = consumerProcess.waitFor();
            if (consumerExitCode != 0) {
                htmlResponse.append("<p>Error consuming messages from the topic.</p>");
            }

        } catch (IOException | InterruptedException e) {
            htmlResponse.append("<p>Error occurred: ").append(e.getMessage()).append("</p>");
        }

        // 메시지를 역순으로 테이블에 추가
        int rowNumber = messages.size();
        for (int i = messages.size() - 1; i >= 0; i--) {
            String[] message = messages.get(i);
            htmlResponse.append("<tr>")
                    .append("<td>").append(rowNumber).append("</td>")
                    .append("<td>").append(message[0]).append("</td>") // Key
                    .append("<td>").append(message[1]).append("</td>") // First Value
                    .append("<td>").append(message[2]).append("</td>") // Second Value
                    .append("<td>").append(message[3]).append("</td>") // Third Value
                    .append("<td>").append(message[4]).append("</td>") // Fourth Value
                    .append("</tr>");
            rowNumber--;
        }

        // 테이블 끝
        htmlResponse.append("</table>");
        htmlResponse.append("<a href=\"/api/v1/kafka/topics\"><button>Topic list로 돌아가기</button></a>");
        htmlResponse.append("</body></html>");

        return htmlResponse.toString();
    }


    @GetMapping("/topics/mydb")
    public String showDatabasesWithItemUsingJDBC() {
        StringBuilder htmlResponse = new StringBuilder();
        htmlResponse.append("<html><body>");
        htmlResponse.append("<h1>MySQL Databases 조회</h1>");

        String jdbcUrl = "jdbc:mysql://localhost:3306";
        String username = "root";
        String password = "root123";

        try (Connection connection = DriverManager.getConnection(jdbcUrl, username, password);
             Statement statement = connection.createStatement();
             ResultSet resultSet = statement.executeQuery("SHOW DATABASES")) {

            while (resultSet.next()) {
                String databaseName = resultSet.getString(1);
                // "item" 단어가 포함된 경우만 출력
                if (databaseName.contains("item")) {
                    htmlResponse.append("<p><a href=\"/api/v1/kafka/topics/mydb/").append(databaseName).append("\">").append(databaseName).append("</a></p>");
                }
            }

        } catch (Exception e) {
            htmlResponse.append("<p>Error occurred: ").append(e.getMessage()).append("</p>");
        }

        htmlResponse.append("<a href=\"/api/v1/kafka/topics\"><button>Topic 리스트로 돌아가기</button></a>");
        htmlResponse.append("</body></html>");

        return htmlResponse.toString();
    }


    @GetMapping("/topics/mydb/{database}")
    public String showTables(@PathVariable String database) {
        StringBuilder htmlResponse = new StringBuilder();
        htmlResponse.append("<html><body>");
        htmlResponse.append("<h1>선택한 Database에 존재하는 table들.....<br>선택힌 database는 ").append(database).append("</h1>");

        String username = "root";
        String password = "root123";

        try {
            // MySQL SHOW TABLES 명령어 실행
            ProcessBuilder processBuilder = new ProcessBuilder(
                    "mysql",
                    "-u" + username,
                    "-p" + password,
                    "-e", "SHOW TABLES IN " + database
            );
            Process process = processBuilder.start();
            BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
            String line;

            boolean isHeader = true; // 첫 번째 줄은 'Tables_in_database' 헤더이므로 건너뜀
            while ((line = reader.readLine()) != null) {
                if (isHeader) {
                    isHeader = false;
                    continue;
                }

                // 테이블 이름을 클릭하면 해당 테이블 데이터를 조회하는 URL로 이동
                htmlResponse.append("<p><a href=\"/api/v1/kafka/topics/mydb/")
                        .append(database).append("/").append(line)
                        .append("\">").append(line).append("</a></p>");
            }

            int exitCode = process.waitFor();
            if (exitCode != 0) {
                htmlResponse.append("<p>Error executing MySQL command.</p>");
            }

        } catch (IOException | InterruptedException e) {
            htmlResponse.append("<p>Error occurred: ").append(e.getMessage()).append("</p>");
        }

        htmlResponse.append("<a href=\"/api/v1/kafka/topics/mydb\"><button>  DB 리스트로 돌아가기  </button></a>");
        htmlResponse.append("</body></html>");

        return htmlResponse.toString();
    }


    @GetMapping("/topics/mydb/{database}/{table}")
    public String showTableData(@PathVariable String database, @PathVariable String table) {
        StringBuilder htmlResponse = new StringBuilder();
        htmlResponse.append("<html><body>");
        htmlResponse.append("<h1>Table의 data를 조회합니다....<br>").append(database).append(".").append(table).append("</h1>");

        String jdbcUrl = "jdbc:mysql://localhost:3306/" + database; // Database를 경로에 포함
        String username = "root";
        String password = "root123";

        try (Connection connection = DriverManager.getConnection(jdbcUrl, username, password);
             Statement statement = connection.createStatement()) {
            // SQL query to fetch all data from the specified table
            String query = "SELECT * FROM " + table;
            ResultSet resultSet = statement.executeQuery(query);

            // Get column names
            int columnCount = resultSet.getMetaData().getColumnCount();
            htmlResponse.append("<table border='1'><tr>");
            for (int i = 1; i <= columnCount; i++) {
                htmlResponse.append("<th>").append(resultSet.getMetaData().getColumnName(i)).append("</th>");
            }
            htmlResponse.append("</tr>");

            // Get row data
            while (resultSet.next()) {
                htmlResponse.append("<tr>");
                for (int i = 1; i <= columnCount; i++) {
                    htmlResponse.append("<td>").append(resultSet.getString(i)).append("</td>");
                }
                htmlResponse.append("</tr>");
            }
            htmlResponse.append("</table>");

        } catch (Exception e) {
            htmlResponse.append("<p>Error occurred: ").append(e.getMessage()).append("</p>");
        }

        htmlResponse.append("<a href=\"/api/v1/kafka/topics/mydb/").append(database).append("\"><button>  Table 리스트로 돌아가기  </button></a>");
        htmlResponse.append("</body></html>");

        return htmlResponse.toString();
    }


    // GetMappring은 http GET요청을 처리하는 핸들러 메서드를 정의함.
    // key와 value를 각각 요청 파라미터로 받아 처리
    @GetMapping("/publish")
    public ResponseEntity<String> publish(
            @RequestParam("key") String key,
            @RequestParam("value") String value) {
        // KafkaProducer의 sendMessage 메서드에 key와 value 전달
        kafkaProducer.sendMessage("item-log", key, value);
        return ResponseEntity.ok("from MessageController : Message를 topic으로 보냄.");
    }
}

 

 

 

2. MessageController.java의 GetMapping의 publish부분을 수정한다.

  • 이 작업으로 fileProducer에서 메시지를 보내면, springboot가 해당 되는 경로 http://localhost:8080/api/v1/publish에서 받을거임.
    그리면 springboot는 item-log의 topic으로 메시지를 보낼거임.
@GetMapping("/publish")
public ResponseEntity<String> publish(
        @RequestParam("key") String key,
        @RequestParam("value") String value) {
    // KafkaProducer의 sendMessage 메서드에 key와 value 전달
    kafkaProducer.sendMessage("item-log", key, value);
    return ResponseEntity.ok("from MessageController : Message를 topic으로 보냄.");
}

package com.example.kafka_sw_project.controller;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import cohttp://m.example.kafka_sw_project.kafka.KafkaProducer;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import org.springframework.http.HttpStatus;
import java.io.BufferedReader;
import java.io.InputStreamReader; // 추가
import java.io.IOException;      // 추가
import java.util.ArrayList;
import java.util.List;

import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;


// RestController는 이 클래스가 spring의 restful 웹서비스의 컨트롤러임을 의미한다. 이 클래스는 http요청을 처리하고, json or xml 형식의 데이터를 반환함.
// RequestMapping은 이 클래스 내의 모든 http요청의 경로가 괄호안으로 시작함을 명시.
@RestController
@RequestMapping("/api/v1/kafka")
public class MessageController {
    // KafkaProcuder는 message를 kafka로 전송하는 class이다. 여기서는 MessageController클래스에서 이 producer를 사용해서 message를 전송한다.
    private KafkaProducer kafkaProducer;
    public MessageController(KafkaProducer kafkaProducer) {
        this.kafkaProducer = kafkaProducer;
    }


    // Kafka 토픽 목록과 메시지 소비
    @GetMapping("/topics")
    public String listTopics() {
        StringBuilder htmlResponse = new StringBuilder();
        htmlResponse.append("<html><body>");
        htmlResponse.append("<h1>Kafka project</h1>");

        // DB 조회 버튼 추가
        htmlResponse.append("<a href=\"/api/v1/kafka/topics/mydb\"><button>DB 조회</button></a>");
        htmlResponse.append("<hr>");  // 줄 추가

        // Kafka 토픽 목록을 출력하는 코드
        try {
            ProcessBuilder processBuilder = new ProcessBuilder(
                    "kafka-topics",
                    "--bootstrap-server", "localhost:9092",
                    "--list"
            );

            Process process = processBuilder.start();
            BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
            String line;

            while ((line = reader.readLine()) != null) {
                if (line.contains("item") && !line.contains("rst")) {
                    htmlResponse.append("<p><a href=\"/api/v1/kafka/topics/").append(line).append("\">").append(line).append("</a></p>");
                }
            }

            int exitCode = process.waitFor();
            if (exitCode != 0) {
                htmlResponse.append("<p>Error executing kafka-topics command.</p>");
            }

        } catch (IOException | InterruptedException e) {
            htmlResponse.append("<p>Error occurred: ").append(e.getMessage()).append("</p>");
        }

        htmlResponse.append("</body></html>");
        return htmlResponse.toString();
    }


    @GetMapping("/topics/{topic}")
    public String printTopicMessages(@PathVariable String topic) {
        StringBuilder htmlResponse = new StringBuilder();
        htmlResponse.append("<html><head>");
        htmlResponse.append("<title>Kafka Topic Messages</title>");

        // JavaScript로 새로고침 제어 및 버튼 전환 코드 추가
        htmlResponse.append("<script>");
        htmlResponse.append("let refreshInterval = setInterval(() => location.reload(), 5000);"); // 5초마다 새로고침
        htmlResponse.append("function toggleReloading() {");
        htmlResponse.append("  const button = document.getElementById('reloadButton');");
        htmlResponse.append("  if (refreshInterval) {");
        htmlResponse.append("    clearInterval(refreshInterval);");
        htmlResponse.append("    refreshInterval = null;");
        htmlResponse.append("    button.textContent = '다시 불러오기';");
        htmlResponse.append("  } else {");
        htmlResponse.append("    refreshInterval = setInterval(() => location.reload(), 5000);");
        htmlResponse.append("    button.textContent = '불러오기 중지';");
        htmlResponse.append("  }");
        htmlResponse.append("}");
        htmlResponse.append("</script>");

        htmlResponse.append("</head><body>");
        htmlResponse.append("<h1>Topic으로 부터 message를 consuming하는중.....").append("<br>선택한 topic은 ").append(topic).append("입니다.</h1>");

        // 새로고침 제어 버튼 추가
        htmlResponse.append("<button id='reloadButton' onclick=\"toggleReloading()\">불러오기 중지</button>");

        // 테이블 시작
        htmlResponse.append("<table border='1'>");
        htmlResponse.append("<tr>")
                .append("<th>행 번호</th>")
                .append("<th>Key값</th>")
                .append("<th>회원 아이디</th>")
                .append("<th>품목</th>")
                .append("<th>장바구니 담기 여부</th>")
                .append("<th>담은 시각</th>")
                .append("</tr>");

        // 메시지를 저장할 리스트
        List<String[]> messages = new ArrayList<>();

        try {
            ProcessBuilder consumerBuilder = new ProcessBuilder(
                    "kafka-console-consumer",
                    "--bootstrap-server", "localhost:9092",
                    "--topic", topic,
                    "--from-beginning",
                    "--timeout-ms", "1000", // 2초 제한
                    "--property", "print.key=true", // Key 출력 설정
                    "--property", "key.separator= : " // Key-Value 구분자 설정
            );

            Process consumerProcess = consumerBuilder.start();
            BufferedReader consumerReader = new BufferedReader(new InputStreamReader(consumerProcess.getInputStream()));
            String consumerLine;

            while ((consumerLine = consumerReader.readLine()) != null) {
                // Key와 Value 구분
                String[] keyValue = consumerLine.split(" : ", 2);
                String key = keyValue.length > 1 ? keyValue[0] : "(null)";
                String value = keyValue.length > 1 ? keyValue[1] : keyValue[0];

                // Value를 콤마로 구분하여 파싱
                String[] valueParts = value.split(",\\s*");
                String firstValue = valueParts.length > 0 ? valueParts[0] : "(null)";
                String secondValue = valueParts.length > 1 ? valueParts[1] : "(null)";
                String thirdValue = valueParts.length > 2 ? valueParts[2] : "(null)";
                String fourthValue = valueParts.length > 3 ? valueParts[3] : "(null)";

                // 메시지 리스트에 추가
                messages.add(new String[] { key, firstValue, secondValue, thirdValue, fourthValue });
            }

            int consumerExitCode = consumerProcess.waitFor();
            if (consumerExitCode != 0) {
                htmlResponse.append("<p>Error consuming messages from the topic.</p>");
            }

        } catch (IOException | InterruptedException e) {
            htmlResponse.append("<p>Error occurred: ").append(e.getMessage()).append("</p>");
        }

        // 메시지를 역순으로 테이블에 추가
        int rowNumber = messages.size();
        for (int i = messages.size() - 1; i >= 0; i--) {
            String[] message = messages.get(i);
            htmlResponse.append("<tr>")
                    .append("<td>").append(rowNumber).append("</td>")
                    .append("<td>").append(message[0]).append("</td>") // Key
                    .append("<td>").append(message[1]).append("</td>") // First Value
                    .append("<td>").append(message[2]).append("</td>") // Second Value
                    .append("<td>").append(message[3]).append("</td>") // Third Value
                    .append("<td>").append(message[4]).append("</td>") // Fourth Value
                    .append("</tr>");
            rowNumber--;
        }

        // 테이블 끝
        htmlResponse.append("</table>");
        htmlResponse.append("<a href=\"/api/v1/kafka/topics\"><button>Topic list로 돌아가기</button></a>");
        htmlResponse.append("</body></html>");

        return htmlResponse.toString();
    }


    @GetMapping("/topics/mydb")
    public String showDatabasesWithItemUsingJDBC() {
        StringBuilder htmlResponse = new StringBuilder();
        htmlResponse.append("<html><body>");
        htmlResponse.append("<h1>MySQL Databases 조회</h1>");

        String jdbcUrl = "jdbc:mysql://localhost:3306";
        String username = "root";
        String password = "root123";

        try (Connection connection = DriverManager.getConnection(jdbcUrl, username, password);
             Statement statement = connection.createStatement();
             ResultSet resultSet = statement.executeQuery("SHOW DATABASES")) {

            while (resultSet.next()) {
                String databaseName = resultSet.getString(1);
                // "item" 단어가 포함된 경우만 출력
                if (databaseName.contains("item")) {
                    htmlResponse.append("<p><a href=\"/api/v1/kafka/topics/mydb/").append(databaseName).append("\">").append(databaseName).append("</a></p>");
                }
            }

        } catch (Exception e) {
            htmlResponse.append("<p>Error occurred: ").append(e.getMessage()).append("</p>");
        }

        htmlResponse.append("<a href=\"/api/v1/kafka/topics\"><button>Topic 리스트로 돌아가기</button></a>");
        htmlResponse.append("</body></html>");

        return htmlResponse.toString();
    }


    @GetMapping("/topics/mydb/{database}")
    public String showTables(@PathVariable String database) {
        StringBuilder htmlResponse = new StringBuilder();
        htmlResponse.append("<html><body>");
        htmlResponse.append("<h1>선택한 Database에 존재하는 table들.....<br>선택힌 database는 ").append(database).append("</h1>");

        String username = "root";
        String password = "root123";

        try {
            // MySQL SHOW TABLES 명령어 실행
            ProcessBuilder processBuilder = new ProcessBuilder(
                    "mysql",
                    "-u" + username,
                    "-p" + password,
                    "-e", "SHOW TABLES IN " + database
            );
            Process process = processBuilder.start();
            BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
            String line;

            boolean isHeader = true; // 첫 번째 줄은 'Tables_in_database' 헤더이므로 건너뜀
            while ((line = reader.readLine()) != null) {
                if (isHeader) {
                    isHeader = false;
                    continue;
                }

                // 테이블 이름을 클릭하면 해당 테이블 데이터를 조회하는 URL로 이동
                htmlResponse.append("<p><a href=\"/api/v1/kafka/topics/mydb/")
                        .append(database).append("/").append(line)
                        .append("\">").append(line).append("</a></p>");
            }

            int exitCode = process.waitFor();
            if (exitCode != 0) {
                htmlResponse.append("<p>Error executing MySQL command.</p>");
            }

        } catch (IOException | InterruptedException e) {
            htmlResponse.append("<p>Error occurred: ").append(e.getMessage()).append("</p>");
        }

        htmlResponse.append("<a href=\"/api/v1/kafka/topics/mydb\"><button>  DB 리스트로 돌아가기  </button></a>");
        htmlResponse.append("</body></html>");

        return htmlResponse.toString();
    }


    @GetMapping("/topics/mydb/{database}/{table}")
    public String showTableData(@PathVariable String database, @PathVariable String table) {
        StringBuilder htmlResponse = new StringBuilder();
        htmlResponse.append("<html><body>");
        htmlResponse.append("<h1>Table의 data를 조회합니다....<br>").append(database).append(".").append(table).append("</h1>");

        String jdbcUrl = "jdbc:mysql://localhost:3306/" + database; // Database를 경로에 포함
        String username = "root";
        String password = "root123";

        try (Connection connection = DriverManager.getConnection(jdbcUrl, username, password);
             Statement statement = connection.createStatement()) {
            // SQL query to fetch all data from the specified table
            String query = "SELECT * FROM " + table;
            ResultSet resultSet = statement.executeQuery(query);

            // Get column names
            int columnCount = resultSet.getMetaData().getColumnCount();
            htmlResponse.append("<table border='1'><tr>");
            for (int i = 1; i <= columnCount; i++) {
                htmlResponse.append("<th>").append(resultSet.getMetaData().getColumnName(i)).append("</th>");
            }
            htmlResponse.append("</tr>");

            // Get row data
            while (resultSet.next()) {
                htmlResponse.append("<tr>");
                for (int i = 1; i <= columnCount; i++) {
                    htmlResponse.append("<td>").append(resultSet.getString(i)).append("</td>");
                }
                htmlResponse.append("</tr>");
            }
            htmlResponse.append("</table>");

        } catch (Exception e) {
            htmlResponse.append("<p>Error occurred: ").append(e.getMessage()).append("</p>");
        }

        htmlResponse.append("<a href=\"/api/v1/kafka/topics/mydb/").append(database).append("\"><button>  Table 리스트로 돌아가기  </button></a>");
        htmlResponse.append("</body></html>");

        return htmlResponse.toString();
    }


    // GetMappring은 http GET요청을 처리하는 핸들러 메서드를 정의함.
    // key와 value를 각각 요청 파라미터로 받아 처리
    @GetMapping("/publish")
    public ResponseEntity<String> publish(
            @RequestParam("key") String key,
            @RequestParam("value") String value) {
        // KafkaProducer의 sendMessage 메서드에 key와 value 전달
        kafkaProducer.sendMessage("item-log", key, value);
        return ResponseEntity.ok("from MessageController : Message를 topic으로 보냄.");
    }
}

 

 

3. KafkaProducer를 수정하자.

package com.example.kafka_sw_project.kafka;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducer {
    private static final Logger logger = LoggerFactory.getLogger(KafkaProducer.class);
    private final KafkaTemplate<String, String> kafkaTemplate;

    public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    private final String topic = "item-log";

    // 기존의 단일 메시지 전송 메서드
    public void sendMessage(String topic, String message) {
        logger.info(String.format("Message sent: %s", message));
        kafkaTemplate.send(topic, message);
    }

    // key와 value를 함께 전송하는 메서드 추가
    public void sendMessage(String topic, String key, String value) {
        logger.info(String.format("Message sent to topic: %s, key: %s, value: %s", topic, key, value));
        kafkaTemplate.send(topic, key, value);
    }
}

 

 

4. FilterTest.java의 item-log에서 받아오면, 아이템별로 필터링하는 부분을 수정하자.

  • 해당 코드는 apple에만 적용.
package com.example.kafka_sw_project.topology;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Printed;
import org.apache.kafka.streams.kstream.Produced;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.support.serializer.JsonSerde;
import org.springframework.stereotype.Component;

import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.ZoneId;

@Component
@Slf4j
public class FilterTest {
    public static String sourceTopic = "item-log";
    public static String sinkTopicApple = "item-log-apple-added-rst";

    // autowired는 streamBuilder 객체를 자동으로 주입해준다.
    // spring이 kafka streams 처리를 위해서 이 객체를 제공하며, stream 처리 로직을 설정 할 수 있게 해줌.
    @Autowired
    public void process(StreamsBuilder builder){
        var sourceStream = builder.stream( sourceTopic , Consumed.with(Serdes.String() , Serdes.String() ));
        sourceStream.print(Printed.<String , String>toSysOut().withLabel("this is source stream") );


        // ------------------------item-log-apple------------------
        var appleStream = sourceStream.filter( ( key , value ) -> "apple".equals( key ) ).mapValues( value -> {
                    // 한국 시간 가져오기
                    ZonedDateTime koreanTime = ZonedDateTime.now(ZoneId.of("Asia/Seoul"));
                    String formattedTime = koreanTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
                    // 메시지에 시간 정보 추가
                    return value + ", " + formattedTime;
        });
        // "test-apple-buy"로 보내기: value에 "true"가 포함되면 "test-apple-buy"로도 보내기
        var testAppleBuyStream = appleStream.filter( ( key , value ) -> value.contains("true") ).map( (key, value) -> new KeyValue<>("item-log-apple-added", value));
        appleStream.to("item-log-apple");
        testAppleBuyStream.to("item-log-apple-added");

        var sinkStream = testAppleBuyStream.filter((key, value) -> key.equals("item-log-apple-added")).mapValues(value -> {
            // 데이터를 ',' 구분자로 나누기
            String[] parts = value.split(",");
            // 데이터가 예상하는 부분 개수를 갖는지 확인 (예: 8개의 필드)
            if (parts.length != 4) {
                // 예상하지 않는 형식이면 원본 데이터를 반환
                System.out.println("parts의 길이는 " + parts.length);
                return value;
            }
            // 필드 할당
            String userid = parts[0].trim();
            String buyitem = parts[1].trim();
            String addcart = parts[2].trim();
            String addtime = parts[3].trim();

            // JSON 문자열 생성
            String jsonResult = String.format(
                    "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"field\":\"userid\",\"type\":\"string\"}," +
                            "{\"field\":\"buyitem\",\"type\":\"string\"},{\"field\":\"addcart\",\"type\":\"string\"}," +
                            "{\"field\":\"addtime\",\"type\":\"string\"}]," +
                            "\"optional\":false,\"name\":\"com.example.Order\"}," +
                            "\"payload\":{\"userid\":\"%s\",\"buyitem\":\"%s\",\"addcart\":\"%s\"," +
                            "\"addtime\":\"%s\"}}",
                    userid, buyitem, addcart, addtime
            );
            System.out.println(jsonResult);
            return jsonResult;
        }).selectKey( (key,value) -> null);
        sinkStream.to( sinkTopicApple  );



    }
}

 

 

5. 로그로 확인

main topic으로 흘러들어온것을 확인하였다.
장바구니 여부와 관계없이 apple의 key값을 가진것을 1차적으로 필터링 하였다.
2차적으로 장바구니에 담긴 것만 필터링을 수행하였다.
이는 connect에서 sql문을 날리기 위해서 관리하는 topic이다.

 

 

6. connect의 커넥터도 수정하자.

  • 1. 우선 items.apple을 생성하였다.
  • 2. connector를 등록하였다.
  • 3. 확인.....

우선 타겟 database는 items.apple이다.

CREATE TABLE apple (
    userid VARCHAR(50),
    buyitem VARCHAR(100),
    addcart VARCHAR(20),
    addtime VARCHAR(50)
);

 

curl -X POST -H "Content-Type: application/json" --data '{
    "name": "mysql-sink-connector-apple",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "item-log-apple-added-rst",
        "connection.url": "jdbc:mysql://localhost:3306/items",
        "connection.user": "root",
        "connection.password": "root123",
        "auto.create": "true",
        "auto.evolve": "true",
        "insert.mode": "insert",
        "table.name.format": "apple",
        "pk.mode": "none",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "true",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "true",
        "delete.enabled": "false"
    }
}' http://localhost:8083/connectors

커넥터가 잘 등록이 되었다.
데이터베이스에 잘 삽입이 되었다.

 

 

7. 수정한 MessageController.java

  • 화면에 보여주는 부분들 수정
package com.example.kafka_sw_project.controller;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import com.example.kafka_sw_project.kafka.KafkaProducer;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import org.springframework.http.HttpStatus;
import java.io.BufferedReader;
import java.io.InputStreamReader; // 추가
import java.io.IOException;      // 추가
import java.util.ArrayList;
import java.util.List;

import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;


// RestController는 이 클래스가 spring의 restful 웹서비스의 컨트롤러임을 의미한다. 이 클래스는 http요청을 처리하고, json or xml 형식의 데이터를 반환함.
// RequestMapping은 이 클래스 내의 모든 http요청의 경로가 괄호안으로 시작함을 명시.
@RestController
@RequestMapping("/api/v1/kafka")
public class MessageController {
    // KafkaProcuder는 message를 kafka로 전송하는 class이다. 여기서는 MessageController클래스에서 이 producer를 사용해서 message를 전송한다.
    private KafkaProducer kafkaProducer;
    public MessageController(KafkaProducer kafkaProducer) {
        this.kafkaProducer = kafkaProducer;
    }


    // Kafka 토픽 목록과 메시지 소비
    @GetMapping("/topics")
    public String listTopics() {
        StringBuilder htmlResponse = new StringBuilder();
        htmlResponse.append("<html><body>");
        htmlResponse.append("<h1>Kafka project</h1>");

        // DB 조회 버튼 추가
        htmlResponse.append("<a href=\"/api/v1/kafka/topics/mydb\"><button>DB 조회</button></a>");
        htmlResponse.append("<hr>");  // 줄 추가

        // Kafka 토픽 목록을 출력하는 코드
        try {
            ProcessBuilder processBuilder = new ProcessBuilder(
                    "kafka-topics",
                    "--bootstrap-server", "localhost:9092",
                    "--list"
            );

            Process process = processBuilder.start();
            BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
            String line;

            while ((line = reader.readLine()) != null) {
                if (line.contains("item") && !line.contains("rst")) {
                    htmlResponse.append("<p><a href=\"/api/v1/kafka/topics/").append(line).append("\">").append(line).append("</a></p>");
                }
            }

            int exitCode = process.waitFor();
            if (exitCode != 0) {
                htmlResponse.append("<p>Error executing kafka-topics command.</p>");
            }

        } catch (IOException | InterruptedException e) {
            htmlResponse.append("<p>Error occurred: ").append(e.getMessage()).append("</p>");
        }

        htmlResponse.append("</body></html>");
        return htmlResponse.toString();
    }


    @GetMapping("/topics/{topic}")
    public String printTopicMessages(@PathVariable String topic) {
        StringBuilder htmlResponse = new StringBuilder();
        htmlResponse.append("<html><head>");
        htmlResponse.append("<title>Kafka Topic Messages</title>");

        // JavaScript로 새로고침 제어 및 버튼 전환 코드 추가
        htmlResponse.append("<script>");
        htmlResponse.append("let refreshInterval = setInterval(() => location.reload(), 5000);"); // 5초마다 새로고침
        htmlResponse.append("function toggleReloading() {");
        htmlResponse.append("  const button = document.getElementById('reloadButton');");
        htmlResponse.append("  if (refreshInterval) {");
        htmlResponse.append("    clearInterval(refreshInterval);");
        htmlResponse.append("    refreshInterval = null;");
        htmlResponse.append("    button.textContent = '다시 불러오기';");
        htmlResponse.append("  } else {");
        htmlResponse.append("    refreshInterval = setInterval(() => location.reload(), 5000);");
        htmlResponse.append("    button.textContent = '불러오기 중지';");
        htmlResponse.append("  }");
        htmlResponse.append("}");
        htmlResponse.append("</script>");

        htmlResponse.append("</head><body>");
        htmlResponse.append("<h1>Topic으로 부터 message를 consuming하는중.....").append("<br>선택한 topic은 ").append(topic).append("입니다.</h1>");

        // 새로고침 제어 버튼 추가
        htmlResponse.append("<button id='reloadButton' onclick=\"toggleReloading()\">불러오기 중지</button>");

        // 테이블 시작
        htmlResponse.append("<table border='1'>");
        htmlResponse.append("<tr>")
                .append("<th>행 번호</th>")
                .append("<th>Key값</th>")
                .append("<th>회원 아이디</th>")
                .append("<th>품목</th>")
                .append("<th>장바구니 담기 여부</th>")
                .append("<th>담은 시각</th>")
                .append("</tr>");

        // 메시지를 저장할 리스트
        List<String[]> messages = new ArrayList<>();

        try {
            ProcessBuilder consumerBuilder = new ProcessBuilder(
                    "kafka-console-consumer",
                    "--bootstrap-server", "localhost:9092",
                    "--topic", topic,
                    "--from-beginning",
                    "--timeout-ms", "1000", // 2초 제한
                    "--property", "print.key=true", // Key 출력 설정
                    "--property", "key.separator= : " // Key-Value 구분자 설정
            );

            Process consumerProcess = consumerBuilder.start();
            BufferedReader consumerReader = new BufferedReader(new InputStreamReader(consumerProcess.getInputStream()));
            String consumerLine;

            while ((consumerLine = consumerReader.readLine()) != null) {
                // Key와 Value 구분
                String[] keyValue = consumerLine.split(" : ", 2);
                String key = keyValue.length > 1 ? keyValue[0] : "(null)";
                String value = keyValue.length > 1 ? keyValue[1] : keyValue[0];

                // Value를 콤마로 구분하여 파싱
                String[] valueParts = value.split(",\\s*");
                String firstValue = valueParts.length > 0 ? valueParts[0] : "(null)";
                String secondValue = valueParts.length > 1 ? valueParts[1] : "(null)";
                String thirdValue = valueParts.length > 2 ? valueParts[2] : "(null)";
                String fourthValue = valueParts.length > 3 ? valueParts[3] : "(null)";

                // 메시지 리스트에 추가
                messages.add(new String[] { key, firstValue, secondValue, thirdValue, fourthValue });
            }

            int consumerExitCode = consumerProcess.waitFor();
            if (consumerExitCode != 0) {
                htmlResponse.append("<p>Error consuming messages from the topic.</p>");
            }

        } catch (IOException | InterruptedException e) {
            htmlResponse.append("<p>Error occurred: ").append(e.getMessage()).append("</p>");
        }

        // 메시지를 역순으로 테이블에 추가
        int rowNumber = messages.size();
        for (int i = messages.size() - 1; i >= 0; i--) {
            String[] message = messages.get(i);
            htmlResponse.append("<tr>")
                    .append("<td>").append(rowNumber).append("</td>")
                    .append("<td>").append(message[0]).append("</td>") // Key
                    .append("<td>").append(message[1]).append("</td>") // First Value
                    .append("<td>").append(message[2]).append("</td>") // Second Value
                    .append("<td>").append(message[3]).append("</td>") // Third Value
                    .append("<td>").append(message[4]).append("</td>") // Fourth Value
                    .append("</tr>");
            rowNumber--;
        }

        // 테이블 끝
        htmlResponse.append("</table>");
        htmlResponse.append("<a href=\"/api/v1/kafka/topics\"><button>Topic list로 돌아가기</button></a>");
        htmlResponse.append("</body></html>");

        return htmlResponse.toString();
    }


    @GetMapping("/topics/mydb")
    public String showDatabasesWithItemUsingJDBC() {
        StringBuilder htmlResponse = new StringBuilder();
        htmlResponse.append("<html><body>");
        htmlResponse.append("<h1>MySQL Databases 조회</h1>");

        String jdbcUrl = "jdbc:mysql://localhost:3306";
        String username = "root";
        String password = "root123";

        try (Connection connection = DriverManager.getConnection(jdbcUrl, username, password);
             Statement statement = connection.createStatement();
             ResultSet resultSet = statement.executeQuery("SHOW DATABASES")) {

            while (resultSet.next()) {
                String databaseName = resultSet.getString(1);
                // "item" 단어가 포함된 경우만 출력
                if (databaseName.contains("item")) {
                    htmlResponse.append("<p><a href=\"/api/v1/kafka/topics/mydb/").append(databaseName).append("\">").append(databaseName).append("</a></p>");
                }
            }

        } catch (Exception e) {
            htmlResponse.append("<p>Error occurred: ").append(e.getMessage()).append("</p>");
        }

        htmlResponse.append("<a href=\"/api/v1/kafka/topics\"><button>Topic 리스트로 돌아가기</button></a>");
        htmlResponse.append("</body></html>");

        return htmlResponse.toString();
    }


    @GetMapping("/topics/mydb/{database}")
    public String showTables(@PathVariable String database) {
        StringBuilder htmlResponse = new StringBuilder();
        htmlResponse.append("<html><body>");
        htmlResponse.append("<h1>선택한 Database에 존재하는 table들.....<br>선택힌 database는 ").append(database).append("</h1>");

        String username = "root";
        String password = "root123";

        try {
            // MySQL SHOW TABLES 명령어 실행
            ProcessBuilder processBuilder = new ProcessBuilder(
                    "mysql",
                    "-u" + username,
                    "-p" + password,
                    "-e", "SHOW TABLES IN " + database
            );
            Process process = processBuilder.start();
            BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
            String line;

            boolean isHeader = true; // 첫 번째 줄은 'Tables_in_database' 헤더이므로 건너뜀
            while ((line = reader.readLine()) != null) {
                if (isHeader) {
                    isHeader = false;
                    continue;
                }

                // 테이블 이름을 클릭하면 해당 테이블 데이터를 조회하는 URL로 이동
                htmlResponse.append("<p><a href=\"/api/v1/kafka/topics/mydb/")
                        .append(database).append("/").append(line)
                        .append("\">").append(line).append("</a></p>");
            }

            int exitCode = process.waitFor();
            if (exitCode != 0) {
                htmlResponse.append("<p>Error executing MySQL command.</p>");
            }

        } catch (IOException | InterruptedException e) {
            htmlResponse.append("<p>Error occurred: ").append(e.getMessage()).append("</p>");
        }

        htmlResponse.append("<a href=\"/api/v1/kafka/topics/mydb\"><button>  DB 리스트로 돌아가기  </button></a>");
        htmlResponse.append("</body></html>");

        return htmlResponse.toString();
    }
    @GetMapping("/topics/mydb/{database}/{table}")
    public String showTableData(@PathVariable String database, @PathVariable String table) {
        StringBuilder htmlResponse = new StringBuilder();
        htmlResponse.append("<html><body>");
        htmlResponse.append("<h1>Table의 data를 조회합니다....<br>").append(database).append(".").append(table).append("</h1>");

        String jdbcUrl = "jdbc:mysql://localhost:3306/" + database; // Database를 경로에 포함
        String username = "root";
        String password = "root123";

        try (Connection connection = DriverManager.getConnection(jdbcUrl, username, password);
             Statement statement = connection.createStatement()) {
            // SQL query to fetch all data from the specified table
            String query = "SELECT * FROM " + table;
            ResultSet resultSet = statement.executeQuery(query);

            // Get column names
            int columnCount = resultSet.getMetaData().getColumnCount();
            htmlResponse.append("<table border='1'><tr>");
            htmlResponse.append("<th>row number</th>"); // 행 번호 열 추가
            for (int i = 1; i <= columnCount; i++) {
                htmlResponse.append("<th>").append(resultSet.getMetaData().getColumnName(i)).append("</th>");
            }
            htmlResponse.append("</tr>");

            // Get row data
            int rowNum = 1; // 행 번호
            while (resultSet.next()) {
                htmlResponse.append("<tr>");
                htmlResponse.append("<td>").append(rowNum++).append("</td>"); // 행 번호 추가
                for (int i = 1; i <= columnCount; i++) {
                    htmlResponse.append("<td>").append(resultSet.getString(i)).append("</td>");
                }
                htmlResponse.append("</tr>");
            }
            htmlResponse.append("</table>");

        } catch (Exception e) {
            htmlResponse.append("<p>Error occurred: ").append(e.getMessage()).append("</p>");
        }

        htmlResponse.append("<a href=\"/api/v1/kafka/topics/mydb/").append(database).append("\"><button>  Table 리스트로 돌아가기  </button></a>");
        htmlResponse.append("</body></html>");

        return htmlResponse.toString();
    }



    // GetMappring은 http GET요청을 처리하는 핸들러 메서드를 정의함.
    // key와 value를 각각 요청 파라미터로 받아 처리
    @GetMapping("/publish")
    public ResponseEntity<String> publish(
            @RequestParam("key") String key,
            @RequestParam("value") String value) {
        // KafkaProducer의 sendMessage 메서드에 key와 value 전달
        kafkaProducer.sendMessage("item-log", key, value);
        return ResponseEntity.ok("from MessageController : Message를 topic으로 보냄.");
    }
}

 

 

 

 


메인화면
consuming화면
db 조회

 

 


#! /bin/bash
kafka-topics --bootstrap-server localhost:9092 --delete --topic item-log

kafka-topics --bootstrap-server localhost:9092 --delete --topic item-log-apple
kafka-topics --bootstrap-server localhost:9092 --delete --topic item-log-banana
kafka-topics --bootstrap-server localhost:9092 --delete --topic item-log-candy
kafka-topics --bootstrap-server localhost:9092 --delete --topic item-log-coffee
kafka-topics --bootstrap-server localhost:9092 --delete --topic item-log-melon
kafka-topics --bootstrap-server localhost:9092 --delete --topic item-log-peach
kafka-topics --bootstrap-server localhost:9092 --delete --topic item-log-water

kafka-topics --bootstrap-server localhost:9092 --delete --topic item-log-apple-added
kafka-topics --bootstrap-server localhost:9092 --delete --topic item-log-banana-added
kafka-topics --bootstrap-server localhost:9092 --delete --topic item-log-candy-added
kafka-topics --bootstrap-server localhost:9092 --delete --topic item-log-coffee-added
kafka-topics --bootstrap-server localhost:9092 --delete --topic item-log-melon-added
kafka-topics --bootstrap-server localhost:9092 --delete --topic item-log-peach-added
kafka-topics --bootstrap-server localhost:9092 --delete --topic item-log-water-added