Kafka Connect - FileSource and FileSink Connector

Kafka | 31 May 2018

Kafka Connect integrates Apache Kafka with other systems and makes it easy to add new systems to your scalable and secure stream data pipelines. In this article we will see how to use the FileSource connector and the FileSink connector to read the contents of a text file into a Kafka topic and to write that data back out to another file.

This was tested on the Hortonworks sandbox (HDP 2.6.4).

  1. Create a Kafka topic. Here the topic name is entechlog-KafkaConnectFile-0001.
cd /usr/hdp/current/kafka-broker/bin  

./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic entechlog-KafkaConnectFile-0001
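To confirm the topic was created, you can describe it; the output should show one partition with a single replica:

./kafka-topics.sh --describe --zookeeper localhost:2181 --topic entechlog-KafkaConnectFile-0001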
  2. Create the source, sink and worker configuration files. Normally all configuration files are maintained in /usr/hdp/current/kafka-broker/config. In this example we will use the default configuration that ships with the Hortonworks sandbox, with a few changes.

Source Configuration - “connect-file-source.properties”

name=local-file-source  
connector.class=FileStreamSource  
tasks.max=1  
file=/home/entechlog/kafka/input/test.txt  
topic=entechlog-KafkaConnectFile-0001  

Sink Configuration - “connect-file-sink.properties”

name=local-file-sink  
connector.class=FileStreamSink  
tasks.max=1  
file=/home/entechlog/kafka/output/test.sink.txt  
topics=entechlog-KafkaConnectFile-0001  
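Note the difference between the two connector configs: the source connector uses the singular topic property (the one topic it writes to), while the sink connector uses topics, since a sink connector can consume from a list of topics.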

Worker Configuration - “connect-standalone.properties”

# Brokers the Connect worker talks to
bootstrap.servers=IPaddressOfSandbox:6667,localhost:6667,sandbox.hortonworks.com:6667,sandbox-hdp.hortonworks.com:6667,sandbox-hdf.hortonworks.com:6667

# The converters specify the format of data in Kafka and how to translate it into Connect data.
# Every Connect user will need to configure these based on the format they want their data in
# when loaded from or stored into Kafka. The JsonConverter lines below are commented out
# because the StringConverter settings that follow would override them anyway; with
# StringConverter each line of the file lands in the topic as a plain string.
#key.converter=org.apache.kafka.connect.json.JsonConverter
#value.converter=org.apache.kafka.connect.json.JsonConverter

# Converter-specific settings can be passed in by prefixing the converter's setting with the
# converter we want to apply it to
#key.converter.schemas.enable=true
#value.converter.schemas.enable=true

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

# The internal converter used for offsets and config data is configurable and must be specified,
# but most users will always want to use the built-in default. Offset and config data is never
# visible outside of Kafka Connect in this format.

internal.key.converter=org.apache.kafka.connect.json.JsonConverter  
internal.value.converter=org.apache.kafka.connect.json.JsonConverter  
internal.key.converter.schemas.enable=false  
internal.value.converter.schemas.enable=false  
offset.storage.file.filename=/tmp/connect.offsets  

# Flush much faster than normal, which is useful for testing/debugging  
offset.flush.timeout.ms=1000  
buffer.memory=100  

# Uncomment the lines below if you are using Kerberos authentication
#producer.security.protocol=SASL_PLAINTEXT
#producer.sasl.kerberos.service.name=kafka
#consumer.security.protocol=SASL_PLAINTEXT
#consumer.sasl.kerberos.service.name=kafka
  3. Create the source file and load it with some data.
echo -e "foo\nbar" > /home/entechlog/kafka/input/test.txt
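The FileStreamSource connector tails this file, so lines appended after the connector is running (next step) are also picked up and streamed through the pipeline, for example:

echo "baz" >> /home/entechlog/kafka/input/test.txt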
  4. Invoke Kafka Connect in standalone mode and validate the data in the topic and the target file. To do this, open two different sessions and run the two commands below.
./connect-standalone.sh /usr/hdp/current/kafka-broker/config/connect-standalone.properties /usr/hdp/current/kafka-broker/config/connect-file-source.properties /usr/hdp/current/kafka-broker/config/connect-file-sink.properties

./kafka-console-consumer.sh --bootstrap-server localhost:6667 --topic entechlog-KafkaConnectFile-0001 --from-beginning
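To verify the sink side, check the output file; it should contain the same lines that were written to the input file. The Connect worker also exposes a REST API (on port 8083 by default, including in standalone mode) that can be used to check connector status:

cat /home/entechlog/kafka/output/test.sink.txt

curl http://localhost:8083/connectors/local-file-sink/status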