리눅스/kafka

kafka streams DSL -GlobalKTable과 KStream을 join()

정지홍 2024. 11. 12. 09:05
public class JoinEx {

    private static String APPLICATION = 'stream-test4';
    private static String ADDRESS = "address";
    private static String ORDER = "ORDER";
    private static String RST ="join-RST";

    public static void main(String[] args) {

        Properties prop = new Properties();
        prop.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG , "localhost:9092");
        prop.setProperty(StreamsConfig.APPLICATION_ID_CONFIG , APPLICATION);
        prop.put( StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG , Serdes.String().getClass());
        prop.put( StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG , Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        GlobalKTable< String , String > address = builder.globalTable( ADDRESS );
        KStream< String , String > orders = builder.stream(ORDER);
        orders.join( address , (orderKey , orderValue ) -> orderKey , (order , dest ) -> order + "주문하였으며 배송지는 " + dest + "입니다." ).to(RST);

        KafkaStreams streams = new KafkaStreams( builder.build(), prop );
        streams.start();
    }
}

 


 

1. topic 생성

 

2. 프로그램 실행

 

3. console 실행

 

4.확인