Kafka Connect - FileSource and FileSink Connector

Kafka Connect integrates Apache Kafka with other systems and makes it easy to add new systems to your scalable and secure streaming data pipelines. In this article we will use the FileSource connector (FileStreamSource) to read the lines of a text file into a Kafka topic, and the FileSink connector (FileStreamSink) to write the records from that topic to another file.

This was tested on the Hortonworks sandbox (HDP 2.6.4).

  1. Create a Kafka topic. Here the topic name is entechlog-KafkaConnectFile-0001
cd /usr/hdp/current/kafka-broker/bin  

./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic entechlog-KafkaConnectFile-0001
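
To confirm that the topic exists before wiring up the connectors, it can be described with the same script. This is only a sketch: `KAFKA_BIN` and `TOPIC` are variable names introduced here for illustration, and the guard simply skips the call on a machine that does not have the HDP layout.

```shell
# TOPIC and KAFKA_BIN are illustrative variable names, not part of Kafka.
TOPIC="entechlog-KafkaConnectFile-0001"
KAFKA_BIN="${KAFKA_BIN:-/usr/hdp/current/kafka-broker/bin}"

if [ -x "$KAFKA_BIN/kafka-topics.sh" ]; then
  # Prints the partition count, replication factor and leader for the topic.
  "$KAFKA_BIN/kafka-topics.sh" --describe --zookeeper localhost:2181 --topic "$TOPIC"
else
  echo "kafka-topics.sh not found under $KAFKA_BIN; skipping describe"
fi
```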
  1. Create the source, sink and worker configuration files. Normally all configuration files are maintained in /usr/hdp/current/kafka-broker/config. In this example we will use the default configuration files that ship with the Hortonworks sandbox, with a few changes

Source Configuration - “connect-file-source.properties”

name=local-file-source  
connector.class=FileStreamSource  
tasks.max=1  
file=/home/entechlog/kafka/input/test.txt  
topic=entechlog-KafkaConnectFile-0001  

Sink Configuration - “connect-file-sink.properties”

name=local-file-sink  
connector.class=FileStreamSink  
tasks.max=1  
file=/home/entechlog/kafka/output/test.sink.txt  
topics=entechlog-KafkaConnectFile-0001  
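
Note that the source connector publishes to the single topic named in `topic`, while the sink subscribes to the comma-separated list in `topics`. If the two do not line up, the sink never sees the data. Here is a small sanity check, written as a sketch: `CONF_DIR` and `prop` are hypothetical names, and the two files are written to a scratch directory so the snippet runs anywhere.

```shell
# Scratch copies of the two connector files. CONF_DIR is a hypothetical location;
# on the sandbox the real files live in /usr/hdp/current/kafka-broker/config.
CONF_DIR="${CONF_DIR:-${TMPDIR:-/tmp}/kafka-connect-demo-config}"
mkdir -p "$CONF_DIR"

cat > "$CONF_DIR/connect-file-source.properties" <<'EOF'
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=/home/entechlog/kafka/input/test.txt
topic=entechlog-KafkaConnectFile-0001
EOF

cat > "$CONF_DIR/connect-file-sink.properties" <<'EOF'
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=/home/entechlog/kafka/output/test.sink.txt
topics=entechlog-KafkaConnectFile-0001
EOF

# prop FILE KEY: print the value of KEY from a properties file (hypothetical helper).
prop() {
  sed -n "s/^$2=//p" "$1" | tail -n 1
}

SOURCE_TOPIC="$(prop "$CONF_DIR/connect-file-source.properties" topic)"
SINK_TOPICS="$(prop "$CONF_DIR/connect-file-sink.properties" topics)"

# Wrap both sides in commas so only a whole topic name in the list can match.
case ",$SINK_TOPICS," in
  *",$SOURCE_TOPIC,"*) TOPIC_CHECK="ok" ;;
  *) TOPIC_CHECK="mismatch" ;;
esac
echo "source topic: $SOURCE_TOPIC, sink topics: $SINK_TOPICS -> $TOPIC_CHECK"
```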

Worker Configuration - “connect-standalone.properties”

#bootstrap.servers  
bootstrap.servers=IPaddressOfSandbox:6667,localhost:6667,sandbox.hortonworks.com:6667,sandbox-hdp.hortonworks.com:6667,sandbox-hdf.hortonworks.com:6667  
  • The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will need to configure these based on the format they want their data in when loaded from or stored into Kafka

    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter

  • Converter-specific settings can be passed in by prefixing the Converter’s setting with the converter we want to apply it to

    #key.converter.schemas.enable=true  
    #value.converter.schemas.enable=true  
    key.converter=org.apache.kafka.connect.storage.StringConverter  
    value.converter=org.apache.kafka.connect.storage.StringConverter  
    
  • The internal converter used for offsets and config data is configurable and must be specified, but most users will always want to use the built-in default. Offset and config data is never visible outside of Kafka Connect in this format

    internal.key.converter=org.apache.kafka.connect.json.JsonConverter  
    internal.value.converter=org.apache.kafka.connect.json.JsonConverter  
    internal.key.converter.schemas.enable=false  
    internal.value.converter.schemas.enable=false  
    offset.storage.file.filename=/tmp/connect.offsets 
    
  • Flush much faster than normal, which is useful for testing/debugging

    offset.flush.timeout.ms=1000  
    buffer.memory=100  
    
    #Uncomment below lines if you are using Kerberos authentication
    #producer.security.protocol=SASL_PLAINTEXT
    #producer.sasl.kerberos.service.name=kafka
    #consumer.security.protocol=SASL_PLAINTEXT
    #consumer.sasl.kerberos.service.name=kafka
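
The worker file above sets `key.converter` and `value.converter` twice. Since it is a Java properties file, the last occurrence of a key should be the one that takes effect, so the worker ends up using StringConverter. A quick way to see the effective value is sketched below. `effective_prop` and `PROPS_FILE` are hypothetical names, and the snippet works on a scratch copy of the converter lines rather than the real worker file.

```shell
# Scratch copy of the converter lines from the worker configuration.
# PROPS_FILE is a hypothetical path used only for this demonstration.
PROPS_FILE="${TMPDIR:-/tmp}/connect-standalone-demo.properties"
cat > "$PROPS_FILE" <<'EOF'
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
#key.converter.schemas.enable=true
#value.converter.schemas.enable=true
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
EOF

# effective_prop FILE KEY: print the value from the last uncommented KEY=... line.
# Hypothetical helper; it mimics the "last one wins" rule for duplicated keys.
effective_prop() {
  awk -F= -v key="$2" '$1 == key { val = substr($0, length(key) + 2) } END { print val }' "$1"
}

effective_prop "$PROPS_FILE" key.converter
effective_prop "$PROPS_FILE" value.converter
```

To avoid the ambiguity altogether, the two JsonConverter lines can be commented out in the real file.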
    
  1. Create the source file and load it with some data
echo -e "foo\nbar" > /home/entechlog/kafka/input/test.txt
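
The command above assumes that the input directory already exists, and the sink will most likely need its output directory to exist as well before the worker starts. A slightly more defensive version is sketched here. `BASE_DIR` is a variable introduced for illustration and defaults to a scratch location; set it to /home/entechlog/kafka to match the paths used in this article.

```shell
# BASE_DIR is a hypothetical override; the default keeps the sketch safe to run anywhere.
BASE_DIR="${BASE_DIR:-${TMPDIR:-/tmp}/kafka-connect-demo}"
INPUT_FILE="$BASE_DIR/input/test.txt"

# Create the input directory for the source file and the output directory for the sink file.
mkdir -p "$BASE_DIR/input" "$BASE_DIR/output"

# printf behaves the same in every POSIX shell, unlike `echo -e`.
printf 'foo\nbar\n' > "$INPUT_FILE"

# Show the file content; the source connector reads it line by line.
cat "$INPUT_FILE"
```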
  1. Start Kafka Connect in standalone mode and validate the data in the topic and in the target file. To do this, open two separate sessions and run the two commands below, one in each
./connect-standalone.sh /usr/hdp/current/kafka-broker/config/connect-standalone.properties /usr/hdp/current/kafka-broker/config/connect-file-source.properties /usr/hdp/current/kafka-broker/config/connect-file-sink.properties

./kafka-console-consumer.sh --bootstrap-server localhost:6667 --topic entechlog-KafkaConnectFile-0001 --from-beginning
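
Once the worker has processed the input, the sink file should hold the same lines as the source file. Here is a minimal sketch of that check: `compare_sink` is a hypothetical helper name, the default paths are the ones from the connector configurations above, and it assumes the sink has had time to catch up.

```shell
# compare_sink SRC SINK: succeed when the sink file has the same content as the source.
# compare_sink is a hypothetical helper, not part of Kafka.
compare_sink() {
  if [ ! -f "$1" ] || [ ! -f "$2" ]; then
    echo "missing file: check that the worker is running and the paths are right"
    return 2
  fi
  if cmp -s "$1" "$2"; then
    echo "sink matches source ($(wc -l < "$2" | tr -d ' ') lines)"
  else
    echo "sink differs from source:"
    diff "$1" "$2"
    return 1
  fi
}

# Paths from connect-file-source.properties and connect-file-sink.properties.
SRC="${SRC:-/home/entechlog/kafka/input/test.txt}"
SINK="${SINK:-/home/entechlog/kafka/output/test.sink.txt}"
compare_sink "$SRC" "$SINK" || true
```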