
Snowflake Connector For Kafka

In this article we will see how to use the Snowflake connector to stream data from Kafka to Snowflake. The Snowflake Connector for Kafka ("Kafka connector") reads data from one or more Apache Kafka topics and loads the data into a Snowflake table. The code used in this article can be found here. To have a stream of data for this demo, we will source data from Twitter.

Prerequisites

  • Download and install Docker for your platform. See here for instructions
  • Create a Snowflake account for the demo. See here for instructions
  • Get a Twitter API key. See here for instructions

Generate Key Pair

Before the connector can sink data to Snowflake, you need to generate a key pair. Snowflake authentication requires a 2048-bit (minimum) RSA key. You add the public key to a Snowflake user account and the private key to the connector configuration. Generate the key pair on the machine from which you will execute the following steps.

  • Generate a private key using OpenSSL

    openssl genrsa -out snowflake_key.pem 2048
    
  • Generate the public key referencing the private key

    openssl rsa -in snowflake_key.pem -pubout -out snowflake_key.pub
    
  • Extract the required part of the public key

    grep -v "BEGIN PUBLIC" snowflake_key.pub | grep -v "END PUBLIC" | tr -d '\r\n'
    
  • Extract the required part of the private key

    grep -v "BEGIN RSA PRIVATE KEY" snowflake_key.pem | grep -v "END RSA PRIVATE KEY" | tr -d '\r\n'
    
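If you plan to script the setup, you can capture both one-line values in shell variables for pasting into the connector configuration later. This is a minimal sketch; the variable names are my own and not part of the demo repo.

    # Capture the single-line key values for later use in connect-secrets.properties.
    # Variable names are illustrative only.
    SNOWFLAKE_PUBLIC_KEY=$(grep -v "BEGIN PUBLIC" snowflake_key.pub | grep -v "END PUBLIC" | tr -d '\r\n')
    SNOWFLAKE_PRIVATE_KEY=$(grep -v "BEGIN RSA PRIVATE KEY" snowflake_key.pem | grep -v "END RSA PRIVATE KEY" | tr -d '\r\n')
    echo "$SNOWFLAKE_PUBLIC_KEY"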

Create User in Snowflake

Log in to your Snowflake account and execute the following queries to create a user account and add the public key to it.

  • Switch to the SECURITYADMIN role

    USE ROLE SECURITYADMIN;
    
  • Create a Kafka user in Snowflake

    CREATE USER kafka RSA_PUBLIC_KEY='<your-public-key>';
    
  • Assign a role to the Kafka user

    GRANT ROLE SYSADMIN TO USER kafka;
    
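To confirm the public key was attached correctly, you can describe the user and check the RSA_PUBLIC_KEY_FP property; a non-empty fingerprint means the key is registered.

    -- Verify the key registration; look for the RSA_PUBLIC_KEY_FP property.
    DESC USER kafka;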

Start Kafka and Connectors

  • Download docker-compose.yml and all required components from here

  • Create a copy of kafka-to-snowflake/kafka-connect/secrets/connect-secrets.properties.template as kafka-to-snowflake/kafka-connect/secrets/connect-secrets.properties and update it with the Twitter API key details and the Snowflake credentials

  • Start the services in Docker. This brings up the Kafka cluster and creates the Kafka connectors automatically by running create-twitter-source.sh and create-snowflake-sink.sh

    docker-compose up -d
    
  • Validate that all services are running

    docker ps 
    
  • Navigate to Control Center at http://localhost:9021/clusters and check the status of the connectors. We should have two connectors running: one to source data from Twitter and the other to sink the data to Snowflake. A sketch of the sink registration follows.
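
For reference, create-snowflake-sink.sh registers the sink through the Kafka Connect REST API with a payload along these lines. This is a hedged sketch: the connector name, topic, and placeholder values are illustrative, and the actual script in the repo is authoritative.

    # Sketch of registering the Snowflake sink via the Connect REST API (port 8083 assumed).
    # Property names come from the Snowflake connector docs; values here are placeholders.
    curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors -d '{
      "name": "snowflake-sink",
      "config": {
        "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
        "topics": "twitter_my_favorite_celebrities_src",
        "snowflake.url.name": "<your-account>.snowflakecomputing.com:443",
        "snowflake.user.name": "kafka",
        "snowflake.private.key": "<your-private-key>",
        "snowflake.database.name": "DEMO_DB",
        "snowflake.schema.name": "PUBLIC",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "com.snowflake.kafka.connector.records.SnowflakeJsonConverter"
      }
    }'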

Validate Data In Snowflake

  • Navigate to the Snowflake UI for your account. We should see the new table in the DEMO_DB database, PUBLIC schema

  • Check a few random records by running

    SELECT *
    FROM "DEMO_DB"."PUBLIC"."TWITTER_MY_FAVORITE_CELEBRITIES_SRC_1206524140"
    LIMIT 10;
    
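The connector creates the table with two VARIANT columns: RECORD_METADATA (Kafka coordinates such as topic, partition, and offset) and RECORD_CONTENT (the message body). You can peek at the metadata as well; the aliases below are my own.

    -- RECORD_METADATA carries the Kafka topic, partition and offset for each row.
    SELECT RECORD_METADATA:topic::STRING AS KAFKA_TOPIC,
        RECORD_METADATA:partition::NUMBER AS KAFKA_PARTITION,
        RECORD_METADATA:offset::NUMBER AS KAFKA_OFFSET
    FROM "DEMO_DB"."PUBLIC"."TWITTER_MY_FAVORITE_CELEBRITIES_SRC_1206524140"
    LIMIT 10;
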
  • Check the record count by running the query below. The count should change if you rerun the query, since data is streaming in continuously.

    SELECT CURRENT_TIMESTAMP()
        ,COUNT(*)
    FROM "DEMO_DB"."PUBLIC"."TWITTER_MY_FAVORITE_CELEBRITIES_SRC_1206524140";
    
  • Parse the JSON documents in Snowflake. Here is an example that selects a few columns from the JSON document.

    -- Parse JSON
    -- Field in root    : RECORD_CONTENT:CreatedAt
    -- Field in Dict    : RECORD_CONTENT:User.Name
    -- Field in Array   : RECORD_CONTENT:HashtagEntities (VALUE:Text::STRING)
    SELECT
        RECORD_CONTENT:CreatedAt AS "CREATEDAT",
        RECORD_CONTENT:User.Name AS "USERNAME",
        VALUE:Text::STRING AS "HASHTAG"
    FROM "DEMO_DB"."PUBLIC"."TWITTER_MY_FAVORITE_CELEBRITIES_SRC_1206524140",
    LATERAL FLATTEN(INPUT => RECORD_CONTENT:HashtagEntities)
    WHERE RECORD_CONTENT:HashtagEntities <> '[]';
    
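Building on the same FLATTEN pattern, you can aggregate the most frequent hashtags in the stream. This query is illustrative and not part of the demo repo.

    -- Top 10 hashtags across the streamed tweets.
    SELECT VALUE:Text::STRING AS HASHTAG, COUNT(*) AS CNT
    FROM "DEMO_DB"."PUBLIC"."TWITTER_MY_FAVORITE_CELEBRITIES_SRC_1206524140",
    LATERAL FLATTEN(INPUT => RECORD_CONTENT:HashtagEntities)
    GROUP BY HASHTAG
    ORDER BY CNT DESC
    LIMIT 10;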

Hope this was helpful. Did I miss something? Let me know in the comments and I’ll add it in!

Notes

  • You can see the list of all containers by running docker container ls -a
  • You can bring down the containers by running docker-compose down
  • You can bring down the containers and related volumes by running docker-compose down --volumes
  • You can delete all exited containers by running docker rm $(docker ps -q -f status=exited)
