How To Stream Data From Kafka To Snowflake Using S3 and Snowpipe
In this article we will see how to stream data from Kafka to Snowflake using S3 and Snowpipe. The code used in this article can be found here. Clone the repo to get started. To have a stream of data, we will source data from Mockaroo.
- Download and install Docker for your platform. Click here for instructions
- Create a Snowflake account for the demo. Click here for instructions
- Create an AWS account for the demo. Click here for instructions
- Create a Mockaroo account for the demo and create a schema as per your data needs. Click here for instructions.
Create S3 bucket
Create an S3 bucket which acts as the staging area for incoming events. To automate the bucket creation and IAM provisioning we will use a CloudFormation template. You can skip this step if you already have an S3 bucket and the IAM user, role and policy required to read and write data from S3.
- Log in to your AWS account, navigate to CloudFormation and click Create stack > click Template is ready > click Choose file to upload the CloudFormation template s3-bucket-and-iam-user.yaml
- Optionally click View in Designer to verify the components which will be created. Here we are creating:
➼ S3 bucket and bucket policy
➼ IAM user and policy for Kafka to write data to S3
➼ IAM role and policy for Snowflake to read data from S3
- Click Next > give a Stack name (the stack name is the bucket name as well) and your IAM User ARN (temporary, we will change it after we create the snowpipe) > click Create Stack after reviewing the information.
- After a few minutes you should see the CREATE_COMPLETE message. Click Outputs to view the access details for your bucket. Note them down as we will need them for the next few steps.
The docker-compose.yml will bring up the following services:
- Kafka (Zookeeper, Broker, Kafka Connect, Schema Registry, Control Center)
- Kafka Client (Creates Kafka topic)
- kafkacat (curls Mockaroo and writes data to the Kafka topic; see the sketch below)
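As a rough illustration of what the kafkacat service does, it fetches fake records from your Mockaroo schema URL and produces them to a Kafka topic in a loop. The URL variable, broker address, and topic name below are placeholders, not the repo's actual values:

# Placeholder sketch: MOCKAROO_URL is the URL shown on your schema's API tab;
# broker:9092 and mockaroo-topic are assumed names for the demo broker and topic
while true; do
  curl -s "${MOCKAROO_URL}&key=${MOCKAROO_KEY}" \
    | kafkacat -b broker:9092 -t mockaroo-topic -P
  sleep 5
done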
Here is the list of steps which should be done before bringing up the Docker services:
- Create a copy of /kafka-s3-snowpipe/kafka-connect/secrets/connect-secrets.properties and update it with the S3 details from the CloudFormation output.
- Create a copy of .env and update it with the Mockaroo key from the Mockaroo schema which you created.
- Update create-s3-sink.sh if you have to change the topic name or the target S3 bucket directory; a sketch of the kind of request this script makes follows this list.
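For orientation, create-s3-sink.sh registers the S3 sink connector with the Kafka Connect REST API. A minimal sketch of such a request is below; the connector name, topic, region and flush size are assumptions, the bucket and topics.dir follow the S3 path used later in this article, and the AWS credentials are expected to come from connect-secrets.properties rather than being hard-coded here:

# Placeholder sketch of registering an S3 sink connector via the Connect REST API
curl -s -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "s3-sink",
    "config": {
      "connector.class": "io.confluent.connect.s3.S3SinkConnector",
      "topics": "mockaroo-topic",
      "s3.bucket.name": "entechlog-demo",
      "s3.region": "us-east-1",
      "topics.dir": "kafka-snowpipe-demo",
      "storage.class": "io.confluent.connect.s3.storage.S3Storage",
      "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
      "flush.size": "100",
      "tasks.max": "1"
    }
  }'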
Start and validate Kafka
Now we have all the required artifacts and the next step is to start the Kafka services. Here we will bring up Kafka, produce some data by calling the Mockaroo API in a loop, and write the events from the Kafka topic to the S3 bucket using the S3 sink connector. All of this will happen automatically when we bring up the Docker services.
Start the application by running docker-compose up --remove-orphans -d --build in the directory with docker-compose.yml.
Validate the status of the Docker containers.
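Assuming the stack was started with docker-compose as above, a quick status check is:

# Show the state of the services defined in docker-compose.yml
docker-compose ps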
Validate the Kafka topics and the connector by navigating to the Control Center UI at http://your-machine-name:9021/clusters/. We should see data in the Kafka topic and in S3.
- Data in Kafka topic
- Data in S3 bucket
We will now create a snowpipe and connect it with S3 events to enable auto-loading of data every time we have new events in our S3 staging area. Snowpipe is Snowflake's continuous data ingestion service. Snowpipe loads data within minutes after files are added to a stage and submitted for ingestion.
Create the demo database
USE ROLE SYSADMIN;
CREATE OR REPLACE DATABASE DEMO_DB;
CREATE OR REPLACE SCHEMA DEMO_DB.SNOWPIPE;
Create the storage integration. Update STORAGE_AWS_ROLE_ARN using the output from CloudFormation and STORAGE_ALLOWED_LOCATIONS with the S3 bucket details
USE ROLE ACCOUNTADMIN;
CREATE OR REPLACE STORAGE INTEGRATION MOCKAROO_S3_INT
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = S3
  ENABLED = TRUE
  STORAGE_AWS_ROLE_ARN = 'iam-role'
  STORAGE_ALLOWED_LOCATIONS = ('s3://entechlog-demo/kafka-snowpipe-demo/');
Describe the integration and retrieve the AWS IAM user (STORAGE_AWS_IAM_USER_ARN) and external ID (STORAGE_AWS_EXTERNAL_ID) for the Snowflake account
DESC INTEGRATION MOCKAROO_S3_INT;
Grant the IAM user permissions to access the S3 bucket. Navigate to IAM in the AWS console, click Roles > click your role name from the CloudFormation output > click Trust relationships > click Edit trust relationship > update the trust policy with the STORAGE_AWS_IAM_USER_ARN and STORAGE_AWS_EXTERNAL_ID from the previous step > click Update Trust Policy. Now we have replaced the temporary details which we gave when creating the role in CloudFormation with the correct Snowflake account and external ID.
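For reference, the role's trust policy ends up looking roughly like this; the two placeholder values must be replaced with the STORAGE_AWS_IAM_USER_ARN and STORAGE_AWS_EXTERNAL_ID returned by DESC INTEGRATION (this mirrors Snowflake's documented storage integration setup rather than the exact template in the repo):

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "<STORAGE_AWS_IAM_USER_ARN>"
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "sts:ExternalId": "<STORAGE_AWS_EXTERNAL_ID>"
        }
      }
    }
  ]
}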
Create file format for incoming files
CREATE OR REPLACE FILE FORMAT DEMO_DB.SNOWPIPE.MOCKAROO_FILE_FORMAT
  TYPE = JSON
  COMPRESSION = AUTO
  TRIM_SPACE = TRUE
  NULL_IF = ('NULL', 'NULL');
Create a stage for the incoming files. Update URL with the S3 bucket details
CREATE OR REPLACE STAGE DEMO_DB.SNOWPIPE.MOCKAROO_S3_STG
  STORAGE_INTEGRATION = MOCKAROO_S3_INT
  URL = 's3://entechlog-demo/kafka-snowpipe-demo/'
  FILE_FORMAT = MOCKAROO_FILE_FORMAT;
Create target table for JSON data
CREATE OR REPLACE TABLE DEMO_DB.SNOWPIPE.MOCKAROO_RAW_TBL ( "event" VARIANT );
Create the snowpipe to ingest data from the stage
CREATE OR REPLACE PIPE DEMO_DB.SNOWPIPE.MOCKAROO_SNOWPIPE
  AUTO_INGEST = TRUE
  AS
  COPY INTO DEMO_DB.SNOWPIPE.MOCKAROO_RAW_TBL
  FROM @DEMO_DB.SNOWPIPE.MOCKAROO_S3_STG;
Describe the snowpipe and copy the ARN shown in notification_channel
SHOW PIPES LIKE '%MOCKAROO_SNOWPIPE%';
Now is the time to connect S3 and the snowpipe. Navigate to Amazon S3 > click your bucket > click Create event notification > update the Event name, Prefix and Suffix > set All object create events > choose SQS queue as the destination, click Enter SQS queue ARN, and enter the SQS queue ARN from the snowpipe's notification_channel.
Validate the snowpipe status
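One way to do this, using the pipe and table names created above, is with SYSTEM$PIPE_STATUS and COPY_HISTORY; the REFRESH statement is only needed to pick up files that were staged before the event notification was in place:

-- Check that the pipe is running and watch pendingFileCount
SELECT SYSTEM$PIPE_STATUS('DEMO_DB.SNOWPIPE.MOCKAROO_SNOWPIPE');

-- Optionally load files staged before the S3 event notification was wired up
ALTER PIPE DEMO_DB.SNOWPIPE.MOCKAROO_SNOWPIPE REFRESH;

-- Review what Snowpipe copied into the raw table over the last hour
SELECT *
FROM TABLE(DEMO_DB.INFORMATION_SCHEMA.COPY_HISTORY(
  TABLE_NAME => 'DEMO_DB.SNOWPIPE.MOCKAROO_RAW_TBL',
  START_TIME => DATEADD(HOUR, -1, CURRENT_TIMESTAMP())
));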
Validate data in Snowflake
SELECT * FROM DEMO_DB.SNOWPIPE.MOCKAROO_RAW_TBL;
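Since the raw table stores each record as a single VARIANT column, individual attributes can be pulled out with path expressions (the field names below come from the Mockaroo schema used for the parsed table later in this article):

-- Extract individual attributes from the VARIANT column
SELECT "event":id::STRING         AS id,
       "event":first_name::STRING AS first_name,
       "event":email::STRING      AS email
FROM DEMO_DB.SNOWPIPE.MOCKAROO_RAW_TBL
LIMIT 10;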
This data in the RAW table can be further processed by streams and tasks, enabling end-to-end stream processing.
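As a rough sketch of that pattern, a stream on the raw table can feed a scheduled task; the stream, task, target table, and warehouse names below are illustrative and not part of the demo repo:

-- Track new rows arriving in the raw table
CREATE OR REPLACE STREAM DEMO_DB.SNOWPIPE.MOCKAROO_RAW_STREAM
  ON TABLE DEMO_DB.SNOWPIPE.MOCKAROO_RAW_TBL;

-- Illustrative downstream table
CREATE OR REPLACE TABLE DEMO_DB.SNOWPIPE.MOCKAROO_FLAT_TBL (
  "id"         STRING,
  "first_name" STRING,
  "last_name"  STRING,
  "email"      STRING
);

-- DEMO_WH is an assumed warehouse name; replace with your own
CREATE OR REPLACE TASK DEMO_DB.SNOWPIPE.MOCKAROO_FLATTEN_TASK
  WAREHOUSE = DEMO_WH
  SCHEDULE = '5 MINUTE'
WHEN SYSTEM$STREAM_HAS_DATA('DEMO_DB.SNOWPIPE.MOCKAROO_RAW_STREAM')
AS
  INSERT INTO DEMO_DB.SNOWPIPE.MOCKAROO_FLAT_TBL
  SELECT "event":id::STRING,
         "event":first_name::STRING,
         "event":last_name::STRING,
         "event":email::STRING
  FROM DEMO_DB.SNOWPIPE.MOCKAROO_RAW_STREAM;

-- Tasks are created suspended; resume to start the schedule
ALTER TASK DEMO_DB.SNOWPIPE.MOCKAROO_FLATTEN_TASK RESUME;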
We could also create a parsed table and load it directly from a snowpipe.
-- Create parsed table
CREATE OR REPLACE TABLE DEMO_DB.SNOWPIPE.MOCKAROO_PARSED_TBL (
  "log_ts"          TIMESTAMP,
  "path_name"       STRING,
  "file_name"       STRING,
  "file_row_number" STRING,
  "id"              STRING,
  "first_name"      STRING,
  "last_name"       STRING,
  "gender"          STRING,
  "company_name"    STRING,
  "job_title"       STRING,
  "slogan"          STRING,
  "email"           STRING
);

-- Create snowpipe to load parsed table
CREATE PIPE DEMO_DB.SNOWPIPE.MOCKAROO_PARSED_SNOWPIPE AUTO_INGEST = TRUE AS
COPY INTO DEMO_DB.SNOWPIPE.MOCKAROO_PARSED_TBL
FROM (
  SELECT current_timestamp::TIMESTAMP log_ts
        ,left(metadata$filename, 77) path_name
        ,regexp_replace(metadata$filename, '.*\/(.*)', '\\1') file_name
        ,metadata$file_row_number file_row_number
        ,$1:id
        ,$1:first_name
        ,$1:last_name
        ,$1:gender
        ,$1:company_name
        ,$1:job_title
        ,$1:slogan
        ,$1:email
  FROM @DEMO_DB.SNOWPIPE.MOCKAROO_S3_STG
);

-- Validate data in parsed table
SELECT * FROM DEMO_DB.SNOWPIPE.MOCKAROO_PARSED_TBL;
Hope this was helpful. Did I miss something? Let me know in the comments and I'll add it in!
- You can see the list of all containers by running docker container ls -a
- You can bring down the containers by running docker-compose down
- You can bring down the containers and related volumes by running docker-compose down --volumes
- You can delete all exited containers by running docker rm $(docker ps -q -f status=exited)