
Exploring Kafka tombstone records in ksqlDB, MongoDB and Postgres

In this article we will see how to generate a tombstone record in Kafka and observe its behavior in ksqlDB, MongoDB, and Postgres. Kafka uses tombstone records to delete records that are no longer needed from compacted topics: these are messages with a valid key and a null value.
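Conceptually, a compacted Kafka topic behaves like a key-value map: for each key only the latest value is retained, and a tombstone removes the key entirely once compaction runs. A minimal Python sketch of these semantics (illustrative only, not the broker's actual implementation):

```python
def compact(records):
    """Simulate log compaction: keep the latest value per key,
    and drop keys whose latest value is a tombstone (None)."""
    state = {}
    for key, value in records:
        if value is None:          # tombstone: delete the key
            state.pop(key, None)
        else:                      # normal record: latest value wins
            state[key] = value
    return state

records = [
    ("wcook", {"name": "Amber Gaines"}),
    ("leesteven", {"name": "Donald Smith"}),
    ("leesteven", None),  # tombstone for leesteven
]
print(compact(records))  # leesteven is gone; wcook remains
```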

Prerequisites

  • Download and install Confluent Platform. See here for instructions on how to install Confluent Platform.
  • Configure a Kafka producer. See How to generate test data for Kafka using Faker for instructions on how to set up a simple producer.
  • Install the MongoDB Connector for Apache Kafka. See here for instructions on how to install it.
  • Install kafkacat. See here for instructions on how to install kafkacat.
  • Working MongoDB and Postgres database instances.

Produce test records for Kafka topic

  • Run the Kafka producer and generate some test records

    docker run -it --rm  -e bootstrap_servers=192.168.1.9:39092 -e topic_name=kafka.ksql.tombstone.demo -e no_of_records=10 entechlog/faker-kafka-profile-datagen
    
  • Start a ksqlDB CLI session and examine the records in the topic by running the commands below. Make sure to set the offset to earliest:

    set 'auto.offset.reset'='earliest';
    
    ksql> print 'kafka.ksql.tombstone.demo';
    Format:STRING
    6/27/20 3:22:03 PM EDT , wcook , {"username": "wcook", "name": "Amber Gaines", "sex": "F", "address": "5876 Wendy Vista\x5CnGarciaborough, AZ 63886", "mail": "nancyallen@gmail.com", "birthdate": "1973-02-01"}
    6/27/20 3:22:05 PM EDT , christopherwhite , {"username": "christopherwhite", "name": "James Bentley", "sex": "M", "address": "22491 Julie Junctions Apt. 075\x5CnNew Christine, NE 46532", "mail": "kevin33@gmail.com", "birthdate": "1985-03-27"}
    6/27/20 3:22:05 PM EDT , leesteven , {"username": "leesteven", "name": "Donald Smith", "sex": "M", "address": "9882 Blanchard Street\x5CnNorth Matthewhaven, NV 87603", "mail": "lauracrawford@yahoo.com", "birthdate": "1929-07-16"}
    6/27/20 3:22:05 PM EDT , pgrant , {"username": "pgrant", "name": "Joshua Gill", "sex": "M", "address": "128 Allen Pike\x5CnGarrisonport, VT 75201", "mail": "amanda38@yahoo.com", "birthdate": "1975-12-09"}
    6/27/20 3:22:05 PM EDT , michelleblackwell , {"username": "michelleblackwell", "name": "Stacy Barrett", "sex": "F", "address": "162 Stewart Fall Apt. 636\x5CnSharimouth, DE 57409", "mail": "lindseyhurst@gmail.com", "birthdate": "2000-09-10"}
    6/27/20 3:22:05 PM EDT , yblackburn , {"username": "yblackburn", "name": "Stacy Ayala", "sex": "F", "address": "PSC 5638, Box 6004\x5CnAPO AE 68274", "mail": "petersshawn@yahoo.com", "birthdate": "1935-04-14"}
    6/27/20 3:22:05 PM EDT , jamesdominguez , {"username": "jamesdominguez", "name": "Charles Barton", "sex": "M", "address": "617 Anna Lights Suite 189\x5CnLucaston, ND 69540", "mail": "mcgeedenise@hotmail.com", "birthdate": "1992-07-27"}
    6/27/20 3:22:05 PM EDT , wphillips , {"username": "wphillips", "name": "Daniel Jones", "sex": "M", "address": "580 Thompson Ports Suite 407\x5CnPort Katelyn, RI 03888", "mail": "milesmatthew@hotmail.com", "birthdate": "1918-07-16"}
    6/27/20 3:22:05 PM EDT , hailey78 , {"username": "hailey78", "name": "Neil Malone", "sex": "M", "address": "00078 Jill Roads\x5CnNew Brianburgh, AZ 59199", "mail": "jakesantiago@gmail.com", "birthdate": "1980-07-25"}
    6/27/20 3:22:05 PM EDT , sarah60 , {"username": "sarah60", "name": "Ryan Gonzalez", "sex": "M", "address": "11126 David Loop\x5CnLake Kimberlytown, MI 18851", "mail": "rowlandrobert@yahoo.com", "birthdate": "1922-08-05"}
    
  • Let's also make sure the topic is compacted by running the command below:

    kafka-topics --zookeeper entechlog-vm-01:2181 --alter --topic kafka.ksql.tombstone.demo --config cleanup.policy=compact
    
    WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases.
             Going forward, please use kafka-configs.sh for this functionality
    Updated config for topic kafka.ksql.tombstone.demo.
    
  • Review the topic compaction policy by running the describe command below:

    kafka-topics --describe --zookeeper entechlog-vm-01:2181 --topic "kafka.ksql.tombstone.demo"
    
    Topic: kafka.ksql.tombstone.demo        PartitionCount: 1       ReplicationFactor: 1    Configs: cleanup.policy=compact
    Topic: kafka.ksql.tombstone.demo        Partition: 0    Leader: 0       Replicas: 0     Isr: 0
    
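As the deprecation warning above suggests, newer Kafka versions manage topic configuration with kafka-configs instead of kafka-topics. An equivalent command (assuming the broker listens on entechlog-vm-01:9092) would be:

```shell
kafka-configs --bootstrap-server entechlog-vm-01:9092 \
  --entity-type topics --entity-name kafka.ksql.tombstone.demo \
  --alter --add-config cleanup.policy=compact
```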

Create ksqlDB tables

  • Create the RAW ksqlDB table. This table is backed by the topic we created in the previous step:

    CREATE TABLE TBL_TOMBSTONE_DEMO_0010 (
    	ROWKEY VARCHAR KEY
    	,NAME VARCHAR
    	,SEX VARCHAR
    	,ADDRESS VARCHAR
    	,MAIL VARCHAR
    	,BIRTHDATE VARCHAR
    	)
    	WITH (
    			KAFKA_TOPIC = 'kafka.ksql.tombstone.demo'
    			,VALUE_FORMAT = 'JSON'
    			,REPLICAS = 1
    			,PARTITIONS = 1
    			);
    
  • Create a chained ksqlDB table on top of the RAW table. This applies simple transformations to a few fields:

    CREATE TABLE TBL_TOMBSTONE_DEMO_0020 AS
    SELECT ROWKEY AS USERNAME
    	,UCASE(NAME) AS NAME
    	,CASE
    		WHEN SEX = 'M'
    			THEN 'MALE'
    		WHEN SEX = 'F'
    			THEN 'FEMALE'
    		ELSE 'OTHERS'
    		END AS SEX
    	,LCASE(MAIL) AS MAIL
    	,BIRTHDATE
    FROM TBL_TOMBSTONE_DEMO_0010 EMIT CHANGES;
    
  • Examine the records in the ksqlDB table:

    ksql> SELECT * FROM TBL_TOMBSTONE_DEMO_0020 EMIT CHANGES;
    +-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
    |ROWTIME          |ROWKEY           |USERNAME         |NAME             |SEX              |MAIL             |BIRTHDATE        |
    +-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
    |1593285723143    |wcook            |wcook            |AMBER GAINES     |FEMALE           |nancyallen@gmail.|1973-02-01       |
    |                 |                 |                 |                 |                 |com              |                 |
    |1593285725048    |christopherwhite |christopherwhite |JAMES BENTLEY    |MALE             |kevin33@gmail.com|1985-03-27       |
    |1593285725056    |leesteven        |leesteven        |DONALD SMITH     |MALE             |lauracrawford@yah|1929-07-16       |
    |                 |                 |                 |                 |                 |oo.com           |                 |
    |1593285725061    |pgrant           |pgrant           |JOSHUA GILL      |MALE             |amanda38@yahoo.co|1975-12-09       |
    |                 |                 |                 |                 |                 |m                |                 |
    |1593285725066    |michelleblackwell|michelleblackwell|STACY BARRETT    |FEMALE           |lindseyhurst@gmai|2000-09-10       |
    |                 |                 |                 |                 |                 |l.com            |                 |
    |1593285725069    |yblackburn       |yblackburn       |STACY AYALA      |FEMALE           |petersshawn@yahoo|1935-04-14       |
    |                 |                 |                 |                 |                 |.com             |                 |
    |1593285725075    |jamesdominguez   |jamesdominguez   |CHARLES BARTON   |MALE             |mcgeedenise@hotma|1992-07-27       |
    |                 |                 |                 |                 |                 |il.com           |                 |
    |1593285725080    |wphillips        |wphillips        |DANIEL JONES     |MALE             |milesmatthew@hotm|1918-07-16       |
    |                 |                 |                 |                 |                 |ail.com          |                 |
    |1593285725091    |hailey78         |hailey78         |NEIL MALONE      |MALE             |jakesantiago@gmai|1980-07-25       |
    |                 |                 |                 |                 |                 |l.com            |                 |
    |1593285725095    |sarah60          |sarah60          |RYAN GONZALEZ    |MALE             |rowlandrobert@yah|1922-08-05       |
    |                 |                 |                 |                 |                 |oo.com           |                 |
    
  • Create another chained ksqlDB table on top of the RAW table. This is the same as the previous table, but with the value format set to AVRO to make the Postgres JDBC sink easier to configure:

    CREATE TABLE TBL_TOMBSTONE_DEMO_0030
    	WITH (VALUE_FORMAT = 'AVRO') AS
    SELECT ROWKEY AS USERNAME
    	,UCASE(NAME) AS NAME
    	,CASE
    		WHEN SEX = 'M'
    			THEN 'MALE'
    		WHEN SEX = 'F'
    			THEN 'FEMALE'
    		ELSE 'OTHERS'
    		END AS SEX
    	,LCASE(MAIL) AS MAIL
    	,BIRTHDATE
    FROM TBL_TOMBSTONE_DEMO_0010 EMIT CHANGES;
    
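The field-level logic in these tables is easy to mirror outside ksqlDB. The sketch below reproduces the same SELECT transformations (UCASE, the sex CASE expression, LCASE) in plain Python; it is illustrative only, not how ksqlDB executes the query:

```python
def transform(rowkey, record):
    """Mirror of the ksqlDB SELECT: upper-case the name, expand the
    sex code, lower-case the mail address, pass birthdate through."""
    sex_map = {"M": "MALE", "F": "FEMALE"}
    return {
        "USERNAME": rowkey,
        "NAME": record["name"].upper(),
        "SEX": sex_map.get(record["sex"], "OTHERS"),
        "MAIL": record["mail"].lower(),
        "BIRTHDATE": record["birthdate"],
    }

row = transform("wcook", {"name": "Amber Gaines", "sex": "F",
                          "mail": "NancyAllen@gmail.com",
                          "birthdate": "1973-02-01"})
print(row["NAME"], row["SEX"])  # AMBER GAINES FEMALE
```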

Sink the tables to MongoDB and Postgres databases

  • Create the MongoDB sink connector by running the command below. It's important to note that we set "delete.on.null.values": "true", which handles the tombstone records:

    curl --location --request PUT 'http://entechlog-vm-01:8083/connectors/TOMBSTONE_DEMO_SINK_MONGODB/config' \
    --header 'Content-Type: application/json' \
    --data-raw '{
      "key.converter.schemas.enable": "false",
      "value.converter.schemas.enable": "false",
      "transforms.id_to_object.field": "_id",
      "name": "TOMBSTONE_DEMO_SINK_MONGODB",
      "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "transforms": "id_to_object",
      "errors.retry.timeout": "3",
      "errors.log.enable": "true",
      "errors.log.include.messages": "true",
      "topics": "TBL_TOMBSTONE_DEMO_0020",
      "errors.deadletterqueue.topic.name": "TBL_TOMBSTONE_DEMO_0020_MONGO_DLQ",
      "errors.deadletterqueue.topic.replication.factor": "1",
      "errors.deadletterqueue.context.headers.enable": "true",
      "transforms.id_to_object.type": "org.apache.kafka.connect.transforms.HoistField$Key",
      "connection.uri": "mongodb://admin:admin@entechlog-vm-01:27017/admin?authSource=admin&readPreference=primary&ssl=false",
      "database": "demo",
      "collection": "TOMBSTONE_DEMO",
      "delete.on.null.values": "true",
      "key.projection.type": "none",
      "document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInKeyStrategy"
    }'
    
  • Validate the status of the connector by running the command below:

    curl --location --request GET 'entechlog-vm-01:8083/connectors/TOMBSTONE_DEMO_SINK_MONGODB/status'
    
    {
        "name": "TOMBSTONE_DEMO_SINK_MONGODB",
        "connector": {
            "state": "RUNNING",
            "worker_id": "127.0.1.1:8083"
        },
        "tasks": [
            {
                "id": 0,
                "state": "RUNNING",
                "worker_id": "127.0.1.1:8083"
            }
        ],
        "type": "sink"
    }
    
  • Examine the records in MongoDB

  • Create the Postgres sink connector by running the command below.

    • It's important to note that we set "delete.enabled": true, which handles tombstone records
    • The demo schema in Postgres was created manually before creating the connector
    curl --location --request PUT 'http://entechlog-vm-01:8083/connectors/TOMBSTONE_DEMO_SINK_POSTGRES/config' \
    --header 'Content-Type: application/json' \
    --data-raw '{
      "value.converter.schema.registry.url": "http://entechlog-vm-01:8081",
      "name": "TOMBSTONE_DEMO_SINK_POSTGRES",
      "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "value.converter": "io.confluent.connect.avro.AvroConverter",
      "value.converter.schemas.enable":"false",
      "topics": "TBL_TOMBSTONE_DEMO_0030",
      "connection.url": "jdbc:postgresql://entechlog-vm-01:5432/",
      "connection.user": "postgres",
      "connection.password": "postgres",
      "insert.mode": "upsert",
      "delete.enabled": true,
      "table.name.format": "demo.TOMBSTONE_DEMO",
      "pk.mode": "record_key",
      "pk.fields": "ROWKEY",
      "auto.create": "true"
    }'
    
  • Validate the status of the connector by running the command below:

    curl --location --request GET 'entechlog-vm-01:8083/connectors/TOMBSTONE_DEMO_SINK_POSTGRES/status'
    
    {
        "name": "TOMBSTONE_DEMO_SINK_POSTGRES",
        "connector": {
            "state": "RUNNING",
            "worker_id": "127.0.1.1:8083"
        },
        "tasks": [
            {
                "id": 0,
                "state": "RUNNING",
                "worker_id": "127.0.1.1:8083"
            }
        ],
        "type": "sink"
    }
    
  • Examine the records in Postgres
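With "insert.mode": "upsert" and "delete.enabled": true, the JDBC sink turns each keyed record into an upsert and each tombstone into a DELETE on the primary key. The Python sketch below illustrates the kind of SQL this maps to (names follow this demo; the real connector builds its statements internally):

```python
def sink_sql(table, pk, key, value):
    """Sketch of the JDBC sink's behavior: a tombstone (None value)
    becomes a DELETE; any other record becomes a Postgres upsert."""
    if value is None:  # tombstone record
        return 'DELETE FROM {} WHERE "{}" = %s'.format(table, pk), (key,)
    cols = [pk] + list(value)
    col_list = ", ".join('"{}"'.format(c) for c in cols)
    placeholders = ", ".join(["%s"] * len(cols))
    updates = ", ".join('"{0}" = EXCLUDED."{0}"'.format(c) for c in value)
    sql = ('INSERT INTO {} ({}) VALUES ({}) '
           'ON CONFLICT ("{}") DO UPDATE SET {}'
           ).format(table, col_list, placeholders, pk, updates)
    return sql, tuple([key] + list(value.values()))

# A tombstone for key "leesteven" maps to a DELETE:
print(sink_sql('demo."TOMBSTONE_DEMO"', "ROWKEY", "leesteven", None)[0])
```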

Create Tombstone records and review the results

At this point we have a ksqlDB table, a MongoDB collection, and a Postgres table with the required data. Now let's create a tombstone record (a record with a key and a null value) and see the impact on the target objects.

  • Create a tombstone record by running the command below. Here leesteven is the key of an existing record in the topic; the -Z flag tells kafkacat to produce the empty value as NULL, and -K: sets : as the key/value delimiter

    echo "leesteven:" | kafkacat -P -Z -b entechlog-vm-01:9092 -t kafka.ksql.tombstone.demo -K:
    
  • Examine the records in ksqlDB. As you can see, the record for leesteven has now been deleted from the ksqlDB table:

    ksql> SELECT * FROM TBL_TOMBSTONE_DEMO_0020 EMIT CHANGES;
    +-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
    |ROWTIME          |ROWKEY           |USERNAME         |NAME             |SEX              |MAIL             |BIRTHDATE        |
    +-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
    |1593285723143    |wcook            |wcook            |AMBER GAINES     |FEMALE           |nancyallen@gmail.|1973-02-01       |
    |                 |                 |                 |                 |                 |com              |                 |
    |1593285725048    |christopherwhite |christopherwhite |JAMES BENTLEY    |MALE             |kevin33@gmail.com|1985-03-27       |
    |1593285725061    |pgrant           |pgrant           |JOSHUA GILL      |MALE             |amanda38@yahoo.co|1975-12-09       |
    |                 |                 |                 |                 |                 |m                |                 |
    |1593285725066    |michelleblackwell|michelleblackwell|STACY BARRETT    |FEMALE           |lindseyhurst@gmai|2000-09-10       |
    |                 |                 |                 |                 |                 |l.com            |                 |
    |1593285725069    |yblackburn       |yblackburn       |STACY AYALA      |FEMALE           |petersshawn@yahoo|1935-04-14       |
    |                 |                 |                 |                 |                 |.com             |                 |
    |1593285725075    |jamesdominguez   |jamesdominguez   |CHARLES BARTON   |MALE             |mcgeedenise@hotma|1992-07-27       |
    |                 |                 |                 |                 |                 |il.com           |                 |
    |1593285725080    |wphillips        |wphillips        |DANIEL JONES     |MALE             |milesmatthew@hotm|1918-07-16       |
    |                 |                 |                 |                 |                 |ail.com          |                 |
    |1593285725091    |hailey78         |hailey78         |NEIL MALONE      |MALE             |jakesantiago@gmai|1980-07-25       |
    |                 |                 |                 |                 |                 |l.com            |                 |
    |1593285725095    |sarah60          |sarah60          |RYAN GONZALEZ    |MALE             |rowlandrobert@yah|1922-08-05       |
    |                 |                 |                 |                 |                 |oo.com           |                 |
    
  • Examine the records in MongoDB. As you can see, the record for leesteven has now been deleted from the MongoDB collection.

  • Examine the records in Postgres. As you can see, the record for leesteven has now been deleted from the Postgres table.

Here we saw that a tombstone record produced correctly to the parent topic is reflected in Kafka Streams, ksqlDB, and any target databases such as MongoDB and Postgres.

Credits

Arvind Rajagopal
