리눅스/kafka

kafka spring producer 및 consumer 구현 ( Json message)

정지홍 2024. 11. 25. 17:42

1. 우선 application.properties를 다음과 같이 설정

spring.application.name=springboot-kafka-test

spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=testGroup
spring.kafka.consumer.auto-offset-reset=earliest


spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*

spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
# spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

 

 

2. payload 패키지를 만들어 주고 user라는 클래스 생성

package com.example.springboot_kafka_test.payload;

public class User {
    private int id;
    private String firstName;
    private String lastName;

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getFirstName() {
        return firstName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }
}

 

 

3. producer 클래스인 JsonKafkaProducer를 생성한다.

package com.example.springboot_kafka_test.kafka;

import com.example.springboot_kafka_test.payload.User;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

@Service
public class JsonKafkaProducer {
    private  static final Logger logger = LoggerFactory.getLogger(JsonKafkaProducer.class);

    private KafkaTemplate<String, User> kafkaTemplate;


    public JsonKafkaProducer(KafkaTemplate<String, User> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(User data) {
        logger.info(String.format("Message sent -> %s",data.toString()));
        Message<User> message = MessageBuilder.withPayload(data).setHeader(KafkaHeaders.TOPIC, "javaguides").build();
        kafkaTemplate.send(message);
    }
}

 

 

4. 스프링 어플리케이션 실행

 

 

5.postman으로 다음과 같이 전송해보자.

 

6.전송이 됨을 확인 할수있다.

 

 

7. 전송은 되지만 아마도 에러가 출력될거임

 

  • consume 메서드의 시그니처가 String 타입을 기대하는데, 수신하는 데이터가 JSON 형식의 객체(User)이기 때문.
  • JsonDeserializer를 사용하여 JSON 데이터를 역직렬화하려면, consume 메서드가 올바른 객체 타입을 받아들일 수 있도록 설정해야 함.

 

 

 

8. config의 KafkaTopic을 다음과 같이 수정

 

9. JsonKafkaProducer를 아래와 같이 수정하면 에러가 사라짐...

 

 

10. 컨슈머를 다음과 같이 작성

package com.example.springboot_kafka_test.kafka;
import com.example.springboot_kafka_test.payload.User;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Service
public class JsonKafkaConsumer {
    private static final Logger logger = LoggerFactory.getLogger(JsonKafkaConsumer.class);

    @KafkaListener(topics="javaguides_json" , groupId = "testGroup")
    public void consume(User user) {
        logger.info(String.format("Json message recieved -> %s" , user.toString()));
    }
}

 

 

11. postman으로 post하면 consume이 되는것을 확인 할 수 있음