Change Data Capture (CDC) in ksqlDB

Confluent, Kafka, ksqlDB | 23 Mar 2020

In this article we will go over the steps to implement Change Data Capture (CDC) in ksqlDB. See the diagram below for the high-level logical flow of this implementation.

[Diagram: high-level logical flow of the CDC implementation]

Prerequisites

Install Confluent Platform for your operating system by following the instructions at https://docs.confluent.io/current/installation/installing_cp/index.html

Steps
  • Create a STREAM with no key
CREATE STREAM STM_DEPARTMENTS_RAW (DEPT_NO VARCHAR, DEPT_NAME VARCHAR)
WITH (KAFKA_TOPIC='STM_DEPARTMENTS_RAW',VALUE_FORMAT='DELIMITED', PARTITIONS=1, REPLICAS=1);
  • Create a STREAM to add a key
CREATE STREAM STM_DEPARTMENTS_WITH_KEY AS 
SELECT * FROM STM_DEPARTMENTS_RAW PARTITION BY DEPT_NO;
  • Create a TABLE from the STREAM. This table holds the current record for each key. In most use cases this is the first table you create, and it represents your current data set.
CREATE TABLE TBL_DEPARTMENTS_SRC (DEPT_NO VARCHAR, DEPT_NAME VARCHAR) 
WITH (KAFKA_TOPIC='STM_DEPARTMENTS_WITH_KEY',VALUE_FORMAT='DELIMITED', KEY='DEPT_NO');
  • Insert a few records into our source STREAM
INSERT INTO STM_DEPARTMENTS_RAW (DEPT_NO, DEPT_NAME) VALUES ('d001', 'Marketing');
INSERT INTO STM_DEPARTMENTS_RAW (DEPT_NO, DEPT_NAME) VALUES ('d002', 'Claims');
INSERT INTO STM_DEPARTMENTS_RAW (DEPT_NO, DEPT_NAME) VALUES ('d003', 'Accounting');
INSERT INTO STM_DEPARTMENTS_RAW (DEPT_NO, DEPT_NAME) VALUES ('d004', 'Information Technology');
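
To confirm that the inserted rows reached the keyed topic and the table, a quick check along these lines may help. It is a minimal sketch, assuming the statements are run from the ksqlDB CLI and that the session should read existing records from the beginning of each topic. Neither the SET statement nor the LIMIT clauses are part of the original walkthrough.

```sql
-- Assumption: read existing records, not only new ones, in this CLI session
SET 'auto.offset.reset' = 'earliest';

-- Inspect the re-keyed topic that backs the table
PRINT 'STM_DEPARTMENTS_WITH_KEY' FROM BEGINNING LIMIT 4;

-- Push query against the current-state table; LIMIT ends it after four rows
SELECT DEPT_NO, DEPT_NAME
FROM TBL_DEPARTMENTS_SRC
EMIT CHANGES LIMIT 4;
```

If the PRINT output shows the department number as the record key, the PARTITION BY step did its job and the table can track one current row per department.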
  • Create the target STREAM and a TABLE that will hold the previous record
-- Create Target Stream
CREATE STREAM STM_DEPARTMENTS_TGT (HUMAN_TS VARCHAR, DEPT_NO VARCHAR, CURR_DEPT_NAME VARCHAR, PREV_DEPT_NAME VARCHAR)
	WITH (KAFKA_TOPIC = 'STM_DEPARTMENTS_TGT', VALUE_FORMAT = 'AVRO', PARTITIONS = 1, REPLICAS = 1);

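-- Create the table that acts as the "previous" table for CDC.
-- Note: UDF_LATEST_BY_OFFSET is presumably a user-defined aggregate function that must be
-- available on the ksqlDB server; newer ksqlDB releases ship a built-in LATEST_BY_OFFSET.
CREATE TABLE TBL_DEPARTMENTS_PREV AS
SELECT DEPT_NO,
       UDF_LATEST_BY_OFFSET(HUMAN_TS) AS HUMAN_TS,
       UDF_LATEST_BY_OFFSET(CURR_DEPT_NAME) AS CURR_DEPT_NAME,
       UDF_LATEST_BY_OFFSET(PREV_DEPT_NAME) AS PREV_DEPT_NAME
FROM STM_DEPARTMENTS_TGT
GROUP BY DEPT_NO;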
  • Implement the CDC logic. Most of it lives in the WHERE clause of the query.
-- Perform CDC and create a CDC table. We create a table here because the result of the join is a table; otherwise a plain stream insert would have been enough.
CREATE TABLE TBL_DEPARTMENTS_CDC AS

SELECT TIMESTAMPTOSTRING(CURR.ROWTIME, 'yyyy-MM-dd HH:mm:ss Z') AS HUMAN_TS, CURR.DEPT_NO AS DEPT_NO, CURR.DEPT_NAME AS CURR_DEPT_NAME, PREV.CURR_DEPT_NAME AS PREV_DEPT_NAME
FROM TBL_DEPARTMENTS_SRC CURR
LEFT JOIN TBL_DEPARTMENTS_PREV PREV ON CURR.ROWKEY = PREV.ROWKEY
WHERE TRIM(UCASE(IFNULL(CURR.DEPT_NAME, ''))) <> TRIM(UCASE(IFNULL(PREV.CURR_DEPT_NAME, ''))) EMIT CHANGES;

-- Create CDC stream on top of CDC topic
CREATE STREAM STM_DEPARTMENTS_CDC (HUMAN_TS VARCHAR, DEPT_NO VARCHAR, CURR_DEPT_NAME VARCHAR, PREV_DEPT_NAME VARCHAR)
	WITH (VALUE_FORMAT = 'DELIMITED', KAFKA_TOPIC = 'TBL_DEPARTMENTS_CDC');
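
The comparison in the WHERE clause normalizes both sides before comparing them: IFNULL turns a missing name into an empty string, UCASE removes case differences, and TRIM drops surrounding whitespace. The query below is a small sketch for seeing that normalization in isolation. The NORMALIZED_NAME alias, the SET statement, and the LIMIT are illustrative additions, not part of the original pipeline.

```sql
-- Assumption: read the rows already inserted into the source stream
SET 'auto.offset.reset' = 'earliest';

-- Show each raw name next to the normalized value used for change detection
SELECT DEPT_NO,
       DEPT_NAME,
       TRIM(UCASE(IFNULL(DEPT_NAME, ''))) AS NORMALIZED_NAME
FROM STM_DEPARTMENTS_RAW
EMIT CHANGES LIMIT 4;
```

One consequence of this design is that a name differing only in letter case or surrounding spaces is treated as unchanged, and a null name is treated the same as an empty one. If such differences should count as changes, the predicate would need to compare the raw values instead.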
  • Copy the data from CDC STREAM to target STREAM
-- Insert into Target Stream
INSERT INTO STM_DEPARTMENTS_TGT
SELECT *
FROM STM_DEPARTMENTS_CDC EMIT CHANGES;
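
At this point each CREATE ... AS SELECT statement and the INSERT INTO ... SELECT statement above should be running as a persistent query. A hedged way to check this from the ksqlDB CLI, assuming a default setup, is sketched below.

```sql
-- List the persistent queries currently running on the server
SHOW QUERIES;

-- Show schema, backing topic, and runtime details for the target stream
DESCRIBE EXTENDED STM_DEPARTMENTS_TGT;
```

If one of the queries is missing from the SHOW QUERIES output, the corresponding statement most likely failed, and rerunning it in the CLI should show the error.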
  • Now let's insert a new record into the source stream and validate the result
-- Insert one more record in source stream and inspect the result
INSERT INTO STM_DEPARTMENTS_RAW (DEPT_NO, DEPT_NAME) VALUES ('d001', 'Marketing Team');
ksql> select * from STM_DEPARTMENTS_TGT emit changes;
+--------------------------+--------------------------+--------------------------+--------------------------+--------------------------+--------------------------+
|ROWTIME                   |ROWKEY                    |HUMAN_TS                  |DEPT_NO                   |CURR_DEPT_NAME            |PREV_DEPT_NAME            |
+--------------------------+--------------------------+--------------------------+--------------------------+--------------------------+--------------------------+
|1587515912933             |d001                      |2020-04-21 20:38:32 -0400 |d001                      |Marketing                 |null                      |
|1587515912955             |d002                      |2020-04-21 20:38:32 -0400 |d002                      |Claims                    |null                      |
|1587515912969             |d003                      |2020-04-21 20:38:32 -0400 |d003                      |Accounting                |null                      |
|1587515912980             |d004                      |2020-04-21 20:38:32 -0400 |d004                      |Information Technology    |null                      |
|1587521602667             |d001                      |2020-04-21 22:13:22 -0400 |d001                      |Marketing Team            |Marketing                 |

As the previous step shows, the changed name produced a new record in the target stream. Now let's insert one more record into the source stream, this time without changing the data. This should not affect our target stream.

-- Insert one more record in source stream and inspect the result
INSERT INTO STM_DEPARTMENTS_RAW (DEPT_NO, DEPT_NAME) VALUES ('d002', 'Claims');

SELECT * FROM STM_DEPARTMENTS_TGT EMIT CHANGES;
+--------------------------+--------------------------+--------------------------+--------------------------+--------------------------+--------------------------+
|ROWTIME                   |ROWKEY                    |HUMAN_TS                  |DEPT_NO                   |CURR_DEPT_NAME            |PREV_DEPT_NAME            |
+--------------------------+--------------------------+--------------------------+--------------------------+--------------------------+--------------------------+
|1587515912933             |d001                      |2020-04-21 20:38:32 -0400 |d001                      |Marketing                 |null                      |
|1587515912955             |d002                      |2020-04-21 20:38:32 -0400 |d002                      |Claims                    |null                      |
|1587515912969             |d003                      |2020-04-21 20:38:32 -0400 |d003                      |Accounting                |null                      |
|1587515912980             |d004                      |2020-04-21 20:38:32 -0400 |d004                      |Information Technology    |null                      |
|1587521602667             |d001                      |2020-04-21 22:13:22 -0400 |d001                      |Marketing Team            |Marketing                 |

The new insert did not trigger any change in the target stream.
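
A further check, sketched under the same setup, is to insert a value that differs from the stored name only by letter case and a trailing space. The value 'ACCOUNTING ' is a hypothetical test input chosen for illustration. Because the WHERE clause compares trimmed, upper-cased names, this insert should also leave the target stream unchanged, although the source table will now hold the new spelling as its current value.

```sql
-- Hypothetical test value: same department name, different case and a trailing space
INSERT INTO STM_DEPARTMENTS_RAW (DEPT_NO, DEPT_NAME) VALUES ('d003', 'ACCOUNTING ');

-- Inspect the target stream again; per the WHERE clause, no new d003 row is expected
SELECT * FROM STM_DEPARTMENTS_TGT EMIT CHANGES;
```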
