Use Spark to process data in Kafka. We will flag clickstream records that come from fraudulent IPs.
40 mins
If you don't have the clickstream topic, create it as follows:
$ ~/apps/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 \
    --create --topic clickstream --replication-factor 1 --partitions 10
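You can verify the topic exists with the describe command (this assumes the same Kafka install path as above):

$ ~/apps/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 \
    --describe --topic clickstream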
Open a terminal and execute the producer:
$ cd ~/kafka-labs/python
$ python producer-clickstream.py
This will populate the clickstream topic.
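To spot-check that data is arriving, you can tail the topic with the console consumer (a quick optional check):

$ ~/apps/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic clickstream --from-beginning --max-messages 5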
Inspect the file: python/spark-consumer-fraud-detection-batch.py
We will work through the TODO items in this file.
ACTION: Observe the following
Here we have a simple in-memory lookup of fraudulent IP prefixes (the first two octets of the address).
# imports needed (these may already be at the top of the file)
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

# Fraud IP ranges (first two octets of the address);
# the trailing comma keeps this a tuple even with a single entry
fraud_ips = (
    # '3.3',
    '4.4',
)

def is_fraud_ip(ip):
    # extract the 'a.b' prefix from an 'a.b.c.d' address
    a_b = 'x.y'
    tokens = ip.split('.')
    if len(tokens) == 4:
        a_b = '{}.{}'.format(tokens[0], tokens[1])
    return a_b in fraud_ips

# define a UDF so the check can be applied to a dataframe column
is_fraud_ip_udf = udf(is_fraud_ip, BooleanType())
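Before running this through Spark, a quick sanity check of the plain Python function can help (the sample IPs below are made up):

print(is_fraud_ip('4.4.1.9'))    # True  -- prefix '4.4' is in fraud_ips
print(is_fraud_ip('1.2.3.4'))    # False -- prefix '1.2' is not listed
print(is_fraud_ip('bad-input'))  # False -- not 4 dot-separated tokens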
Open a terminal and execute these commands:
$ cd ~/kafka-labs/python
$ ~/apps/spark/bin/spark-submit --master local[2] \
--driver-class-path . \
--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1 \
spark-consumer-fraud-detection-batch.py
This will initialize a Spark session and connect to Kafka. You will see output like the following:
root
|-- key: string (nullable = true)
|-- value: string (nullable = true)
|-- topic: string (nullable = true)
|-- partition: integer (nullable = true)
|-- offset: long (nullable = true)
|-- timestamp: timestamp (nullable = true)
ACTION: Observe how Spark reads from Kafka and the data types
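For reference, the read portion of the script likely looks something like this sketch (the option values here are assumptions; check the actual file for the exact settings):

from pyspark.sql.functions import col

df = spark.read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "clickstream") \
    .load() \
    .withColumn("key", col("key").cast("string")) \
    .withColumn("value", col("value").cast("string"))
df.printSchema()

The Kafka source returns key and value as binary, which is why they are cast to strings before printing the schema above.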
Here is the code to process the entire Kafka dataset:
# code for TODO-1
print("total count: ", df.count())
df.show()
ACTION: Use the code above
ACTION: Run the code again
ACTION: Observe the output. It may look like the following:
+------------+--------------------+-----------+---------+------+--------------------+
| key| value| topic|partition|offset| timestamp|
+------------+--------------------+-----------+---------+------+--------------------+
|facebook.com|{"timestamp": 164...|clickstream| 9| 1988|2022-01-22 04:37:...|
|facebook.com|{"timestamp": 164...|clickstream| 9| 1989|2022-01-22 04:37:...|
|facebook.com|{"timestamp": 164...|clickstream| 9| 1990|2022-01-22 04:37:...|
+------------+--------------------+-----------+---------+------+--------------------+
Now let's extract the JSON data into a Spark dataframe so we can process it easily.
# code for TODO-2

# imports needed (these may already be at the top of the file)
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# first we need a schema for the JSON payload
schema = StructType(
    [
        StructField('timestamp', StringType(), True),
        StructField('ip', StringType(), True),
        StructField('user', StringType(), True),
        StructField('action', StringType(), True),
        StructField('domain', StringType(), True),
        StructField('campaign', StringType(), True),
        StructField('cost', IntegerType(), True)
    ]
)

# parse the JSON string in 'value' and flatten it into columns
df2 = df.withColumn("value", from_json("value", schema)) \
        .select(col('key'), col('value.*'))
df2.printSchema()
df2.sample(0.1).show()
ACTION: Use the code above
ACTION: Run the code again
ACTION: Observe the output. It may look like the following:
+------------+-------------+---------+-------+-------+------------+-----------+----+
| key| timestamp| ip| user| action| domain| campaign|cost|
+------------+-------------+---------+-------+-------+------------+-----------+----+
|facebook.com|1642826798006| 1.4.4.3|user-33|blocked|facebook.com|campaign-59| 93|
| gmail.com|1642826796603| 1.2.8.1|user-18| viewed| gmail.com|campaign-17| 66|
| gmail.com|1642826796803| 3.1.2.7|user-67| viewed| gmail.com|campaign-88| 78|
+------------+-------------+---------+-------+-------+------------+-----------+----+
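The timestamp column is a string of epoch milliseconds. If you want a human-readable time, one option is to cast it (a sketch, not part of the lab file; the column name event_time is made up):

from pyspark.sql.functions import col

df2.withColumn("event_time",
               (col("timestamp").cast("double") / 1000).cast("timestamp")) \
   .select("key", "timestamp", "event_time") \
   .show(3)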
Here is the code for TODO-3. It uses the UDF defined earlier to filter out records from fraudulent IPs:
fraud_df = df2.filter(is_fraud_ip_udf('ip'))
fraud_df.show()
print("Total record count: ", df2.count())
print("Fraud record count: ", fraud_df.count())
ACTION: Use the code above
ACTION: Run the code again
ACTION: Observe the output. It may look like the following:
+-----------+-------------+--------+-------+-------+-----------+-----------+----+
| key| timestamp| ip| user| action| domain| campaign|cost|
+-----------+-------------+--------+-------+-------+-----------+-----------+----+
|twitter.com|1642712906740| 4.4.0.9| user-6|clicked|twitter.com|campaign-73| 54|
|twitter.com|1642714594204| 4.4.3.4|user-20|clicked|twitter.com|campaign-61| 39|
+-----------+-------------+--------+-------+-------+-----------+-----------+----+
Total record count: 14164
Fraud record count: 1677
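That is, roughly 12% of the records (1677 of 14164) came from fraudulent IPs. If you want to print the rate directly, here is a small optional addition:

total_count = df2.count()
fraud_count = fraud_df.count()
print("Fraud rate: {:.1%}".format(fraud_count / total_count))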
Once the code is working, go ahead and change the fraud_ips tuple. For example, add the IP range 3.3 (commented out earlier), as follows:
# Fraud IP ranges
fraud_ips = (
    '3.3',
    '4.4',
)
ACTION: Rerun the code and see if we can catch the new fraud IPs.