Write data from a local file to a Kafka topic
40 mins
Create topic my-connect-test
$ ~/apps/kafka/bin/kafka-topics.sh \
--create \
--bootstrap-server localhost:9092 \
--replication-factor 1 \
--partitions 1 \
--topic my-connect-test
Since we are reading the contents of a local file and writing to Kafka, this file is considered our "source". Therefore we will use the FileStreamSource connector. We must create a configuration file to use with this connector. For the most part you can copy the example available in $KAFKA_HOME/config/connect-file-source.properties. Below is an example of our my-file-source.properties file:
# my-file-source.properties config file
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=/tmp/my-test.txt
topic=my-connect-test
This file indicates that we will use the FileStreamSource connector class, read data from the /tmp/my-test.txt file, and publish records to the my-connect-test Kafka topic. We also use only one task to push this data to Kafka, since we are reading and publishing a single file.
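As a minimal sketch, the connector config described above can be written from the shell and checked in one step. CONFIG_DIR is a hypothetical variable introduced here for illustration; it defaults to a scratch directory, so set it to ~/apps/kafka/config if you want the file where the connect-standalone.sh command later in this walkthrough expects it.

```shell
# Assumption: CONFIG_DIR is a hypothetical variable, not something Kafka reads.
# It defaults to a scratch directory so this sketch is safe to run anywhere.
CONFIG_DIR="${CONFIG_DIR:-/tmp/kafka-connect-demo}"
mkdir -p "$CONFIG_DIR"

# Write the connector config shown above; the quoted 'EOF' keeps the text literal.
cat > "$CONFIG_DIR/my-file-source.properties" <<'EOF'
# my-file-source.properties config file
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=/tmp/my-test.txt
topic=my-connect-test
EOF

# Show the two settings that tie the input file to the topic.
grep -E '^(file|topic)=' "$CONFIG_DIR/my-file-source.properties"
```

The grep at the end is only a sanity check that the file and topic settings were written as intended.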
The Connect worker itself also needs a configuration file. You can find a sample config file for standalone workers in $KAFKA_HOME/config/connect-standalone.properties. We will call our file my-standalone.properties.
# my-standalone.properties worker config file
# bootstrap Kafka servers
bootstrap.servers=localhost:9092
# specify input data format
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
# The internal converters used for offsets and config data; most users will want to keep the built-in default
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
# local file storing offsets and config data
offset.storage.file.filename=/tmp/connect.offsets
The main change from the default file is the key.converter and value.converter settings, which default to JsonConverter. Since our file contains simple text, we use the StringConverter types instead.
Create a file /tmp/my-test.txt and add in several lines of text. Here is an example:
line 1
line 2
line 3
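If you prefer to create the file from the shell, a minimal sketch follows. It writes the three example lines to the path configured in my-file-source.properties and counts them. Each line ends with a newline, on the assumption that the connector only emits complete, newline-terminated lines.

```shell
# Write the three example lines, each terminated by a newline.
printf 'line 1\nline 2\nline 3\n' > /tmp/my-test.txt

# Count the lines as a quick check; this should report 3.
wc -l < /tmp/my-test.txt
```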
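Start a consumer on the topic in a separate terminal so we can watch the records arrive. If you have Kafkacat installed: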
$ kafkacat -C -b localhost:9092 -t my-connect-test -f 'key: %k, value: %s \n'
or, with the console consumer that ships with Kafka:
$ ~/apps/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 --topic my-connect-test
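Now start the Connect worker in standalone mode, passing the worker config first and the connector config second:
$ ~/apps/kafka/bin/connect-standalone.sh ~/apps/kafka/config/my-standalone.properties ~/apps/kafka/config/my-file-source.properties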
The standalone worker runs as a single process: it reads our input file /tmp/my-test.txt and publishes each line as a record to the my-connect-test Kafka topic.
If we read from the Kafka topic that we created earlier, we should see the 3 lines in the source file that were written to Kafka: