Apply map transformation for KStreams
20 mins
File : src/main/java/x/lab07_streams/StreamsConsumer3_Map.java
This consumer reads a KStream and maps each incoming record to a key-value pair of the form (action, 1).
Example 1:
Input :
{"timestamp":1451635200005,"session":"session_251","domain":"facebook.com","cost":91,"user":"user_16","campaign":"campaign_5","ip":"ip_67","action":"clicked"}
Output:
("clicked", 1)
Example 2:
Input :
{"timestamp":1451635200010,"session":"session_224","domain":"foxnews.com","cost":17,"user":"user_89","campaign":"campaign_4","ip":"ip_57","action":"viewed"}
Output:
("viewed", 1)
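The two examples above can be sketched in plain Java as follows. This is only an illustration of the per-record logic, not the lab's solution file: the class name `ActionMapSketch`, the method `toActionCount`, the regex-based field extraction, and the `"unknown"` fallback are all assumptions made for this sketch. In the lab itself, similar logic would presumably sit inside the `KeyValueMapper` passed to `KStream.map()` and return a `KeyValue<>(action, 1)`, most likely using a proper JSON parser rather than a regex.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ActionMapSketch {

    // Matches "action":"<value>" in a flat JSON record (assumption: a regex
    // is enough for this illustration; real code should use a JSON parser).
    private static final Pattern ACTION =
            Pattern.compile("\"action\"\\s*:\\s*\"([^\"]*)\"");

    // Maps one clickstream record to an (action, 1) pair.
    // Records without an "action" field map to ("unknown", 1), which is a
    // hypothetical fallback chosen for this sketch.
    static Map.Entry<String, Integer> toActionCount(String json) {
        Matcher m = ACTION.matcher(json);
        String action = m.find() ? m.group(1) : "unknown";
        return new SimpleEntry<>(action, 1);
    }

    public static void main(String[] args) {
        String record = "{\"timestamp\":1451635200005,\"session\":\"session_251\","
                + "\"domain\":\"facebook.com\",\"cost\":91,\"user\":\"user_16\","
                + "\"campaign\":\"campaign_5\",\"ip\":\"ip_67\",\"action\":\"clicked\"}";
        Map.Entry<String, Integer> out = toActionCount(record);
        System.out.println("(" + out.getKey() + ", " + out.getValue() + ")"); // prints (clicked, 1)
    }
}
```

Emitting a constant value of 1 per record keeps the output small and makes it straightforward to count records per action in a later step.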
=> Inspect the file and fix the TODO items.
Run the lab07_streams/StreamsConsumer3_Map in Eclipse
Run the utils.ClickStreamProducer in Eclipse
Expected output:
Notice that KStream-Action will only contain (action, 1) pairs.
[INFO ] 2018-07-22 14:21:27.487 [main] StreamsConsumer4_Map:main(81) - kstreams starting on clickstream
[KSTREAM-SOURCE-0000000000]: facebook.com, {"timestamp":1451635200005,"session":"session_251","domain":"facebook.com","cost":91,"user":"user_16","campaign":"campaign_5","ip":"ip_67","action":"clicked"}
[DEBUG] 2018-07-22 14:21:27.649 [kafka-streams-consumer3-cc3c7211-fb5f-4b99-93c5-daf403beb1b5-StreamThread-1] StreamsConsumer4_Map:apply(62) - map() : got : {"timestamp":1451635200005,"session":"session_251","domain":"facebook.com","cost":91,"user":"user_16","campaign":"campaign_5","ip":"ip_67","action":"clicked"}
[DEBUG] 2018-07-22 14:21:27.649 [kafka-streams-consumer3-cc3c7211-fb5f-4b99-93c5-daf403beb1b5-StreamThread-1] StreamsConsumer4_Map:apply(66) - map() : returning : KeyValue(clicked, 1)
[KSTREAM-MAP-0000000002]: clicked, 1
----
[KSTREAM-SOURCE-0000000000]: cnn.com, {"timestamp":1451635200020,"session":"session_66","domain":"cnn.com","cost":31,"user":"user_29","campaign":"campaign_3","ip":"ip_49","action":"blocked"}
[DEBUG] 2018-07-22 14:21:27.650 [kafka-streams-consumer3-cc3c7211-fb5f-4b99-93c5-daf403beb1b5-StreamThread-1] StreamsConsumer4_Map:apply(62) - map() : got : {"timestamp":1451635200020,"session":"session_66","domain":"cnn.com","cost":31,"user":"user_29","campaign":"campaign_3","ip":"ip_49","action":"blocked"}
[DEBUG] 2018-07-22 14:21:27.650 [kafka-streams-consumer3-cc3c7211-fb5f-4b99-93c5-daf403beb1b5-StreamThread-1] StreamsConsumer4_Map:apply(66) - map() : returning : KeyValue(blocked, 1)
[KSTREAM-MAP-0000000002]: blocked, 1
----
[KSTREAM-SOURCE-0000000000]: foxnews.com, {"timestamp":1451635200010,"session":"session_224","domain":"foxnews.com","cost":17,"user":"user_89","campaign":"campaign_4","ip":"ip_57","action":"viewed"}
[DEBUG] 2018-07-22 14:21:27.651 [kafka-streams-consumer3-cc3c7211-fb5f-4b99-93c5-daf403beb1b5-StreamThread-1] StreamsConsumer4_Map:apply(62) - map() : got : {"timestamp":1451635200010,"session":"session_224","domain":"foxnews.com","cost":17,"user":"user_89","campaign":"campaign_4","ip":"ip_57","action":"viewed"}
[DEBUG] 2018-07-22 14:21:27.651 [kafka-streams-consumer3-cc3c7211-fb5f-4b99-93c5-daf403beb1b5-StreamThread-1] StreamsConsumer4_Map:apply(66) - map() : returning : KeyValue(viewed, 1)
[KSTREAM-MAP-0000000002]: viewed, 1
----
[KSTREAM-SOURCE-0000000000]: google.com, {"timestamp":1451635200025,"session":"session_29","domain":"google.com","cost":16,"user":"user_1","campaign":"campaign_5","ip":"ip_74","action":"clicked"}
[DEBUG] 2018-07-22 14:21:27.651 [kafka-streams-consumer3-cc3c7211-fb5f-4b99-93c5-daf403beb1b5-StreamThread-1] StreamsConsumer4_Map:apply(62) - map() : got : {"timestamp":1451635200025,"session":"session_29","domain":"google.com","cost":16,"user":"user_1","campaign":"campaign_5","ip":"ip_74","action":"clicked"}
[DEBUG] 2018-07-22 14:21:27.651 [kafka-streams-consumer3-cc3c7211-fb5f-4b99-93c5-daf403beb1b5-StreamThread-1] StreamsConsumer4_Map:apply(66) - map() : returning : KeyValue(clicked, 1)
[KSTREAM-MAP-0000000002]: clicked, 1
----