리눅스/kafka
kafka spring producer 및 consumer 구현 ( Json message)
정지홍
2024. 11. 25. 17:42
1. 우선 application.properties를 다음과 같이 설정
spring.application.name=springboot-kafka-test
spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=testGroup
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
# spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

2. payload 패키지를 만들어 주고 user라는 클래스 생성
package com.example.springboot_kafka_test.payload;
public class User {
private int id;
private String firstName;
private String lastName;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getFirstName() {
return firstName;
}
public void setFirstName(String firstName) {
this.firstName = firstName;
}
public String getLastName() {
return lastName;
}
public void setLastName(String lastName) {
this.lastName = lastName;
}
}

3. producer 클래스인 JsonKafkaProducer를 생성한다.
package com.example.springboot_kafka_test.kafka;
import com.example.springboot_kafka_test.payload.User;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
@Service
public class JsonKafkaProducer {
private static final Logger logger = LoggerFactory.getLogger(JsonKafkaProducer.class);
private KafkaTemplate<String, User> kafkaTemplate;
public JsonKafkaProducer(KafkaTemplate<String, User> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessage(User data) {
logger.info(String.format("Message sent -> %s",data.toString()));
Message<User> message = MessageBuilder.withPayload(data).setHeader(KafkaHeaders.TOPIC, "javaguides").build();
kafkaTemplate.send(message);
}
}

4. 스프링 어플리케이션 실행

5.postman으로 다음과 같이 전송해보자.

6.전송이 됨을 확인 할수있다.

7. 전송은 되지만 아마도 에러가 출력될거임
- consume 메서드의 시그니처가 String 타입을 기대하는데, 수신하는 데이터가 JSON 형식의 객체(User)이기 때문.
- JsonDeserializer를 사용하여 JSON 데이터를 역직렬화하려면, consume 메서드가 올바른 객체 타입을 받아들일 수 있도록 설정해야 함.

8. config의 KafkaTopic을 다음과 같이 수정

9. JsonKafkaProducer를 아래와 같이 수정하면 에러가 사라짐...

10. 컨슈머를 다음과 같이 작성
package com.example.springboot_kafka_test.kafka;
import com.example.springboot_kafka_test.payload.User;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Service
public class JsonKafkaConsumer {
private static final Logger logger = LoggerFactory.getLogger(JsonKafkaConsumer.class);
@KafkaListener(topics="javaguides_json" , groupId = "testGroup")
public void consume(User user) {
logger.info(String.format("Json message recieved -> %s" , user.toString()));
}
}

11. postman으로 post하면 consume이 되는것을 확인 할 수 있음
