
If you have worked with ksqlDB, you have probably hit this scenario many times: to drop a ksqlDB table or stream, you first need to identify all related queries and terminate them, and there is no easy way to do that when you have multiple chained ksqlDB processes. To ease this, I have created a script that accepts a list of table/stream names and takes care of the terminate and drop activities.
The source code for this script is on GitHub in the ksqldb-utilities repo.
Let’s validate the script by creating some test streams and tables.
Create a schema for your datagen. See https://github.com/entechlog/ksqlDB-datagen/blob/master/ordersForArrayDemo.avro for the schema I created to generate test data for this demo.
cd into the directory that has the schema (in this case ordersForArrayDemo.avro) and issue the command below. This will generate test data for our topic. In this example, it's random order data for a few customers.
ksql-datagen schema=ordersForArrayDemo.avro format=avro topic='dev.etl.datagen.orders.src.0010' key=orderID maxInterval=5000 iterations=10
Start the ksqlDB CLI and set the offset to read from the beginning:
SET 'auto.offset.reset'='earliest';
Check the messages in the topic using:
PRINT 'dev.etl.datagen.orders.src.0010' FROM BEGINNING LIMIT 10;
Create the streams and tables for this demo:
CREATE STREAM STM_DEMO_DROP_KSQL_OBJECT_0010
WITH (KAFKA_TOPIC = 'dev.etl.datagen.orders.src.0010', VALUE_FORMAT = 'Avro');
CREATE STREAM STM_DEMO_DROP_KSQL_OBJECT_0020 AS
SELECT CUSTOMERID, AS_MAP(AS_ARRAY('orderID', 'StorePickUpFlag', 'ItemName', 'OrderDate'), AS_ARRAY(ORDERID, STOREPICKUPFLAG, ITEMNAME, ORDERDATE)) AS orderData
FROM STM_DEMO_DROP_KSQL_OBJECT_0010 PARTITION BY CUSTOMERID;
CREATE TABLE TBL_DEMO_DROP_KSQL_OBJECT_0030 AS
SELECT CUSTOMERID, COLLECT_LIST(CAST(ORDERDATA AS VARCHAR))
FROM STM_DEMO_DROP_KSQL_OBJECT_0020
GROUP BY CUSTOMERID EMIT CHANGES;
Now let’s create the control file, which lists all the streams and tables we need to drop. I have listed them in the same order I would have dropped them manually, that is, Last In, First Out (LIFO). Optionally, you can also specify the delete topic flag; yes and no are the valid options.
TBL_DEMO_DROP_KSQL_OBJECT_0030,yes,no
STM_DEMO_DROP_KSQL_OBJECT_0020,yes,no
STM_DEMO_DROP_KSQL_OBJECT_0010,no,yes
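Before handing a control file like this to the script, it can help to check that every line is well formed. The function below is a minimal sketch, not part of the ksqldb-utilities repo: the name validate_control_file and the placeholder variables flag_a and flag_b are hypothetical, and it only assumes what the sample shows, namely an object name followed by up to two optional yes/no flags. It does not assume which column is the delete topic flag.

```shell
# Hypothetical helper (not from the ksqldb-utilities repo).
# Prints the object name of every well-formed line, in file order.
# Returns non-zero if the file is unreadable or any line is malformed.
validate_control_file() (
  [ -r "$1" ] || { echo "cannot read: $1" >&2; exit 2; }
  # tr strips carriage returns so files saved on Windows still parse.
  tr -d '\r' < "$1" | {
    status=0
    line_no=0
    while IFS=',' read -r name flag_a flag_b extra || [ -n "$name" ]; do
      line_no=$((line_no + 1))
      # Skip blank lines.
      [ -z "$name$flag_a$flag_b$extra" ] && continue
      ok=1
      # A line needs a name and at most two flag columns.
      [ -n "$name" ] && [ -z "$extra" ] || ok=0
      # Each flag is optional; when present it must be yes or no.
      for flag in "$flag_a" "$flag_b"; do
        case "$flag" in
          ''|yes|no) ;;
          *) ok=0 ;;
        esac
      done
      if [ "$ok" -eq 1 ]; then
        printf '%s\n' "$name"
      else
        printf 'line %s: malformed entry\n' "$line_no" >&2
        status=1
      fi
    done
    exit "$status"
  }
)

# Usage (path taken from the run command in this article):
# validate_control_file /input/DEMO_DROP_KSQL_OBJECT.txt
```

The function body runs in a subshell, so its working variables do not leak into the calling shell, and it avoids bash-only syntax so it also runs under a plain /bin/sh.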
Run the script to terminate the queries and drop the streams and tables:
./dropKqlDBObjects.sh sed localhost:8088 /input/DEMO_DROP_KSQL_OBJECT.txt
Here is sample output from the script:
**************************************************************************
SCRIPT_NAME : dropKqlDBObjects
**************************************************************************
INFO : logs directory exists
INFO : temp directory exists
INFO : reports directory exists
INFO : Total arguments.. 3
INFO : Received the correct number of input parameters
INFO-MODE : sed
INFO-KSQL_URL : localhost:8088
INFO-FILE_NAME : /opt/confluent/scripts/dropkqlobjects/input/DEMO_DROP_KSQL_OBJECT.txt
INFO-LOGDIR : /opt/confluent/scripts/dropkqlobjects/logs
INFO-TEMPDIR : /opt/confluent/scripts/dropkqlobjects/temp
INFO-REPORTSDIR : /opt/confluent/scripts/dropkqlobjects/reports
INFO-INPUTDIR : /opt/confluent/scripts/dropkqlobjects/input
On successful completion, the script terminates all queries related to each table/stream, drops the table/stream, and deletes the topics according to the delete topic flag in the input file.
The logs directory will have an entry for this run with a more detailed log.
The reports directory will have an HTML file with a summary of this execution.
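To make the terminate-then-drop sequence concrete, here is a rough sketch of the statements involved when doing it by hand. It is not the script's actual implementation: the function names are hypothetical, the object type is passed in explicitly rather than discovered, and the query ID has to come from elsewhere (for example from SHOW QUERIES). TERMINATE, DROP ... IF EXISTS ... DELETE TOPIC, and the /ksql REST endpoint are standard ksqlDB features.

```shell
# Hypothetical helpers; they only build text and send nothing.

# Build "TERMINATE <query_id>;" for one persistent query.
build_terminate_statement() {
  printf 'TERMINATE %s;\n' "$1"
}

# Build a DROP statement.
# $1 = STREAM or TABLE, $2 = object name,
# $3 = yes to append DELETE TOPIC (any other value leaves the topic alone).
build_drop_statement() {
  case "$1" in
    STREAM|TABLE) ;;
    *) echo "unknown object type: $1" >&2; return 1 ;;
  esac
  if [ "$3" = "yes" ]; then
    printf 'DROP %s IF EXISTS %s DELETE TOPIC;\n' "$1" "$2"
  else
    printf 'DROP %s IF EXISTS %s;\n' "$1" "$2"
  fi
}

# Wrap one statement in the JSON body the ksqlDB /ksql endpoint expects.
# Assumes the statement has no double quotes or backslashes to escape.
build_ksql_payload() {
  printf '{"ksql": "%s", "streamsProperties": {}}\n' "$1"
}

# Usage against the demo server from this article (not executed here):
# payload=$(build_ksql_payload "$(build_drop_statement TABLE TBL_DEMO_DROP_KSQL_OBJECT_0030 yes)")
# curl -s -X POST "http://localhost:8088/ksql" \
#   -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
#   -d "$payload"
```

Every query that reads from or writes to an object must be terminated before its DROP statement will succeed, which is why the control file lists objects in LIFO order.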
If you find this useful, clone the git repo and start using it. If you wish to add additional features, become a contributor and start enhancing.
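Error: bash: ./dropKqlObjects.sh: /bin/sh^M: bad interpreter: No such file or directory
Solution: The ^M is a carriage return, which means the script was saved with Windows (CRLF) line endings. Strip the carriage returns with: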
sed -i -e 's/\r$//' dropKqlObjects.sh
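If you are not sure whether a file is affected, you can count its carriage returns before running the sed command above. This is a small sketch; the function name count_carriage_returns is hypothetical.

```shell
# Hypothetical helper: prints how many carriage return characters a file has.
# A result of 0 means the file has no Windows (CRLF) line endings.
count_carriage_returns() {
  # tr -cd keeps only carriage returns; wc -c counts them;
  # the final tr drops the padding some wc implementations add.
  tr -cd '\r' < "$1" | wc -c | tr -d ' '
}

# Usage (not executed here):
# if [ "$(count_carriage_returns dropKqlObjects.sh)" -gt 0 ]; then
#   sed -i -e 's/\r$//' dropKqlObjects.sh
# fi
```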
Error: ./dropKqlDBObjects.sh: Bad substitution
Solution: Change the first line of the script to #!/bin/sh or #!/bin/bash, depending on your operating system.