In this article we will see how to stream data from Kafka to Snowflake using S3 and Snowpipe. The code used in this article can be found here. Clone the repo to get started. To have a stream of data, we will source data from Mockaroo.
Create an S3 bucket which acts as the staging area for incoming events. To automate the bucket creation and IAM provisioning, we will use a CloudFormation template. You can skip this step if you already have an S3 bucket and the IAM user, role and policy required to read and write data from S3.
Click Create stack > click Template is ready > click Choose file to upload the CloudFormation template s3-bucket-and-iam-user.yaml. Click View in Designer to verify the components which will be created. Here we are creating the S3 bucket and the IAM user, role and policy used to read and write data from it.
Click Next > give a Stack name (the stack name is the bucket name as well) and your IAM User ARN (temporary, we will change it after we create the Snowpipe) > click Next > click Next > click Create stack after reviewing the information.
After a few minutes you should see the CREATE_COMPLETE message. Click Outputs to view the access details for your bucket. Note them down, as we will need them in the next few steps.
The docker-compose.yml file will bring up the services used in this article. The following steps should be done before bringing up the Docker services:
Create a copy of /kafka-s3-snowpipe/kafka-connect/secrets/connect-secrets.properties.template as /kafka-s3-snowpipe/kafka-connect/secrets/connect-secrets.properties and update it with the S3 details from the CloudFormation output.
Create a copy of .env.template as .env and update it with the Mockaroo key from the Mockaroo schema which you have created.
Update create-s3-sink.sh if you have to change the topic name or the target S3 bucket directory.
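For orientation, here is a minimal sketch of the kind of payload a script like create-s3-sink.sh would post to Kafka Connect. It is not the content of the script in the repo. The property names are the standard ones for the Confluent S3 sink connector, while the topic name, connector name, region and flush size are assumptions chosen for illustration. The bucket and directory are the ones used later in this article.

```python
import json


def build_s3_sink_config(topic, bucket, topics_dir, region="us-east-1", flush_size=100):
    """Build a create-connector payload for the Confluent S3 sink connector.

    The topic, region and flush_size values are illustrative assumptions,
    not values taken from the article's repo.
    """
    return {
        "name": f"{topic}-s3-sink",  # hypothetical connector name
        "config": {
            "connector.class": "io.confluent.connect.s3.S3SinkConnector",
            "tasks.max": "1",
            "topics": topic,                # Kafka topic to read from
            "s3.bucket.name": bucket,       # target S3 bucket
            "s3.region": region,
            "topics.dir": topics_dir,       # top-level directory inside the bucket
            "storage.class": "io.confluent.connect.s3.storage.S3Storage",
            "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
            "flush.size": str(flush_size),  # records per S3 object
        },
    }


payload = build_s3_sink_config("mockaroo-events", "entechlog-demo", "kafka-snowpipe-demo")
print(json.dumps(payload, indent=2))
```

Changing the topic name or the target directory then comes down to changing the topics and topics.dir values in this payload.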
Now we have all the required artifacts, and the next step is to start the Kafka services. Here we will bring up Kafka, produce some data by calling the Mockaroo API in a loop, and write the events from the Kafka topic to the S3 bucket using the S3 sink connector. All of this happens automatically when we bring up docker-compose.yml.
Start the application by running docker-compose up --remove-orphans -d --build in the directory with docker-compose.yml
Validate the status of docker containers by running docker-compose ps
Validate the Kafka topics and the connector by navigating to the Control Center UI at http://your-machine-name:9021/clusters/. We should see data in the Kafka topic and in S3.
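The same check can be scripted against the Kafka Connect REST API, which exposes a status endpoint per connector. The sketch below assumes the Connect worker is reachable at a URL you supply (the port and the connector name are not given in this article, so both are hypothetical), and treats a connector as healthy only when the connector and all of its tasks report RUNNING.

```python
import json
import urllib.request


def fetch_connector_status(connect_url, name):
    """GET /connectors/<name>/status from the Kafka Connect REST API."""
    url = f"{connect_url}/connectors/{name}/status"
    with urllib.request.urlopen(url, timeout=5) as resp:
        return json.load(resp)


def connector_is_healthy(status):
    """True when the connector and every one of its tasks are RUNNING."""
    connector_ok = status.get("connector", {}).get("state") == "RUNNING"
    tasks = status.get("tasks", [])
    tasks_ok = bool(tasks) and all(t.get("state") == "RUNNING" for t in tasks)
    return connector_ok and tasks_ok


# Sample response in the shape returned by the status endpoint.
# The names and worker IDs are illustrative, not taken from the demo.
sample_status = {
    "name": "mockaroo-s3-sink",
    "connector": {"state": "RUNNING", "worker_id": "kafka-connect:8083"},
    "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "kafka-connect:8083"}],
    "type": "sink",
}
print(connector_is_healthy(sample_status))
```

Against a live worker you would pass the result of fetch_connector_status to connector_is_healthy instead of the sample dictionary.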
Data in Kafka topic
We will create a Snowpipe and connect it with S3 events to enable automatic loading of data every time new events arrive in our S3 staging area. Snowpipe is Snowflake’s continuous data ingestion service. Snowpipe loads data within minutes after files are added to a stage and submitted for ingestion.
Create the demo database
USE ROLE SYSADMIN;
CREATE OR REPLACE DATABASE DEMO_DB;
Create Schema
CREATE OR REPLACE SCHEMA DEMO_DB.SNOWPIPE;
Create the storage integration. Update STORAGE_AWS_ROLE_ARN using the output from CloudFormation and STORAGE_ALLOWED_LOCATIONS with the S3 bucket details
USE ROLE ACCOUNTADMIN;
CREATE OR REPLACE STORAGE INTEGRATION MOCKAROO_S3_INT
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = S3
ENABLED = TRUE
STORAGE_AWS_ROLE_ARN = 'iam-role'
STORAGE_ALLOWED_LOCATIONS = ('s3://entechlog-demo/kafka-snowpipe-demo/');
Describe the integration and retrieve the AWS IAM user (STORAGE_AWS_IAM_USER_ARN and STORAGE_AWS_EXTERNAL_ID) for the Snowflake account
DESC INTEGRATION MOCKAROO_S3_INT;
Grant the IAM user permissions to access the S3 bucket. Navigate to IAM in the AWS console, click Roles > click your role name from the CloudFormation output > click Trust relationships > click Edit trust relationship > update the Principal with STORAGE_AWS_IAM_USER_ARN and sts:ExternalId with STORAGE_AWS_EXTERNAL_ID > click Update Trust Policy. We have now replaced the temporary details which we gave when creating the role in CloudFormation with the correct Snowflake account and external ID.
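To make the two substitutions concrete, the sketch below builds the general shape of a trust policy that lets a Snowflake IAM user assume the role under an external ID. The ARN and external ID passed in are made-up placeholders. In practice both come from the DESC INTEGRATION output, and the role created by your CloudFormation template may contain additional fields.

```python
import json


def build_trust_policy(snowflake_iam_user_arn, external_id):
    """Return an IAM trust policy allowing the given principal to assume the role.

    snowflake_iam_user_arn -> STORAGE_AWS_IAM_USER_ARN from DESC INTEGRATION
    external_id            -> STORAGE_AWS_EXTERNAL_ID from DESC INTEGRATION
    """
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"AWS": snowflake_iam_user_arn},
                "Action": "sts:AssumeRole",
                "Condition": {"StringEquals": {"sts:ExternalId": external_id}},
            }
        ],
    }


# Placeholder values for illustration only.
policy = build_trust_policy(
    "arn:aws:iam::123456789012:user/example-snowflake-user",
    "EXAMPLE_EXTERNAL_ID",
)
print(json.dumps(policy, indent=2))
```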
Create file format for incoming files
CREATE OR REPLACE FILE FORMAT DEMO_DB.SNOWPIPE.MOCKAROO_FILE_FORMAT
TYPE = JSON COMPRESSION = AUTO TRIM_SPACE = TRUE NULL_IF = ('NULL', 'NULL');
Create the stage for incoming files. Update URL with the S3 bucket details
CREATE OR REPLACE STAGE DEMO_DB.SNOWPIPE.MOCKAROO_S3_STG
STORAGE_INTEGRATION = MOCKAROO_S3_INT
URL = 's3://entechlog-demo/kafka-snowpipe-demo/'
FILE_FORMAT = MOCKAROO_FILE_FORMAT;
Create target table for JSON data
CREATE OR REPLACE TABLE DEMO_DB.SNOWPIPE.MOCKAROO_RAW_TBL (
"event" VARIANT
);
Create the Snowpipe to ingest data from the stage to the table
CREATE OR REPLACE PIPE DEMO_DB.SNOWPIPE.MOCKAROO_SNOWPIPE AUTO_INGEST = TRUE AS
COPY INTO DEMO_DB.SNOWPIPE.MOCKAROO_RAW_TBL
FROM @DEMO_DB.SNOWPIPE.MOCKAROO_S3_STG;
Show the Snowpipe and copy the ARN from the notification_channel column
SHOW PIPES LIKE '%MOCKAROO_SNOWPIPE%';
Now it is time to connect S3 and the Snowpipe. Navigate to Amazon S3 > click your bucket > click Properties > click Create event notification > update Event name, Prefix and Suffix > set Event types to All object create events > set Destination to SQS queue, choose Enter SQS queue ARN and paste the notification_channel ARN from the Snowpipe.
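For readers who prefer to script this step, the console settings above correspond to an S3 bucket notification configuration like the one sketched below. The event name and the queue ARN are placeholders (the real ARN is the notification_channel value from the Snowpipe), and the prefix shown assumes the directory used for the stage in this article.

```python
import json


def build_notification_config(event_name, queue_arn, prefix):
    """Build an S3 notification configuration that sends all object-create
    events under the given prefix to an SQS queue."""
    return {
        "QueueConfigurations": [
            {
                "Id": event_name,
                "QueueArn": queue_arn,
                "Events": ["s3:ObjectCreated:*"],  # "All object create events"
                "Filter": {
                    "Key": {"FilterRules": [{"Name": "prefix", "Value": prefix}]}
                },
            }
        ]
    }


# Placeholder values: use the notification_channel ARN from SHOW PIPES instead.
config = build_notification_config(
    "snowpipe-auto-ingest",
    "arn:aws:sqs:us-east-1:123456789012:example-snowpipe-queue",
    "kafka-snowpipe-demo/",
)
print(json.dumps(config, indent=2))

# With boto3 installed and AWS credentials configured, a dictionary of this
# shape can be passed as NotificationConfiguration to
# s3_client.put_bucket_notification_configuration(Bucket=..., ...).
```

Note that applying a notification configuration through the API replaces the bucket's existing one, so any notifications already defined on the bucket would need to be included in the same dictionary.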
Validate the snowpipe status
SELECT SYSTEM$PIPE_STATUS('DEMO_DB.SNOWPIPE.MOCKAROO_SNOWPIPE');
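SYSTEM$PIPE_STATUS returns a JSON string, so the result is easy to check in a script. The sketch below reads two of its fields, executionState and pendingFileCount. The sample string is hand-written for illustration and is not output captured from this demo. A real response contains more fields.

```python
import json


def summarize_pipe_status(status_json):
    """Return (is_running, pending_file_count) from a SYSTEM$PIPE_STATUS result."""
    status = json.loads(status_json)
    is_running = status.get("executionState") == "RUNNING"
    pending = int(status.get("pendingFileCount", 0))
    return is_running, pending


# Illustrative sample, not real output.
sample = '{"executionState": "RUNNING", "pendingFileCount": 0}'
running, pending = summarize_pipe_status(sample)
print(f"running={running} pending_files={pending}")
```

A pipe that is running with no pending files has caught up with the stage, which is a reasonable moment to query the target table.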
Validate data in snowflake
SELECT * FROM DEMO_DB.SNOWPIPE.MOCKAROO_RAW_TBL;
The data in the raw table can be further processed by streams and tasks, enabling end-to-end stream processing.
We could also create a parsed table and load it directly from a Snowpipe.
-- Create parsed table
CREATE OR REPLACE TABLE DEMO_DB.SNOWPIPE.MOCKAROO_PARSED_TBL (
"log_ts" TIMESTAMP,
"path_name" STRING,
"file_name" STRING,
"file_row_number" STRING,
"id" STRING,
"first_name" STRING,
"last_name" STRING,
"gender" STRING,
"company_name" STRING,
"job_title" STRING,
"slogan" STRING,
"email" STRING
);
-- Create snowpipe to load parsed table
CREATE PIPE DEMO_DB.SNOWPIPE.MOCKAROO_PARSED_SNOWPIPE AUTO_INGEST = TRUE AS
COPY INTO DEMO_DB.SNOWPIPE.MOCKAROO_PARSED_TBL
FROM (
    SELECT current_timestamp::TIMESTAMP log_ts
        ,left(metadata$filename, 77) path_name
        ,regexp_replace(metadata$filename, '.*\/(.*)', '\\1') file_name
        ,metadata$file_row_number file_row_number
        ,$1:id
        ,$1:first_name
        ,$1:last_name
        ,$1:gender
        ,$1:company_name
        ,$1:job_title
        ,$1:slogan
        ,$1:email
    FROM @DEMO_DB.SNOWPIPE.MOCKAROO_S3_STG
);
-- Validate data in parsed table
SELECT * FROM DEMO_DB.SNOWPIPE.MOCKAROO_PARSED_TBL;
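The path_name and file_name expressions in the pipe split the staged file path into a directory part and a file part. The Python sketch below mirrors that logic so it can be tried on a sample path. The path is a made-up example of what an S3 sink object key might look like, not one taken from the demo bucket. One thing it highlights: left(metadata$filename, 77) only yields the directory when the directory portion is exactly 77 characters long, whereas splitting on the last slash does not depend on the length.

```python
import re


def split_stage_path(filename):
    """Split a staged file path into (path_name, file_name).

    file_name uses the same regular expression as the pipe definition;
    path_name splits on the last '/' instead of taking a fixed-length prefix.
    """
    file_name = re.sub(r".*/(.*)", r"\1", filename)
    path_name = filename.rsplit("/", 1)[0] if "/" in filename else ""
    return path_name, file_name


# Hypothetical object key, for illustration only.
sample_path = (
    "kafka-snowpipe-demo/mockaroo-events/partition=0/"
    "mockaroo-events+0+0000000000.json"
)
path_name, file_name = split_stage_path(sample_path)
print(path_name)
print(file_name)
```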
Hope this was helpful. Did I miss something? Let me know in the comments and I’ll add it in!
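# List all containers, including stopped ones
docker container ls -a
# Stop and remove the demo containers
docker-compose down
# Stop and remove the containers together with their volumes
docker-compose down --volumes
# Remove any remaining exited containers
docker rm $(docker ps -q -f status=exited)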