In this workshop we will not ingest data directly into Kafka, but send it through MQTT first. We will use a fictitious trucking company whose fleet of trucks constantly reports data about the moving vehicles.
The following diagram shows the setup of the data flow we will be implementing.
We will not be using any real-life data; instead, a program simulates some drivers and their trucks.
Our Data Platform already provides all the services we need in this workshop, such as an MQTT broker and of course Apache Kafka.
The services related to MQTT are mosquitto-1, an easy-to-use MQTT broker belonging to the Eclipse project, and mqtt-ui, a UI for browsing any MQTT broker.
mosquitto-1:
  image: eclipse-mosquitto:latest
  hostname: mosquitto-1
  container_name: mosquitto-1
  ports:
    - 1883:1883
    - 9001:9001
  volumes:
    - ./data-transfer:/data-transfer
    - ./conf/mosquitto/mosquitto.conf:/mosquitto/config/mosquitto.conf
  restart: unless-stopped
mqtt-ui:
  image: vergissberlin/hivemq-mqtt-web-client:latest
  container_name: mqtt-ui
  hostname: mqtt-ui
  labels:
    # HiveMQ UI
    com.mdps.service.webui.url: http://${PUBLIC_IP}:28136
  ports:
    - 28136:80
  volumes:
    - ./data-transfer:/data-transfer
  restart: unless-stopped
The configuration of mosquitto can be found in the ./conf/mosquitto/mosquitto.conf file, which is volume-mapped into the mosquitto-1 container.
persistence true
persistence_location /mosquitto/data/
log_dest file /mosquitto/log/mosquitto.log
listener 1883
listener 9001
protocol websockets
To see what we are producing into MQTT, we need something similar to the kafkacat and kafka-console-consumer utilities.
In this workshop we show two options for consuming from MQTT:
- use a dockerized MQTT client in the terminal
- use the browser-based HiveMQ Web UI
To start consuming from the command line, run the following docker command:
docker run -it --rm --network kafka-workshop efrecon/mqtt-client sub -h mosquitto-1 -p 1883 -t "truck/+/position" -v
The consumed messages will show up in the terminal.
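The "+" in the topic filter truck/+/position is MQTT's single-level wildcard: it matches exactly one topic level, so the subscription receives the position topic of every truck. The following Python sketch illustrates the matching rule. The function topic_matches is a hypothetical helper written for this explanation (it is not part of any MQTT client library), the truck id 11 is an assumed example value, and the special treatment of topics starting with "$" is deliberately left out.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if an MQTT topic matches a subscription filter.

    Simplified illustration: '+' matches exactly one level, '#' matches
    all remaining levels. Topics starting with '$' are not special-cased.
    """
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            # The multi-level wildcard is only valid as the last filter level.
            return i == len(filter_levels) - 1
        if i >= len(topic_levels):
            return False  # the filter has more levels than the topic
        if level != "+" and level != topic_levels[i]:
            return False  # a literal level differs
    # All filter levels matched; the topic must not have extra levels.
    return len(filter_levels) == len(topic_levels)


if __name__ == "__main__":
    # Truck id 11 is an assumed example value.
    print(topic_matches("truck/+/position", "truck/11/position"))         # True
    print(topic_matches("truck/+/position", "truck/11/engine/position"))  # False
    print(topic_matches("truck/#", "truck/11/position"))                  # True
```

Subscribing with truck/# instead would also deliver any other sub-topics published below truck/, which is why the narrower filter is used here.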
To start consuming using the MQTT UI (HiveMQ Web UI), navigate to http://dataplatform:28136 and connect using dataplatform for the Host field and 9001 for the Port field, then click on Connect:
When successfully connected, click on Add New Topic Subscription, enter truck/+/position into the Topic field and click Subscribe:
Alternatively you can also use the MQTT.fx or the MQTT Explorer applications to browse for the messages on the MQTT broker. They are both available for installation on Mac or Windows.
Now with the MQTT broker and the MQTT client in place, let's produce some messages to the MQTT topics.
For simulating truck data, we are going to use a Java program (adapted from Hortonworks) that is maintained in this GitHub project. It can be started either using Maven or Docker. We will be using it as a Docker container.
The simulator can produce data either to Kafka or to MQTT. These two options are shown below.
Producing truck events to the MQTT broker on port 1883 is as simple as running the trivadis/iot-truck-simulator docker image.
docker run --network kafka-workshop trivadis/iot-truck-simulator '-s' 'MQTT' '-h' 'mosquitto-1' '-p' '1883' '-f' 'CSV'
As soon as messages are produced to MQTT, you should see them either on the CLI or in the MQTT UI (HiveMQ) as shown below.
The Kafka cluster is configured with auto.create.topics.enable set to false. Therefore we first have to create all the necessary topics, using the kafka-topics command line utility of Apache Kafka.
We can easily get access to the kafka-topics CLI by navigating into one of the containers for the 3 Kafka brokers. Let's use kafka-1:
docker exec -ti kafka-1 bash
First, let's list all existing topics:
kafka-topics --bootstrap-server kafka-1:19092 --list
Now let's create the truck_position topic in Kafka, into which the messages from MQTT will be written.
kafka-topics --bootstrap-server kafka-1:19092 --create --topic truck_position --partitions 8 --replication-factor 2
Make sure to exit from the container after the topics have been created successfully.
If you don't like to work with the CLI, you can also create the Kafka topics using the CMAK GUI.
After successful creation, start a kafka-console-consumer or kafkacat to consume messages from the truck_position topic.
Use either
kafka-console-consumer --bootstrap-server kafka-1:19092 --topic truck_position
or
kafkacat -b dataplatform -t truck_position
In order to get the messages from MQTT into Kafka, we will be using Kafka Connect. Luckily, there are multiple Kafka connectors available for MQTT. We can either use the one provided by Confluent Inc. (in preview and available under an evaluation license on Confluent Hub) or the one provided as part of the Landoop Stream-Reactor project available on GitHub. We will be using the Landoop MQTT Connector.
Check out the IoT Truck Demo Tutorial to see the Confluent MQTT Connector in action.
Two instances of the Kafka Connect service are running as part of the Streaming Platform, connect-1 and connect-2. To be able to add connector implementations without having to copy them into the docker container (or even create a dedicated docker image), both connect services are configured to retrieve additional implementations from the local folder /etc/kafka-connect/custom-plugins. This folder is mapped as a volume to the kafka-connect folder outside of the container.
In that kafka-connect folder we need to copy the artefacts of the Kafka connectors we want to use.
Navigate into the kafka-connect folder
cd $DATAPLATFORM_HOME/plugins/kafka-connect/connectors
and download the kafka-connect-mqtt-1.2.3-2.1.0-all.tar.gz file from the Landoop Stream-Reactor project.
Once it is successfully downloaded, uncompress it using this tar command and remove the file.
mkdir kafka-connect-mqtt-1.2.3-2.1.0-all && tar xvf kafka-connect-mqtt-1.2.3-2.1.0-all.tar.gz -C kafka-connect-mqtt-1.2.3-2.1.0-all
rm kafka-connect-mqtt-1.2.3-2.1.0-all.tar.gz
Now let's restart Kafka Connect in order to pick up the new connector.
docker-compose restart kafka-connect-1 kafka-connect-2
The connector has now been added to the Kafka Connect cluster. You can confirm that by watching the logs of the two containers:
docker-compose logs -f kafka-connect-1 kafka-connect-2
After some time you should see an output similar to the one below with a message that the MQTT connector has been added and later that the connector finished starting successfully ...
connect-2 | [2019-06-08 18:01:02,590] INFO Registered loader: PluginClassLoader{pluginLocation=file:/etc/kafka-connect/custom-plugins/kafka-connect-mqtt-1.2.1-2.1.0-all/} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
connect-2 | [2019-06-08 18:01:02,591] INFO Added plugin 'com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
connect-2 | [2019-06-08 18:01:02,591] INFO Added plugin 'com.datamountaineer.streamreactor.connect.mqtt.sink.MqttSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
connect-2 | [2019-06-08 18:01:02,592] INFO Added plugin 'com.datamountaineer.streamreactor.connect.converters.source.JsonResilientConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
connect-2 | [2019-06-08 18:01:02,592] INFO Added plugin 'com.landoop.connect.sql.Transformation' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
connect-2 | [2019-06-08 18:01:11,520] INFO Starting connectors and tasks using config offset -1 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
connect-2 | [2019-06-08 18:01:11,520] INFO Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
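Instead of scanning the log, the installed plugins can also be listed through the Kafka Connect REST API, whose GET /connector-plugins endpoint returns a JSON array with one object per plugin, each carrying a "class" field. The Python sketch below checks that array for the MQTT source connector class shown in the log above. The helper names has_plugin and fetch_plugins are hypothetical and were introduced for this illustration, and any base URL passed to fetch_plugins is an assumption about your environment.

```python
import json
import urllib.request

# Connector class as reported in the Kafka Connect log output.
MQTT_SOURCE_CLASS = (
    "com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector"
)


def has_plugin(plugins: list, class_name: str) -> bool:
    """Return True if the plugin list contains the given connector class."""
    return any(plugin.get("class") == class_name for plugin in plugins)


def fetch_plugins(base_url: str) -> list:
    """Fetch the installed plugins from GET <base_url>/connector-plugins."""
    with urllib.request.urlopen(f"{base_url}/connector-plugins", timeout=10) as response:
        return json.load(response)
```

Assuming the Kafka Connect REST API is reachable at http://dataplatform:8083, calling has_plugin(fetch_plugins("http://dataplatform:8083"), MQTT_SOURCE_CLASS) should return True once the restart has completed.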
To create an instance of the connector over the REST API, you can either use a REST client or the Linux curl command line utility, which should be available on the Docker host. Curl is what we are going to use here.
Create a folder scripts and navigate into the folder.
mkdir -p scripts
cd scripts
In the scripts folder, create a file and add the code below.
#!/bin/bash

# Remove the connector first, in case it already exists
echo "removing MQTT Source Connector"
curl -X "DELETE" "$DOCKER_HOST_IP:8083/connectors/mqtt-truck-position-source"

# (Re)create the connector with the configuration below
echo "creating MQTT Source Connector"
curl -X "POST" "$DOCKER_HOST_IP:8083/connectors" \
     -H "Content-Type: application/json" \
     -d '{
  "name": "mqtt-truck-position-source",
  "config": {
    "connector.class": "com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector",
    "connect.mqtt.connection.timeout": "1000",
    "tasks.max": "1",
    "connect.mqtt.kcql": "INSERT INTO truck_position SELECT * FROM truck/+/position",
    "connect.mqtt.connection.clean": "true",
    "connect.mqtt.service.quality": "0",
    "connect.mqtt.connection.keep.alive": "1000",
    "": "tm-mqtt-connect-01",
    "connect.mqtt.converter.throw.on.error": "true",
    "connect.mqtt.hosts": "tcp://mosquitto-1:1883"
  }
}'
The script first removes the MQTT connector if it already exists, and then creates it (again).
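The central setting in this request is connect.mqtt.kcql, which ties the MQTT topic filter to the Kafka target topic. As a rough illustration, the Python sketch below assembles an equivalent request body programmatically. It is not a replacement for the script: build_connector_payload and create_connector are hypothetical helpers, only a subset of the properties from the script is included, and the base URL you pass to create_connector is an assumption about where the Kafka Connect REST API is reachable.

```python
import json
import urllib.request


def build_connector_payload(name: str, kafka_topic: str,
                            mqtt_filter: str, mqtt_host: str) -> dict:
    """Build a request body for POST /connectors (subset of the script's properties)."""
    return {
        "name": name,
        "config": {
            "connector.class": "com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector",
            "tasks.max": "1",
            # KCQL maps the MQTT topic filter onto the Kafka target topic.
            "connect.mqtt.kcql": f"INSERT INTO {kafka_topic} SELECT * FROM {mqtt_filter}",
            "connect.mqtt.service.quality": "0",
            "connect.mqtt.hosts": mqtt_host,
        },
    }


def create_connector(base_url: str, payload: dict) -> int:
    """POST the payload to Kafka Connect and return the HTTP status code."""
    request = urllib.request.Request(
        f"{base_url}/connectors",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=10) as response:
        return response.status


# Same names and addresses as in the curl script.
payload = build_connector_payload(
    "mqtt-truck-position-source",
    "truck_position",
    "truck/+/position",
    "tcp://mosquitto-1:1883",
)
print(json.dumps(payload, indent=2))
```

Printing the payload before sending it is a cheap way to verify that the JSON is well-formed, which is the most common source of errors when the body is written by hand inside a shell script.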
Also create a separate script for just stopping the connector and add the following code:
echo "removing MQTT Source Connector"
curl -X "DELETE" "$DOCKER_HOST_IP:8083/connectors/mqtt-truck-position-source"
Make sure that both scripts are executable
sudo chmod +x
sudo chmod +x
Finally let's start the connector by running the start-mqtt script.
As soon as the connector starts getting the messages from MQTT, they should start appearing on the console where the Kafka consumer is running.
Navigate to the Kafka Connect UI to see the connector running.
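The state of the connector can also be queried without the UI, through the Kafka Connect REST API endpoint GET /connectors/<name>/status, which reports a state for the connector itself and for each of its tasks. The Python sketch below shows one way to evaluate such a response. The helpers is_running and fetch_status are hypothetical, the sample response is hand-written for illustration (it is not captured output), and the base URL passed to fetch_status is an assumption about your environment.

```python
import json
import urllib.request


def is_running(status: dict) -> bool:
    """True if the connector and all of its tasks report the RUNNING state."""
    connector_ok = status.get("connector", {}).get("state") == "RUNNING"
    tasks = status.get("tasks", [])
    tasks_ok = len(tasks) > 0 and all(task.get("state") == "RUNNING" for task in tasks)
    return connector_ok and tasks_ok


def fetch_status(base_url: str, name: str) -> dict:
    """Fetch the status document from GET <base_url>/connectors/<name>/status."""
    with urllib.request.urlopen(f"{base_url}/connectors/{name}/status", timeout=10) as response:
        return json.load(response)


# Hand-written sample response, for illustration only.
sample_status = {
    "name": "mqtt-truck-position-source",
    "connector": {"state": "RUNNING"},
    "tasks": [{"id": 0, "state": "RUNNING"}],
}
print(is_running(sample_status))  # True
```

Assuming the REST API is reachable at http://dataplatform:8083, is_running(fetch_status("http://dataplatform:8083", "mqtt-truck-position-source")) gives a quick yes/no answer. A task in the FAILED state usually carries a stack trace in its "trace" field, which is a good starting point for troubleshooting.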