1. 우선 application.properties를 다음과 같이 설정
spring.application.name=springboot-kafka-test
spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=testGroup
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
# spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

2. payload 패키지를 만들어 주고 user라는 클래스 생성
package com.example.springboot_kafka_test.payload;
public class User {
private int id;
private String firstName;
private String lastName;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getFirstName() {
return firstName;
}
public void setFirstName(String firstName) {
this.firstName = firstName;
}
public String getLastName() {
return lastName;
}
public void setLastName(String lastName) {
this.lastName = lastName;
}
}

3. producer 클래스인 JsonKafkaProducer를 생성한다.
package com.example.springboot_kafka_test.kafka;
import com.example.springboot_kafka_test.payload.User;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
@Service
public class JsonKafkaProducer {
private static final Logger logger = LoggerFactory.getLogger(JsonKafkaProducer.class);
private KafkaTemplate<String, User> kafkaTemplate;
public JsonKafkaProducer(KafkaTemplate<String, User> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessage(User data) {
logger.info(String.format("Message sent -> %s",data.toString()));
Message<User> message = MessageBuilder.withPayload(data).setHeader(KafkaHeaders.TOPIC, "javaguides").build();
kafkaTemplate.send(message);
}
}

4. 스프링 어플리케이션 실행

5.postman으로 다음과 같이 전송해보자.

6.전송이 됨을 확인 할수있다.

7. 전송은 되지만 아마도 에러가 출력될거임
- consume 메서드의 시그니처가 String 타입을 기대하는데, 수신하는 데이터가 JSON 형식의 객체(User)이기 때문.
- JsonDeserializer를 사용하여 JSON 데이터를 역직렬화하려면, consume 메서드가 올바른 객체 타입을 받아들일 수 있도록 설정해야 함.

8. config의 KafkaTopic을 다음과 같이 수정

9. JsonKafkaProducer를 아래와 같이 수정하면 에러가 사라짐...

10. 컨슈머를 다음과 같이 작성
package com.example.springboot_kafka_test.kafka;
import com.example.springboot_kafka_test.payload.User;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Service
public class JsonKafkaConsumer {
private static final Logger logger = LoggerFactory.getLogger(JsonKafkaConsumer.class);
@KafkaListener(topics="javaguides_json" , groupId = "testGroup")
public void consume(User user) {
logger.info(String.format("Json message recieved -> %s" , user.toString()));
}
}

11. postman으로 post하면 consume이 되는것을 확인 할 수 있음

'리눅스 > kafka' 카테고리의 다른 글
| kafka project 기본적인거 정리 (0) | 2024.11.27 |
|---|---|
| kafka spring boot ( streams에서 대문자로 바꿔서 다른 토픽으로 전송 ) (0) | 2024.11.26 |
| kafka spring producer 및 consumer 구현 ( String message) (0) | 2024.11.25 |
| 오늘 작업 기록( 피자 프로듀서에서 mysql로 적재) (2) | 2024.11.20 |
| kafka connect mysql - distributed모드 (2) | 2024.11.19 |