Use Spark to process data in Kafka in real time as streams.
Lab 11.1 - Spark Kafla batch processing
40 mins
If you don't have the topic, create it as follows
$ ~/apps/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 \
--create --topic test --replication-factor 1 --partitions 10
Inspect file: python/spark-consumer-streaming.py
We will work through TODO items in this file
ACTION: Inspect Kafka connection options
Open a terminal and execute these commands:
$ cd ~/kafka-labs/python
$ ~/apps/spark/bin/spark-submit --master local[2] \
--driver-class-path . \
--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1 \
spark-consumer-streaming.py
This will initialize Spark session and connect to Kafka. You will output like following
root
|-- key: string (nullable = true)
|-- value: string (nullable = true)
|-- topic: string (nullable = true)
|-- partition: integer (nullable = true)
|-- offset: long (nullable = true)
|-- timestamp: timestamp (nullable = true)
ACTION: Observe how Spark reads from Kafka and the data types
ACTION: Keep this code running and this terminal open
$ pip install confluent_kafka
Open a terminal and execute the producer
$ cd ~/kafka-labs/python
$ python producer-clickstream.py
ACTION: Watch outupt as batches
The output may look like this:
-------------------------------------------
Batch: 2
-------------------------------------------
+------------+--------------------+-----------+---------+------+--------------------+
| key| value| topic|partition|offset| timestamp|
+------------+--------------------+-----------+---------+------+--------------------+
|facebook.com|{"timestamp": 164...|clickstream| 9| 1988|2022-01-22 04:37:...|
|facebook.com|{"timestamp": 164...|clickstream| 9| 1989|2022-01-22 04:37:...|
|facebook.com|{"timestamp": 164...|clickstream| 9| 1990|2022-01-22 04:37:...|
+------------+--------------------+-----------+---------+------+--------------------+
Now let's extract JSON data into Spark dataframe, so we can process it easily.
# code for TODO-1
# first we need a schema
schema = StructType(
[
StructField('timestamp', StringType(), True),
StructField('ip', StringType(), True),
StructField('user', StringType(), True),
StructField('action', StringType(), True),
StructField('domain', StringType(), True),
StructField('campaign', StringType(), True),
StructField('cost', IntegerType(), True)
]
)
# extract value from JSON string and populate into another df
df2 = df.withColumn("value", from_json("value", schema))\
.select(col('key'), col('value.*'))
df2.printSchema()
# Print out incoming data
query2 = df2.writeStream \
.outputMode("append") \
.format("console") \
.trigger(processingTime='3 seconds') \
.queryName("query 2") \
.start()
# run query
query2.awaitTermination()
ACTION: Comment out query1
ACTION: Use the code below
ACTION: Stop the streaming code, by hitting Ctrl+C, and run it again
ACTION: Also run the clickstream producer again to send data
------------------------------------------
Batch: 17
-------------------------------------------
+------------+-------------+---------+-------+-------+------------+-----------+----+
| key| timestamp| ip| user| action| domain| campaign|cost|
+------------+-------------+---------+-------+-------+------------+-----------+----+
|facebook.com|1642826798006| 1.4.4.3|user-33|blocked|facebook.com|campaign-59| 93|
| gmail.com|1642826796603| 1.2.8.1|user-18| viewed| gmail.com|campaign-17| 66|
| gmail.com|1642826796803| 3.1.2.7|user-67| viewed| gmail.com|campaign-88| 78|
+------------+-------------+---------+-------+-------+------------+-----------+----+
we are going to run a SQL query on Kafka data! How cool is that?
# code for TODO-2
## Let's do a SQL query on Kafka data!
df2.createOrReplaceTempView("clickstream_view")
# calculate avg spend per domain
sql_str = """
SELECT domain, count(*) as impressions, MIN(cost), AVG(cost), MAX(cost)
FROM clickstream_view
GROUP BY domain
"""
domain_cost = spark.sql(sql_str)
# TODO: try 'update' and 'complete' mode
query3 = domain_cost.writeStream \
.outputMode("complete") \
.format("console") \
.trigger(processingTime='3 seconds') \
.queryName("domain cost") \
.start()
query3.awaitTermination()
ACTION: Comment out previous queries
ACTION: Use the code below
ACTION: Stop the streaming code, by hitting Ctrl+C, and run it again
ACTION: Also run the clickstream producer again to send data
-------------------------------------------
Batch: 7
-------------------------------------------
+------------+-----------+---------+------------------+---------+
| domain|impressions|min(cost)| avg(cost)|max(cost)|
+------------+-----------+---------+------------------+---------+
| youtube.com| 25| 1| 53.36| 97|
| gmail.com| 21| 5|48.476190476190474| 97|
|facebook.com| 15| 16| 52.86666666666667| 98|
| twitter.com| 16| 2| 31.8125| 79|
|linkedin.com| 17| 2| 44.11764705882353| 96|
+------------+-----------+---------+------------------+---------+