
Change Data Capture (CDC) in ksqlDB


In this article we go over the steps to implement Change Data Capture (CDC) in ksqlDB. See the diagram below for the high-level logical flow of this implementation.

Prerequisites

Install Confluent Platform for your operating system using the instructions at https://docs.confluent.io/current/installation/installing_cp/index.html

Steps

  • Create a STREAM with no key

    CREATE STREAM STM_DEPARTMENTS_RAW (DEPT_NO VARCHAR, DEPT_NAME VARCHAR)
    WITH (KAFKA_TOPIC='STM_DEPARTMENTS_RAW',VALUE_FORMAT='DELIMITED', PARTITIONS=1, REPLICAS=1);
    
  • Create a STREAM that adds a key by repartitioning the records on DEPT_NO

    CREATE STREAM STM_DEPARTMENTS_WITH_KEY AS 
    SELECT * FROM STM_DEPARTMENTS_RAW PARTITION BY DEPT_NO;
    
  • Create a TABLE over the keyed STREAM's topic. This table holds the current record for each key. In most use cases this table is where you start, and it represents your current data set

    CREATE TABLE TBL_DEPARTMENTS_SRC (DEPT_NO VARCHAR, DEPT_NAME VARCHAR) 
    WITH (KAFKA_TOPIC='STM_DEPARTMENTS_WITH_KEY',VALUE_FORMAT='DELIMITED', KEY='DEPT_NO');
    
  • Insert a few records into the source STREAM

    INSERT INTO STM_DEPARTMENTS_RAW (DEPT_NO, DEPT_NAME) VALUES ('d001', 'Marketing');
    INSERT INTO STM_DEPARTMENTS_RAW (DEPT_NO, DEPT_NAME) VALUES ('d002', 'Claims');
    INSERT INTO STM_DEPARTMENTS_RAW (DEPT_NO, DEPT_NAME) VALUES ('d003', 'Accounting');
    INSERT INTO STM_DEPARTMENTS_RAW (DEPT_NO, DEPT_NAME) VALUES ('d004', 'Information Technology');
    
  • Create the target STREAM, and a TABLE over it that holds the previous record for each key

    -- Create Target Stream
    CREATE STREAM STM_DEPARTMENTS_TGT (HUMAN_TS VARCHAR, DEPT_NO VARCHAR, CURR_DEPT_NAME VARCHAR, PREV_DEPT_NAME VARCHAR)
        WITH (KAFKA_TOPIC = 'STM_DEPARTMENTS_TGT', VALUE_FORMAT = 'AVRO', PARTITIONS = 1, REPLICAS = 1);
    
    -- Create table which will act as previous table for CDC
    CREATE TABLE TBL_DEPARTMENTS_PREV AS
    
    SELECT DEPT_NO, UDF_LATEST_BY_OFFSET(HUMAN_TS) AS HUMAN_TS, UDF_LATEST_BY_OFFSET(CURR_DEPT_NAME) AS CURR_DEPT_NAME, UDF_LATEST_BY_OFFSET(PREV_DEPT_NAME) AS PREV_DEPT_NAME
    FROM STM_DEPARTMENTS_TGT
    GROUP BY DEPT_NO;
    
  • Implement the CDC logic. Most of it goes in the WHERE clause of the query, which keeps a row only when the current department name differs from the previous one

    -- Detect changes and materialize them in a CDC table. We create a TABLE because joining two tables produces a table; otherwise a stream insert would have been enough
    CREATE TABLE TBL_DEPARTMENTS_CDC AS
    
    SELECT TIMESTAMPTOSTRING(CURR.ROWTIME, 'yyyy-MM-dd HH:mm:ss Z') AS HUMAN_TS, CURR.DEPT_NO AS DEPT_NO, CURR.DEPT_NAME AS CURR_DEPT_NAME, PREV.CURR_DEPT_NAME AS PREV_DEPT_NAME
    FROM TBL_DEPARTMENTS_SRC CURR
    LEFT JOIN TBL_DEPARTMENTS_PREV PREV ON CURR.ROWKEY = PREV.ROWKEY
    WHERE TRIM(UCASE(IFNULL(CURR.DEPT_NAME, ''))) <> TRIM(UCASE(IFNULL(PREV.CURR_DEPT_NAME, ''))) EMIT CHANGES;
    
    -- Create CDC stream on top of CDC topic
    CREATE STREAM STM_DEPARTMENTS_CDC (HUMAN_TS VARCHAR, DEPT_NO VARCHAR, CURR_DEPT_NAME VARCHAR, PREV_DEPT_NAME VARCHAR)
        WITH (VALUE_FORMAT = 'DELIMITED', KAFKA_TOPIC = 'TBL_DEPARTMENTS_CDC');
    
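  • Copy the data from the CDC STREAM to the target STREAM. Because TBL_DEPARTMENTS_PREV is built from the target STREAM, this feeds every change back so the next comparison uses the latest value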

    -- Insert into Target Stream
    INSERT INTO STM_DEPARTMENTS_TGT
    SELECT *
    FROM STM_DEPARTMENTS_CDC EMIT CHANGES;
    
  • Now insert a new record into the source stream and validate the result

    -- Insert one more record into the source stream and inspect the result
    INSERT INTO STM_DEPARTMENTS_RAW (DEPT_NO, DEPT_NAME) VALUES ('d001', 'Marketing Team');
    
    ksql> select * from STM_DEPARTMENTS_TGT emit changes;
    +--------------------------+--------------------------+--------------------------+--------------------------+--------------------------+--------------------------+
    |ROWTIME                   |ROWKEY                    |HUMAN_TS                  |DEPT_NO                   |CURR_DEPT_NAME            |PREV_DEPT_NAME            |
    +--------------------------+--------------------------+--------------------------+--------------------------+--------------------------+--------------------------+
    |1587515912933             |d001                      |2020-04-21 20:38:32 -0400 |d001                      |Marketing                 |null                      |
    |1587515912955             |d002                      |2020-04-21 20:38:32 -0400 |d002                      |Claims                    |null                      |
    |1587515912969             |d003                      |2020-04-21 20:38:32 -0400 |d003                      |Accounting                |null                      |
    |1587515912980             |d004                      |2020-04-21 20:38:32 -0400 |d004                      |Information Technology    |null                      |
    |1587521602667             |d001                      |2020-04-21 22:13:22 -0400 |d001                      |Marketing Team            |Marketing                 |
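To make the data flow of the steps above concrete, here is a minimal Python sketch that models the pipeline in memory. It is not ksqlDB code and uses no ksqlDB API: dicts stand in for TBL_DEPARTMENTS_SRC and TBL_DEPARTMENTS_PREV, a list stands in for STM_DEPARTMENTS_TGT, and the names `CdcPipeline`, `insert_raw` and `normalize` are hypothetical. It assumes records are processed one at a time, that each change reaches the previous table before the next record for the same key arrives, and that `UDF_LATEST_BY_OFFSET` returns the most recent value per key, as its name suggests.

```python
# Minimal in-memory model of the ksqlDB CDC pipeline (illustrative only).
# Assumptions: dicts stand in for tables, a list for the target stream,
# and records are processed synchronously, one at a time.
from typing import Dict, List, Optional, Tuple

# A target-stream row: (DEPT_NO, CURR_DEPT_NAME, PREV_DEPT_NAME)
Row = Tuple[str, str, Optional[str]]


def normalize(name: Optional[str]) -> str:
    # Approximates TRIM(UCASE(IFNULL(name, ''))) from the WHERE clause.
    return (name or "").upper().strip()


class CdcPipeline:  # hypothetical name, not part of ksqlDB
    def __init__(self) -> None:
        self.current: Dict[str, str] = {}    # models TBL_DEPARTMENTS_SRC
        self.previous: Dict[str, str] = {}   # models TBL_DEPARTMENTS_PREV.CURR_DEPT_NAME
        self.target: List[Row] = []          # models STM_DEPARTMENTS_TGT

    def insert_raw(self, dept_no: str, dept_name: str) -> None:
        # STM_DEPARTMENTS_RAW -> PARTITION BY DEPT_NO -> upsert into the source table.
        self.current[dept_no] = dept_name
        curr = self.current[dept_no]
        # LEFT JOIN on the key: a department with no history yields None (null).
        prev = self.previous.get(dept_no)
        # The WHERE clause: only a real change produces a CDC row.
        if normalize(curr) != normalize(prev):
            self.target.append((dept_no, curr, prev))  # INSERT INTO STM_DEPARTMENTS_TGT
            # Assumed: the latest-by-offset aggregation then makes this the previous value.
            self.previous[dept_no] = curr


pipeline = CdcPipeline()
for dept_no, name in [("d001", "Marketing"), ("d002", "Claims"),
                      ("d003", "Accounting"), ("d004", "Information Technology")]:
    pipeline.insert_raw(dept_no, name)
pipeline.insert_raw("d001", "Marketing Team")

for row in pipeline.target:
    print(row)
```

The printed tuples correspond to the DEPT_NO, CURR_DEPT_NAME and PREV_DEPT_NAME columns of the query output (the model leaves out ROWTIME and HUMAN_TS). The design hinges on the feedback loop: the target stream is both the output and, through the previous table, the memory of the last emitted value.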
    

Changing the name of d001 produced a new record in the target stream, with the old name in PREV_DEPT_NAME. Now insert one more record into the source stream, this time without changing the data. This should not affect the target stream.

    -- Insert one more record into the source stream and inspect the result
    INSERT INTO STM_DEPARTMENTS_RAW (DEPT_NO, DEPT_NAME) VALUES ('d002', 'Claims');
    
    SELECT * FROM STM_DEPARTMENTS_TGT EMIT CHANGES;
    +--------------------------+--------------------------+--------------------------+--------------------------+--------------------------+--------------------------+
    |ROWTIME                   |ROWKEY                    |HUMAN_TS                  |DEPT_NO                   |CURR_DEPT_NAME            |PREV_DEPT_NAME            |
    +--------------------------+--------------------------+--------------------------+--------------------------+--------------------------+--------------------------+
    |1587515912933             |d001                      |2020-04-21 20:38:32 -0400 |d001                      |Marketing                 |null                      |
    |1587515912955             |d002                      |2020-04-21 20:38:32 -0400 |d002                      |Claims                    |null                      |
    |1587515912969             |d003                      |2020-04-21 20:38:32 -0400 |d003                      |Accounting                |null                      |
    |1587515912980             |d004                      |2020-04-21 20:38:32 -0400 |d004                      |Information Technology    |null                      |
    |1587521602667             |d001                      |2020-04-21 22:13:22 -0400 |d001                      |Marketing Team            |Marketing                 |

The new insert did not add any record to the target stream, because the department name for d002 did not change.
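The WHERE clause of the CDC query is what filtered this record out. The Python sketch below reproduces just that predicate so its edge cases are easy to see. The function names `normalize` and `has_changed` are hypothetical, and Python's `upper()` and `strip()` only approximate ksqlDB's UCASE and TRIM (for example, for unusual whitespace characters).

```python
from typing import Optional


def normalize(name: Optional[str]) -> str:
    # Approximates TRIM(UCASE(IFNULL(name, ''))) from the CDC query's WHERE clause.
    return (name or "").upper().strip()


def has_changed(curr_name: Optional[str], prev_name: Optional[str]) -> bool:
    # A row reaches TBL_DEPARTMENTS_CDC only when the normalized names differ.
    return normalize(curr_name) != normalize(prev_name)


checks = [
    ("Claims", "Claims"),             # the re-insert of d002: no change
    ("Marketing Team", "Marketing"),  # real change: emitted
    ("Accounting", None),             # first record for a key (LEFT JOIN gives null): emitted
    ("claims ", "Claims"),            # case/whitespace-only edit: suppressed
    (None, None),                     # both null become '' and compare equal: suppressed
]
for curr, prev in checks:
    print(repr(curr), repr(prev), has_changed(curr, prev))
```

Because of UCASE and TRIM, an edit that only changes letter case or surrounding whitespace is not captured, and IFNULL also hides a change from null to an empty string. If such edits must be recorded, drop those functions from the comparison.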
