코딩 및 기타
ProjectFileAppend , ProjectFileAppendProducer
정지홍
2024. 12. 6. 08:49
package com.practice.kafka.producer;
import com.github.javafaker.Faker;
import java.io.*;
import java.util.HashMap;
import java.util.List;
import java.util.Random;
public class ProjectFileAppend {
private static final List<String> ITEM_NAMES = List.of("apple", "melon", "peach", "water", "banana", "candy", "coffee");
private int orderSeq = 100;
public ProjectFileAppend() {}
private String getRandomValueFromList(List<String> list, Random random) {
return list.get(random.nextInt(list.size()));
}
public HashMap<String, String> produceMessage(Faker faker, Random random) {
String item = getRandomValueFromList(ITEM_NAMES, random);
String userIdTmp = faker.name().firstName();
String userId = faker.lorem().characters(2, 10, true);
String cart = random.nextInt(100) < 10 ? "true" : "false";
String message = String.format("%s, %s, %s", userId + userIdTmp, item, cart);
System.out.println(message);
HashMap<String, String> messageMap = new HashMap<>();
messageMap.put("key", item);
messageMap.put("message", message);
return messageMap;
}
public void writeMessages(String filePath, Faker faker, Random random) {
File file = new File(filePath);
try (PrintWriter printWriter = new PrintWriter(new BufferedWriter(new FileWriter(file, true)))) {
for (int i = 0; i < 100; i++) { // 100개의 메시지 생성
HashMap<String, String> message = produceMessage(faker, random);
printWriter.println(message.get("key") + "," + message.get("message"));
}
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
ProjectFileAppend projectFileAppend = new ProjectFileAppend();
Random random = new Random(); // 시드값 없이 랜덤 생성
Faker faker = Faker.instance(random);
String filePath = "/home/jeongjihong/IdeaProjects/Kafka_practice_01/practice/src/main/resources/log_data.txt";
while (true) { // 무한 루프
projectFileAppend.writeMessages(filePath, faker, random);
System.out.println("###### 100 messages written to file");
try {
Thread.sleep(10000); // 10초 대기
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.err.println("Thread sleep interrupted");
break; // 에러 발생 시 무한 루프 종료
}
}
}
}
package com.practice.kafka.producer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
public class ProjectFileAppendProducer {
public static final Logger logger = LoggerFactory.getLogger(ProjectFileAppendProducer.class.getName());
public static void main(String[] args) {
// 파일 경로 설정
File file = new File("/home/jeongjihong/IdeaProjects/Kafka_practice_01/practice/src/main/resources/log_data.txt");
// API URL
String apiUrl = "http://localhost:8080/api/v1/kafka/publish";
// 파일을 지속적으로 읽으면서 메시지 전송
continuouslyReadFileAndSendMessages(file, apiUrl);
}
private static void continuouslyReadFileAndSendMessages(File file, String apiUrl) {
long lastModified = file.lastModified();
while (true) {
// 파일 수정 여부를 체크
if (file.lastModified() > lastModified) {
lastModified = file.lastModified();
try (BufferedReader reader = new BufferedReader(new FileReader(file))) {
String line;
// 파일의 마지막 부분부터 읽기
while ((line = reader.readLine()) != null) {
// 파일에서 key-value를 분리
String[] parts = line.split(",", 2); // key와 value를 ',' 기준으로 분리
if (parts.length == 2) {
String key = parts[0].trim();
String value = parts[1].trim();
// 메시지를 URL로 전송
sendHttpRequest(apiUrl, key, value);
} else {
logger.error("Invalid line format: {}", line);
}
}
} catch (IOException e) {
logger.error("Error reading file: {}", e.getMessage());
}
}
try {
// 1초 간격으로 파일을 체크
Thread.sleep(1000);
} catch (InterruptedException e) {
logger.error("File monitoring thread interrupted: {}", e.getMessage());
break;
}
}
}
private static void sendHttpRequest(String apiUrl, String key, String value) {
try {
String encodedKey = URLEncoder.encode(key, StandardCharsets.UTF_8.toString());
String encodedValue = URLEncoder.encode(value, StandardCharsets.UTF_8.toString());
URL url = new URL(apiUrl + "?key=" + encodedKey + "&value=" + encodedValue);
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
connection.setRequestMethod("GET");
connection.setRequestProperty("Content-Type", "application/json");
logger.info("보내는 URL 요청: {}", url);
int responseCode = connection.getResponseCode();
if (responseCode == 200) {
logger.info("msg sent successfully: key={}, value={}", key, value);
} else {
logger.error("Failed to send msg. Response code: {}", responseCode);
}
connection.disconnect();
} catch (IOException e) {
logger.error(e.getMessage());
}
}
}