Stream data from Kafka to Google Cloud Storage(GCS) using Aiven’s GCS Sink Connector

Table of Contents

In this article we will see how to use Aiven’s GCS Sink Connector for Apache Kafka to stream data from Kafka to Google Cloud Storage(GCS). Code used in this article can be found here. We will use Confluent Kafka running in docker and Kafka connect DataGen to generate a stream of data required for the demo.


  • Download and install docker for your platform. Click here for instructions
  • Google Cloud Account for Cloud Storage. If you don’t have a Google Cloud Account signup by navigating to here

Create GCS Bucket

  • Login into Google Cloud Account and search for cloud storage in Google Cloud Console

  • Create bucket with default configuration. The bucket name must be globally unique.

  • Search for IAM in Google Cloud Console, click service account to create a new service account which will be used by Kafka connect

  • Create service account, grant editor role and select done

  • After creating the service account, click action –> select manage keys –> add key to create a new key of type json. This will also download a json file which will be used in the Kafka connect

Start containers

For the purpose of the demo we will be running Kafka and Kafka connect in a docker container

  • Clone kafka-examples repo

  • Open a new terminal and cd into kafka-to-gcs directory. This directory contains infra components for this demo

    cd kafka-to-gcs
  • Create a copy of .env.template as .env. This contains all environment variables for docker, but we don’t have any variables which should be changed for the purpose of this demo

  • Rename the downloaded gcp key to and copy into kafka-to-gcs/kafka-connect/secrets/. This contains the credentials to write to the GCS bucket.

  • Review the Data generator Kafka source connector config in kafka-connect/config/source/connector_src_users.config. The below configuration will generate sample user data and will write to a topic named user. See here if you wish to customize any of the configuration.

      "name": "src-datagen-users",
      "config": {
        "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
        "kafka.topic": "users",
        "quickstart": "users",
        "key.converter": "",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
        "max.interval": 1000,
        "iterations": 10000000,
        "tasks.max": "1"
  • Review the GCP sink connector config in kafka-connect/config/sink/connector_sink_users.config. The below configuration will generate read data from user topic and will write them to gcp bucket in gzip format. See here if you wish to customize any of the configuration.

    	"name": "sink-gcs-users",
    	"config": {
    		"connector.class": "io.aiven.kafka.connect.gcs.GcsSinkConnector",
    		"tasks.max": "1",
    		"topics": "users",
    		"": "entech-demo-bucket-01",
    		"gcs.credentials.path": "/opt/confluent/secrets/connect-secrets.json",
    		"format.output.type": "jsonl",
    		"format.output.envelope": false,
    		"format.output.fields": "value",
    		"_format.output.fields": "key,value,offset,timestamp,headers",
    		"": "raw/",
    		"file.compression.type": "gzip",
    		"": "UTC",
    		"": "wallclock",
    		"": "y={{timestamp:unit=yyyy}}/m={{timestamp:unit=MM}}/d={{timestamp:unit=dd}}/H={{timestamp:unit=HH}}/{{topic}}-{{partition:padding=true}}-{{start_offset:padding=true}}.gz",
    		"key.converter": "",
    		"_key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    		"key.converter.schemas.enable": false,
    		"value.converter": "org.apache.kafka.connect.json.JsonConverter",
    		"_value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    		"__value.converter": "",
    		"value.converter.schemas.enable": false,
    		"errors.tolerance": "all",
    		"": "connect.sink.dlt.gcs.users",
    		"errors.log.include.messages": true,
    		"errors.deadletterqueue.context.headers.enable": true,
    		"errors.deadletterqueue.topic.replication.factor": 1
  • Start Kafka container by running

    docker-compose up -d --build

Validate containers

  • Validate the containers by running

    docker ps
  • Validate the health of cluster and connectors by navigating to Confluent control center UI. Control center is not required for the purpose of this demo, but it’s good to have a dashboard which shows the health of the services

Validate data

  • Validate the data in GCS by navigating to the gcs bucket

  • You should see the data in Kafka topics written to the bucket with a file name format specified in the sink connector configuration

Clean Demo Resources

Open a new terminal and cd into kafka-to-gcs directory. Run the below command to delete the docker containers and related volumes

docker-compose down -v --remove-orphans

Hope this was helpful. Did I miss something ? Let me know in the comments OR in the forum section.


Share this blog:

Related Articles