리눅스/kafka
Kafka Streams DSL - GlobalKTable과 KStream을 join()
정지홍
2024. 11. 12. 09:05
public class JoinEx {
private static String APPLICATION = 'stream-test4';
private static String ADDRESS = "address";
private static String ORDER = "ORDER";
private static String RST ="join-RST";
public static void main(String[] args) {
Properties prop = new Properties();
prop.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG , "localhost:9092");
prop.setProperty(StreamsConfig.APPLICATION_ID_CONFIG , APPLICATION);
prop.put( StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG , Serdes.String().getClass());
prop.put( StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG , Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
GlobalKTable< String , String > address = builder.globalTable( ADDRESS );
KStream< String , String > orders = builder.stream(ORDER);
orders.join( address , (orderKey , orderValue ) -> orderKey , (order , dest ) -> order + "주문하였으며 배송지는 " + dest + "입니다." ).to(RST);
KafkaStreams streams = new KafkaStreams( builder.build(), prop );
streams.start();
}
}
1. topic 생성

2. 프로그램 실행

3. console 실행



4. 확인
