Kafka Connect - FileSource and FileSink Connector

Thursday, May 31, 2018


Kafka Connect integrates Apache Kafka with other systems and makes it easy to add new systems to your scalable and secure stream data pipelines. In this article we will see how to use the FileSource and FileSink connectors to read the contents of a text file into a Kafka topic and write them back out to another file.

This was tested on the Hortonworks sandbox (HDP 2.6.4).

1. Create a Kafka topic; here the topic name is entechlog-KafkaConnectFile-0001
cd /usr/hdp/current/kafka-broker/bin

./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic entechlog-KafkaConnectFile-0001
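To confirm the topic was created before moving on, you can describe it:

./kafka-topics.sh --describe --zookeeper localhost:2181 --topic entechlog-KafkaConnectFile-0001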

2. Create the source, sink and worker configuration files. Normally all configuration files are maintained in "/usr/hdp/current/kafka-broker/config". In this example we will use the default configuration that ships with the Hortonworks sandbox, with a few changes.

Source Configuration - "connect-file-source.properties"
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=/home/entechlog/kafka/input/test.txt
topic=entechlog-KafkaConnectFile-0001

Sink Configuration - "connect-file-sink.properties"
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=/home/entechlog/kafka/output/test.sink.txt
topics=entechlog-KafkaConnectFile-0001
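Note that connector.class accepts either the short alias used above or the fully qualified class name, so the following lines would work as well. Also, topics in the sink configuration can take a comma-separated list if you want one sink to read from multiple topics.

connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector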

Worker Configuration - "connect-standalone.properties"
#bootstrap.servers
bootstrap.servers=IPaddressOfSandbox:6667,localhost:6667,sandbox.hortonworks.com:6667,sandbox-hdp.hortonworks.com:6667,sandbox-hdf.hortonworks.com:6667

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
# This example streams plain text lines, so the String converter (set below) is used;
# the default JSON converter lines are left commented out for reference.
#key.converter=org.apache.kafka.connect.json.JsonConverter
#value.converter=org.apache.kafka.connect.json.JsonConverter

# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
#key.converter.schemas.enable=true
#value.converter.schemas.enable=true
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

# The internal converter used for offsets and config data is configurable and must be specified, but most users will
# always want to use the built-in default. Offset and config data is never visible outside of Kafka Connect in this format.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.timeout.ms=1000
buffer.memory=100
# Uncomment the below lines if you are using Kerberos authentication
#producer.security.protocol=SASL_PLAINTEXT
#producer.sasl.kerberos.service.name=kafka
#consumer.security.protocol=SASL_PLAINTEXT
#consumer.sasl.kerberos.service.name=kafka
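As a quick illustration of the converter choice above: with StringConverter each line of the input file is written to the topic as a plain string, whereas JsonConverter with schemas.enable=true would wrap the same line in a JSON envelope. For the input line "foo" the message value would look roughly like below.

With StringConverter: foo
With JsonConverter: {"schema":{"type":"string","optional":false},"payload":"foo"}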

3. Create the source file and load it with some data
echo -e "foo\nbar" > /home/entechlog/kafka/input/test.txt

4. Invoke Kafka Connect (standalone) and validate the data in the topic and the target file. For this, open two different sessions and invoke the below two commands.
./connect-standalone.sh /usr/hdp/current/kafka-broker/config/connect-standalone.properties /usr/hdp/current/kafka-broker/config/connect-file-source.properties /usr/hdp/current/kafka-broker/config/connect-file-sink.properties

./kafka-console-consumer.sh --bootstrap-server localhost:6667 --topic entechlog-KafkaConnectFile-0001 --from-beginning
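You should see foo and bar show up in the consumer output. To validate the target file, check the contents of the sink file defined in the sink configuration:

cat /home/entechlog/kafka/output/test.sink.txt

The source connector keeps tailing the input file, so appending another line (for example echo "baz" >> /home/entechlog/kafka/input/test.txt) should make the new record appear in both the topic and the sink file while Connect is running.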

