코딩 및 기타

ProjectFileAppend, ProjectFileAppendProducer

정지홍 2024. 12. 6. 08:49
package com.practice.kafka.producer;

import com.github.javafaker.Faker;

import java.io.*;
import java.util.HashMap;
import java.util.List;
import java.util.Random;

/**
 * Generates fake, comma-separated log lines and appends them to a text file in batches.
 *
 * <p>Each line written to the file has the form {@code <item>,<user>, <item>, <cart>}: the
 * item name is used as the key, followed by the formatted message built in
 * {@link #produceMessage(Faker, Random)}.
 */
public class ProjectFileAppend {
    private static final List<String> ITEM_NAMES = List.of("apple", "melon", "peach", "water", "banana", "candy", "coffee");

    /** Number of lines appended per batch by {@link #writeMessages(String, Faker, Random)}. */
    private static final int DEFAULT_BATCH_SIZE = 100;

    /** Pause between two batches in {@link #main(String[])}. */
    private static final long BATCH_INTERVAL_MS = 10_000L;

    /** File used by {@link #main(String[])} when no path is given on the command line. */
    private static final String DEFAULT_FILE_PATH =
            "/home/jeongjihong/IdeaProjects/Kafka_practice_01/practice/src/main/resources/log_data.txt";

    public ProjectFileAppend() {}

    /** Returns one element of {@code list}, picked with {@code random}. */
    private String getRandomValueFromList(List<String> list, Random random) {
        return list.get(random.nextInt(list.size()));
    }

    /**
     * Builds one fake message and echoes it to standard output.
     *
     * @param faker  source of the fake user name parts
     * @param random source of randomness for the item and the cart flag
     * @return a map with two entries: {@code "key"} holds the chosen item name and
     *         {@code "message"} holds {@code "<user>, <item>, <cart>"}, where {@code <cart>} is
     *         {@code "true"} when {@code random.nextInt(100) < 10} and {@code "false"} otherwise
     */
    public HashMap<String, String> produceMessage(Faker faker, Random random) {
        String item = getRandomValueFromList(ITEM_NAMES, random);
        String firstName = faker.name().firstName();
        // Random character prefix that is glued in front of the first name to form the user id.
        String userPrefix = faker.lorem().characters(2, 10, true);
        String cart = random.nextInt(100) < 10 ? "true" : "false";
        String message = String.format("%s, %s, %s", userPrefix + firstName, item, cart);
        System.out.println(message);

        HashMap<String, String> messageMap = new HashMap<>();
        messageMap.put("key", item);
        messageMap.put("message", message);

        return messageMap;
    }

    /**
     * Appends one default-sized batch (100 lines) of generated messages to {@code filePath}.
     *
     * <p>I/O failures are reported on standard error and not rethrown.
     *
     * @param filePath file to append to
     * @param faker    passed through to {@link #produceMessage(Faker, Random)}
     * @param random   passed through to {@link #produceMessage(Faker, Random)}
     */
    public void writeMessages(String filePath, Faker faker, Random random) {
        writeMessages(filePath, faker, random, DEFAULT_BATCH_SIZE);
    }

    /**
     * Appends {@code count} generated messages to {@code filePath}, one per line.
     *
     * <p>A {@link BufferedWriter} is used directly instead of a {@code PrintWriter} because
     * {@code PrintWriter} never throws on a failed write, which would hide errors such as a
     * full disk.
     *
     * @param filePath file to append to
     * @param faker    passed through to {@link #produceMessage(Faker, Random)}
     * @param random   passed through to {@link #produceMessage(Faker, Random)}
     * @param count    number of lines to append; nothing is written when it is not positive
     * @return {@code true} when every line was written and the file was closed without error,
     *         {@code false} when an {@link IOException} occurred (it is reported on standard error)
     */
    public boolean writeMessages(String filePath, Faker faker, Random random, int count) {
        File file = new File(filePath);
        try (BufferedWriter writer = new BufferedWriter(new FileWriter(file, true))) {
            for (int i = 0; i < count; i++) {
                HashMap<String, String> message = produceMessage(faker, random);
                writer.write(message.get("key") + "," + message.get("message"));
                writer.newLine();
            }
            return true;
        } catch (IOException e) {
            System.err.println("Failed to append messages to " + filePath + ": " + e);
            return false;
        }
    }

    /**
     * Appends a batch of messages every ten seconds until the thread is interrupted.
     *
     * @param args optional; {@code args[0]} overrides the default output file path
     */
    public static void main(String[] args) {
        ProjectFileAppend projectFileAppend = new ProjectFileAppend();
        Random random = new Random(); // unseeded: a different sequence on every run
        Faker faker = Faker.instance(random);

        String filePath = args.length > 0 ? args[0] : DEFAULT_FILE_PATH;

        while (true) {
            // Only announce success when the batch really reached the file.
            if (projectFileAppend.writeMessages(filePath, faker, random, DEFAULT_BATCH_SIZE)) {
                System.out.println("###### " + DEFAULT_BATCH_SIZE + " messages written to file");
            }

            try {
                Thread.sleep(BATCH_INTERVAL_MS);
            } catch (InterruptedException e) {
                // Restore the flag so callers can see the interruption, then stop the loop.
                Thread.currentThread().interrupt();
                System.err.println("Thread sleep interrupted");
                break;
            }
        }
    }
}
package com.practice.kafka.producer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

/**
 * Follows a text file and forwards every newly appended line to an HTTP endpoint.
 *
 * <p>Each line is expected to look like {@code <key>,<value>}; it is split at the first comma
 * and sent as the {@code key} and {@code value} query parameters of a GET request.
 */
public class ProjectFileAppendProducer {
    public static final Logger logger = LoggerFactory.getLogger(ProjectFileAppendProducer.class.getName());

    private static final String DEFAULT_FILE_PATH =
            "/home/jeongjihong/IdeaProjects/Kafka_practice_01/practice/src/main/resources/log_data.txt";
    private static final String DEFAULT_API_URL = "http://localhost:8080/api/v1/kafka/publish";

    /** Pause between two checks of the file length. */
    private static final long POLL_INTERVAL_MS = 1000L;
    /** Upper bound on the bytes read from the file in one go. */
    private static final int MAX_CHUNK_BYTES = 1 << 20;
    private static final int CONNECT_TIMEOUT_MS = 5000;
    private static final int READ_TIMEOUT_MS = 5000;

    /**
     * Starts following the log file.
     *
     * @param args optional; {@code args[0]} overrides the file path and {@code args[1]} the API URL
     */
    public static void main(String[] args) {
        File file = new File(args.length > 0 ? args[0] : DEFAULT_FILE_PATH);
        String apiUrl = args.length > 1 ? args[1] : DEFAULT_API_URL;

        continuouslyReadFileAndSendMessages(file, apiUrl);
    }

    /**
     * Polls {@code file} once per second and sends every line appended since the previous poll.
     *
     * <p>The method remembers the byte offset up to which the file has been consumed, so each
     * line is sent once instead of the whole file being re-sent after every append. Content
     * already present when the method starts is skipped. If the file becomes shorter than the
     * remembered offset (truncated or replaced), reading restarts from the beginning. The loop
     * ends when the thread is interrupted.
     *
     * @param file   file to follow
     * @param apiUrl endpoint that receives one GET request per line
     */
    private static void continuouslyReadFileAndSendMessages(File file, String apiUrl) {
        long offset = file.length();

        while (true) {
            long length = file.length();
            if (length < offset) {
                logger.warn("File {} shrank from {} to {} bytes; reading again from the start", file, offset, length);
                offset = 0L;
            }

            try {
                while (offset < length) {
                    long next = sendAppendedLines(file, offset, length, apiUrl);
                    if (next == offset) {
                        break; // only an unfinished line is pending; wait for the writer
                    }
                    offset = next;
                }
            } catch (IOException e) {
                logger.error("Error reading file: {}", file, e);
            }

            try {
                Thread.sleep(POLL_INTERVAL_MS);
            } catch (InterruptedException e) {
                // Restore the flag so the interruption stays visible after this method returns.
                Thread.currentThread().interrupt();
                logger.error("File monitoring thread interrupted: {}", e.getMessage());
                break;
            }
        }
    }

    /**
     * Reads the bytes in {@code [offset, length)} (at most {@link #MAX_CHUNK_BYTES}) and sends
     * every complete, newline-terminated line found in them.
     *
     * @return the offset just past the last consumed byte; equal to {@code offset} when the
     *         range holds no complete line yet
     * @throws IOException if the file cannot be opened or read
     */
    private static long sendAppendedLines(File file, long offset, long length, String apiUrl) throws IOException {
        int chunkSize = (int) Math.min(length - offset, (long) MAX_CHUNK_BYTES);
        byte[] chunk = new byte[chunkSize];
        try (RandomAccessFile input = new RandomAccessFile(file, "r")) {
            input.seek(offset);
            input.readFully(chunk);
        }

        // Drop a trailing partial line: the writer may still be in the middle of it.
        int completeBytes = chunkSize;
        while (completeBytes > 0 && chunk[completeBytes - 1] != '\n') {
            completeBytes--;
        }
        if (completeBytes == 0) {
            if (chunkSize < MAX_CHUNK_BYTES) {
                return offset;
            }
            // A full chunk without any line break would never make progress; skip it.
            logger.error("Skipping {} bytes at offset {}: no line terminator found", chunkSize, offset);
            return offset + chunkSize;
        }

        String text = new String(chunk, 0, completeBytes, StandardCharsets.UTF_8);
        try (BufferedReader reader = new BufferedReader(new StringReader(text))) {
            String line;
            while ((line = reader.readLine()) != null) {
                sendLine(line, apiUrl);
            }
        }
        return offset + completeBytes;
    }

    /** Splits {@code line} at the first comma into key and value and sends them; logs malformed lines. */
    private static void sendLine(String line, String apiUrl) {
        String[] parts = line.split(",", 2);
        if (parts.length == 2) {
            sendHttpRequest(apiUrl, parts[0].trim(), parts[1].trim());
        } else {
            logger.error("Invalid line format: {}", line);
        }
    }

    /**
     * Sends {@code key} and {@code value} as URL-encoded query parameters of a GET request.
     *
     * <p>Delivery is best effort: a non-200 response or an I/O failure is logged and the
     * message is not retried.
     *
     * @param apiUrl base URL without query string
     * @param key    value of the {@code key} query parameter
     * @param value  value of the {@code value} query parameter
     */
    private static void sendHttpRequest(String apiUrl, String key, String value) {
        HttpURLConnection connection = null;
        try {
            String encodedKey = URLEncoder.encode(key, StandardCharsets.UTF_8.name());
            String encodedValue = URLEncoder.encode(value, StandardCharsets.UTF_8.name());
            URL url = new URL(apiUrl + "?key=" + encodedKey + "&value=" + encodedValue);
            connection = (HttpURLConnection) url.openConnection();
            // Without timeouts a stalled server would block the polling loop indefinitely.
            connection.setConnectTimeout(CONNECT_TIMEOUT_MS);
            connection.setReadTimeout(READ_TIMEOUT_MS);
            connection.setRequestMethod("GET");
            connection.setRequestProperty("Content-Type", "application/json");
            logger.info("보내는 URL 요청: {}", url);

            int responseCode = connection.getResponseCode();
            if (responseCode == HttpURLConnection.HTTP_OK) {
                logger.info("msg sent successfully: key={}, value={}", key, value);
            } else {
                logger.error("Failed to send msg. Response code: {}", responseCode);
            }
        } catch (IOException e) {
            logger.error("Failed to send msg: key={}, value={}", key, value, e);
        } finally {
            if (connection != null) {
                connection.disconnect();
            }
        }
    }
}