In this article we will see how to integrate Kafka connect with Amazon Managed Streaming for Apache Kafka (MSK). Code …
How To Stream Data From Kafka To Snowflake Using S3 and Snowpipe
In this article we will see how to stream data from kafka to snowflake using S3 and Snowpipe. Code used in this article can be found here. Clone the repo to get started. To have stream of data, we will source data from Mockaroo.
- Download and install docker for your platform. Click here for instructions
- Create Snowflake account for the demo. Click here for instructions
- Create AWS account for the demo. Click here for instructions
- Create Mockaroo account for the demo and Create a Schema as per your data needs. Click here for instructions.
Create S3 bucket
Create an s3 bucket which acts as the staging area for incoming events. To automate the bucket creation and IAM provisioning we will use a cloudformation template. You can skip this step, if you already have a s3 bucket and IAM user, role and policy required to read and write data from S3.
- Login into AWS account, Navigate to CloudFormation and click
Create stack> click
Template is ready> click
Choose fileto upload the cloudformation template s3-bucket-and-iam-user.yaml
- Optionally click
View in Designerto verify the components which will be created. Here we are creating
➼ S3 bucket and bucket policy
➼ IAM user and policy for kafka to write the data to s3
➼ IAM role and policy for snowflake to read data from snowflake
next> Give a Stack name (Stack name is bucket name as well) and your IAM User ARN (Temporary, we will change it after we create snowpipe) > click
Create Stackafter reviewing the information.
After few minutes you should see
CREATE_COMPLETEmessage. Click output to view the access details for your bucket. Note it down as we would need it for next few steps
docker-compose.ymlwill bring up following services
- Kafka (Zookeeper, Broker, Kafka Connect, Schema Registry, Control Center)
- Kafka Client (Creates Kafka topic)
- kafkacat (curl’s Mockaroo and writes data to Kafka topic)
Here is the list of steps which should be done before bringing up the docker services
Create a copy of
/kafka-s3-snowpipe/kafka-connect/secrets/connect-secrets.propertiesand update with S3 details from the cloudformation output.
Create a copy of
.envand update it with Mockaroo Key from the Mockaroo schema which you have created.
create-s3-sink.sh if you have to change the topic name OR the target s3 bucket directory.
Start and validate Kafka
Now we have all required artifacts and the next step to start the Kafka services. Here we will bring up Kafka, produce some data by calling the Mockaroo API in a loop and write the events from kafka topic to s3 bucket using s3 sink connector. All these will happen automatically when we bring up
Start the application by running
docker-compose up --remove-orphans -d --buildin the directory with docker-compose.yml
Validate the status of docker containers by running
Validate Kafka topics and connector by navigating to control center UI http://your-machine-name:9021/clusters/. We should see data in Kafka topic and S3.
Data in Kafka topic
- Data in S3 bucket
We will create snowpipe and connect with s3 events to enable auto load of data every time we have new events in our s3 staging area. Snowpipe is Snowflake’s continuous data ingestion service. Snowpipe loads data within minutes after files are added to a stage and submitted for ingestion.
Create the demo database
USE ROLE SYSADMIN; CREATE OR REPLACE DATABASE DEMO_DB;
CREATE OR REPLACE SCHEMA DEMO_DB.SNOWPIPE;
Create storage integration. Update
STORAGE_AWS_ROLE_ARNusing the output from cloudformation and
STORAGE_ALLOWED_LOCATIONSwith s3 bucket details
USE ROLE ACCOUNTADMIN; CREATE OR REPLACE STORAGE INTEGRATION MOCKAROO_S3_INT TYPE = EXTERNAL_STAGE STORAGE_PROVIDER = S3 ENABLED = TRUE STORAGE_AWS_ROLE_ARN = 'iam-role' STORAGE_ALLOWED_LOCATIONS = ('s3://entechlog-demo/kafka-snowpipe-demo/');
Describe Integration and retrieve the AWS IAM User (
STORAGE_AWS_EXTERNAL_ID) for Snowflake Account
DESC INTEGRATION MOCKAROO_S3_INT;
Grant the IAM user permissions to access S3 Bucket. Navigate to IAM in AWS console, click
Roles> click your role name from cloudformation output > click
Trust relationships> click
Edit trust relationship> Update the
Update Trust Policy. Now we have replaced the temporary details which we gave when creating the role in cloudformation with the correct snowflake account and external ID.
Create file format for incoming files
CREATE OR REPLACE FILE FORMAT DEMO_DB.SNOWPIPE.MOCKAROO_FILE_FORMAT TYPE = JSON COMPRESSION = AUTO TRIM_SPACE = TRUE NULL_IF = ('NULL', 'NULL');
Create stage for incoming files. Update
URLwith s3 bucket details
CREATE OR REPLACE STAGE DEMO_DB.SNOWPIPE.MOCKAROO_S3_STG STORAGE_INTEGRATION = MOCKAROO_S3_INT URL = 's3://entechlog-demo/kafka-snowpipe-demo/' FILE_FORMAT = MOCKAROO_FILE_FORMAT;
Create target table for JSON data
CREATE OR REPLACE TABLE DEMO_DB.SNOWPIPE.MOCKAROO_RAW_TBL ( "event" VARIANT );
Create snowpipe to ingest data from
CREATE OR REPLACE PIPE DEMO_DB.SNOWPIPE.MOCKAROO_SNOWPIPE AUTO_INGEST = TRUE AS COPY INTO DEMO_DB.SNOWPIPE.MOCKAROO_RAW_TBL FROM @DEMO_DB.SNOWPIPE.MOCKAROO_S3_STG;
Describe snowpipe and copy the ARN for notification_channel
SHOW PIPES LIKE '%MOCKAROO_SNOWPIPE%';
Now is the time to connect S3 and snowpipe. Navigate to Amazon S3 > click your bucket > click
Create event notification> Update
Event name, Prefix and Suffix> Set
All object create events> Update
SQS queueand click
Enter SQS queue ARNfrom the snowpipe
Validate the snowpipe status
Validate data in snowflake
SELECT * FROM DEMO_DB.SNOWPIPE.MOCKAROO_RAW_TBL;
This data in RAW tables can be further processed by streams and tasks enabling end to end stream processing.
We could also create a parsed table and load it directly from a snowpipe.
-- Create parsed table CREATE OR REPLACE TABLE DEMO_DB.SNOWPIPE.MOCKAROO_PARSED_TBL ( "log_ts" TIMESTAMP, "path_name" STRING, "file_name" STRING, "file_row_number" STRING, "id" STRING, "first_name" STRING, "last_name" STRING, "gender" STRING, "company_name" STRING, "job_title" STRING, "slogan" STRING, "email" STRING ); -- Create snowpipe to load parsed table CREATE PIPE DEMO_DB.SNOWPIPE.MOCKAROO_PARSED_SNOWPIPE AUTO_INGEST = TRUE AS COPY INTO DEMO_DB.SNOWPIPE.MOCKAROO_PARSED_TBL FROM ( SELECT current_timestamp::TIMESTAMP log_ts ,left(metadata$filename, 77) path_name ,regexp_replace(metadata$filename, '.*\/(.*)', '\\1') file_name ,metadata$file_row_number file_row_number ,$1:id ,$1:first_name ,$1:last_name ,$1:gender ,$1:company_name ,$1:job_title ,$1:slogan ,$1:email FROM @DEMO_DB.SNOWPIPE.MOCKAROO_S3_STG ); -- Validate data in parsed table SELECT * FROM DEMO_DB.SNOWPIPE.MOCKAROO_PARSED_TBL;
Hope this was helpful. Did I miss something ? Let me know in the comments and I’ll add it in !
- You can see the list of all containers by running
docker container ls -a
- You can bring down the containers by running
- You can bring down the containers and related volumes by running
docker-compose down --volumes
- You can delete all exited containers by running
docker rm $(docker ps -q -f status=exited)
In this article we will see how to use the Snowflake connector to stream data from Kafka to Snowflake. The Snowflake …