Exploring Kafka tombstone records in ksqlDB, MongoDB and Postgres


Kafka 27 Jun 2020 Siva Nadesan

Overview

In this article we will see how to generate a tombstone record in Kafka and observe its behavior in ksqlDB, MongoDB, and Postgres.


Prerequisites

  • Download and install Confluent Platform. See here for instructions on how to install Confluent Platform.
  • Configure a Kafka producer. See How to generate test data for Kafka using Faker for instructions on how to set up a simple producer.
  • Install the MongoDB Connector for Apache Kafka. See here for instructions on how to install the MongoDB Connector for Apache Kafka.
  • Install kafkacat. See here for instructions on how to install kafkacat.
  • Working MongoDB and Postgres database instances.

Produce test records for Kafka topic

  • Run the Kafka producer and generate some test records.
docker run -it --rm  -e bootstrap_servers=192.168.1.9:39092 -e topic_name=kafka.ksql.tombstone.demo -e no_of_records=10 entechlog/faker-kafka-profile-datagen
  • Start a ksqlDB CLI session and examine the records in the topic by running the commands below. Make sure to set the offset to earliest.
set 'auto.offset.reset'='earliest';
ksql> print 'kafka.ksql.tombstone.demo';
Format:STRING
6/27/20 3:22:03 PM EDT , wcook , {"username": "wcook", "name": "Amber Gaines", "sex": "F", "address": "5876 Wendy Vista\x5CnGarciaborough, AZ 63886", "mail": "nancyallen@gmail.com", "birthdate": "1973-02-01"}
6/27/20 3:22:05 PM EDT , christopherwhite , {"username": "christopherwhite", "name": "James Bentley", "sex": "M", "address": "22491 Julie Junctions Apt. 075\x5CnNew Christine, NE 46532", "mail": "kevin33@gmail.com", "birthdate": "1985-03-27"}
6/27/20 3:22:05 PM EDT , leesteven , {"username": "leesteven", "name": "Donald Smith", "sex": "M", "address": "9882 Blanchard Street\x5CnNorth Matthewhaven, NV 87603", "mail": "lauracrawford@yahoo.com", "birthdate": "1929-07-16"}
6/27/20 3:22:05 PM EDT , pgrant , {"username": "pgrant", "name": "Joshua Gill", "sex": "M", "address": "128 Allen Pike\x5CnGarrisonport, VT 75201", "mail": "amanda38@yahoo.com", "birthdate": "1975-12-09"}
6/27/20 3:22:05 PM EDT , michelleblackwell , {"username": "michelleblackwell", "name": "Stacy Barrett", "sex": "F", "address": "162 Stewart Fall Apt. 636\x5CnSharimouth, DE 57409", "mail": "lindseyhurst@gmail.com", "birthdate": "2000-09-10"}
6/27/20 3:22:05 PM EDT , yblackburn , {"username": "yblackburn", "name": "Stacy Ayala", "sex": "F", "address": "PSC 5638, Box 6004\x5CnAPO AE 68274", "mail": "petersshawn@yahoo.com", "birthdate": "1935-04-14"}
6/27/20 3:22:05 PM EDT , jamesdominguez , {"username": "jamesdominguez", "name": "Charles Barton", "sex": "M", "address": "617 Anna Lights Suite 189\x5CnLucaston, ND 69540", "mail": "mcgeedenise@hotmail.com", "birthdate": "1992-07-27"}
6/27/20 3:22:05 PM EDT , wphillips , {"username": "wphillips", "name": "Daniel Jones", "sex": "M", "address": "580 Thompson Ports Suite 407\x5CnPort Katelyn, RI 03888", "mail": "milesmatthew@hotmail.com", "birthdate": "1918-07-16"}
6/27/20 3:22:05 PM EDT , hailey78 , {"username": "hailey78", "name": "Neil Malone", "sex": "M", "address": "00078 Jill Roads\x5CnNew Brianburgh, AZ 59199", "mail": "jakesantiago@gmail.com", "birthdate": "1980-07-25"}
6/27/20 3:22:05 PM EDT , sarah60 , {"username": "sarah60", "name": "Ryan Gonzalez", "sex": "M", "address": "11126 David Loop\x5CnLake Kimberlytown, MI 18851", "mail": "rowlandrobert@yahoo.com", "birthdate": "1922-08-05"}
  • Let's also make sure the topic is compacted by running the command below.
kafka-topics --zookeeper entechlog-vm-01:2181 --alter --topic kafka.ksql.tombstone.demo --config cleanup.policy=compact

WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases.
         Going forward, please use kafka-configs.sh for this functionality
Updated config for topic kafka.ksql.tombstone.demo.
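As the warning notes, kafka-configs is the preferred tool going forward. A sketch of the equivalent call, assuming the broker is reachable at entechlog-vm-01:9092 (the same address used for kafkacat later in this article):

# equivalent topic-level config change using kafka-configs instead of the deprecated kafka-topics --alter
kafka-configs --bootstrap-server entechlog-vm-01:9092 --entity-type topics --entity-name kafka.ksql.tombstone.demo --alter --add-config cleanup.policy=compact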
  • Review the topic compaction policy by running the describe command below.
kafka-topics --describe --zookeeper entechlog-vm-01:2181 --topic "kafka.ksql.tombstone.demo"

Topic: kafka.ksql.tombstone.demo        PartitionCount: 1       ReplicationFactor: 1    Configs: cleanup.policy=compact
        Topic: kafka.ksql.tombstone.demo        Partition: 0    Leader: 0       Replicas: 0     Isr: 0

Create ksqlDB tables

  • Create the RAW ksqlDB table. This table is backed by the topic we populated in the previous step.
CREATE TABLE TBL_TOMBSTONE_DEMO_0010 (
	ROWKEY VARCHAR KEY
	,NAME VARCHAR
	,SEX VARCHAR
	,ADDRESS VARCHAR
	,MAIL VARCHAR
	,BIRTHDATE VARCHAR
	)
	WITH (
			KAFKA_TOPIC = 'kafka.ksql.tombstone.demo'
			,VALUE_FORMAT = 'JSON'
			,REPLICAS = 1
			,PARTITIONS = 1
			);
  • Create a chained ksqlDB table on top of the RAW table. This will do simple transformations on a few fields.
CREATE TABLE TBL_TOMBSTONE_DEMO_0020 AS
SELECT ROWKEY AS USERNAME
	,UCASE(NAME) AS NAME
	,CASE
		WHEN SEX = 'M'
			THEN 'MALE'
		WHEN SEX = 'F'
			THEN 'FEMALE'
		ELSE 'OTHERS'
		END AS SEX
	,LCASE(MAIL) AS MAIL
	,BIRTHDATE
FROM TBL_TOMBSTONE_DEMO_0010 EMIT CHANGES;
  • Examine the records in the ksqlDB table.
ksql> SELECT * FROM TBL_TOMBSTONE_DEMO_0020 EMIT CHANGES;
+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|ROWTIME          |ROWKEY           |USERNAME         |NAME             |SEX              |MAIL             |BIRTHDATE        |
+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|1593285723143    |wcook            |wcook            |AMBER GAINES     |FEMALE           |nancyallen@gmail.|1973-02-01       |
|                 |                 |                 |                 |                 |com              |                 |
|1593285725048    |christopherwhite |christopherwhite |JAMES BENTLEY    |MALE             |kevin33@gmail.com|1985-03-27       |
|1593285725056    |leesteven        |leesteven        |DONALD SMITH     |MALE             |lauracrawford@yah|1929-07-16       |
|                 |                 |                 |                 |                 |oo.com           |                 |
|1593285725061    |pgrant           |pgrant           |JOSHUA GILL      |MALE             |amanda38@yahoo.co|1975-12-09       |
|                 |                 |                 |                 |                 |m                |                 |
|1593285725066    |michelleblackwell|michelleblackwell|STACY BARRETT    |FEMALE           |lindseyhurst@gmai|2000-09-10       |
|                 |                 |                 |                 |                 |l.com            |                 |
|1593285725069    |yblackburn       |yblackburn       |STACY AYALA      |FEMALE           |petersshawn@yahoo|1935-04-14       |
|                 |                 |                 |                 |                 |.com             |                 |
|1593285725075    |jamesdominguez   |jamesdominguez   |CHARLES BARTON   |MALE             |mcgeedenise@hotma|1992-07-27       |
|                 |                 |                 |                 |                 |il.com           |                 |
|1593285725080    |wphillips        |wphillips        |DANIEL JONES     |MALE             |milesmatthew@hotm|1918-07-16       |
|                 |                 |                 |                 |                 |ail.com          |                 |
|1593285725091    |hailey78         |hailey78         |NEIL MALONE      |MALE             |jakesantiago@gmai|1980-07-25       |
|                 |                 |                 |                 |                 |l.com            |                 |
|1593285725095    |sarah60          |sarah60          |RYAN GONZALEZ    |MALE             |rowlandrobert@yah|1922-08-05       |
|                 |                 |                 |                 |                 |oo.com           |                 |
  • Create another chained ksqlDB table on top of the RAW table. This is the same as the previous table, except we also set the value format to AVRO to make it easy for the Postgres JDBC sink connector.
CREATE TABLE TBL_TOMBSTONE_DEMO_0030
	WITH (VALUE_FORMAT = 'AVRO') AS
SELECT ROWKEY AS USERNAME
	,UCASE(NAME) AS NAME
	,CASE
		WHEN SEX = 'M'
			THEN 'MALE'
		WHEN SEX = 'F'
			THEN 'FEMALE'
		ELSE 'OTHERS'
		END AS SEX
	,LCASE(MAIL) AS MAIL
	,BIRTHDATE
FROM TBL_TOMBSTONE_DEMO_0010 EMIT CHANGES;
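Because this table is serialized as AVRO, ksqlDB registers the value schema with Schema Registry. As an optional sanity check (a sketch, assuming Schema Registry runs at http://entechlog-vm-01:8081, the URL used in the Postgres connector config below), you can list the registered subjects:

# list Schema Registry subjects; the value schema for the new table's topic usually shows up as TBL_TOMBSTONE_DEMO_0030-value
curl -s http://entechlog-vm-01:8081/subjects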

Sink the tables to MongoDB and Postgres databases

  • Create the MongoDB sink connector by running the command below. It's important to note that we are setting "delete.on.null.values": "true", which tells the connector to delete the matching document when it receives a tombstone record.
curl --location --request PUT 'http://entechlog-vm-01:8083/connectors/TOMBSTONE_DEMO_SINK_MONGODB/config' \
--header 'Content-Type: application/json' \
--data-raw '{
  "key.converter.schemas.enable": "false",
  "value.converter.schemas.enable": "false",
  "transforms.id_to_object.field": "_id",
  "name": "TOMBSTONE_DEMO_SINK_MONGODB",
  "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "transforms": "id_to_object",
  "errors.retry.timeout": "3",
  "errors.log.enable": "true",
  "errors.log.include.messages": "true",
  "topics": "TBL_TOMBSTONE_DEMO_0020",
  "errors.deadletterqueue.topic.name": "TBL_TOMBSTONE_DEMO_0020_MONGO_DLQ",
  "errors.deadletterqueue.topic.replication.factor": "1",
  "errors.deadletterqueue.context.headers.enable": "true",
  "transforms.id_to_object.type": "org.apache.kafka.connect.transforms.HoistField$Key",
  "connection.uri": "mongodb://admin:admin@entechlog-vm-01:27017/admin?authSource=admin&readPreference=primary&ssl=false",
  "database": "demo",
  "collection": "TOMBSTONE_DEMO",
  "delete.on.null.values": "true",
  "key.projection.type": "none",
  "document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInKeyStrategy"
}'
  • Validate the status of the connector by running the command below.
curl --location --request GET 'entechlog-vm-01:8083/connectors/TOMBSTONE_DEMO_SINK_MONGODB/status'
{
    "name": "TOMBSTONE_DEMO_SINK_MONGODB",
    "connector": {
        "state": "RUNNING",
        "worker_id": "127.0.1.1:8083"
    },
    "tasks": [
        {
            "id": 0,
            "state": "RUNNING",
            "worker_id": "127.0.1.1:8083"
        }
    ],
    "type": "sink"
}
  • Examine the records in MongoDB. One way to do this from the command line is sketched below.
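A minimal way to inspect the collection, assuming the legacy mongo shell is available on the host and reusing the connection string from the connector config:

# print every document the sink connector wrote to demo.TOMBSTONE_DEMO
mongo "mongodb://admin:admin@entechlog-vm-01:27017/admin?authSource=admin" --quiet \
  --eval 'db.getSiblingDB("demo").TOMBSTONE_DEMO.find().pretty()'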
  • Create the Postgres sink connector by running the command below.
    • It's important to note that we are setting "delete.enabled": true, which enables deletes for tombstone records.
    • The demo schema in Postgres was created manually before creating the connector.
curl --location --request PUT 'http://entechlog-vm-01:8083/connectors/TOMBSTONE_DEMO_SINK_POSTGRESS/config' \
--header 'Content-Type: application/json' \
--data-raw '{
  "value.converter.schema.registry.url": "http://entechlog-vm-01:8081",
  "name": "TOMBSTONE_DEMO_SINK_POSTGRESS",
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schemas.enable":"false",
  "topics": "TBL_TOMBSTONE_DEMO_0030",
  "connection.url": "jdbc:postgresql://entechlog-vm-01:5432/",
  "connection.user": "postgres",
  "connection.password": "postgres",
  "insert.mode": "upsert",
  "delete.enabled": true,
  "table.name.format": "demo.TOMBSTONE_DEMO",
  "pk.mode": "record_key",
  "pk.fields": "ROWKEY",
  "auto.create": "true"
}'
  • Validate the status of the connector by running the command below.
curl --location --request GET 'entechlog-vm-01:8083/connectors/TOMBSTONE_DEMO_SINK_POSTGRESS/status'
{
    "name": "TOMBSTONE_DEMO_SINK_POSTGRESS",
    "connector": {
        "state": "RUNNING",
        "worker_id": "127.0.1.1:8083"
    },
    "tasks": [
        {
            "id": 0,
            "state": "RUNNING",
            "worker_id": "127.0.1.1:8083"
        }
    ],
    "type": "sink"
}
  • Examine the records in Postgres. One way to do this from the command line is sketched below.
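A minimal check with psql, assuming the JDBC sink auto-created the table as demo.TOMBSTONE_DEMO (the exact case and quoting of the auto-created name can vary with the connector's identifier settings):

# list the rows written by the JDBC sink; adjust the quoting if auto.create named the table differently
PGPASSWORD=postgres psql -h entechlog-vm-01 -U postgres -d postgres \
  -c 'SELECT * FROM demo."TOMBSTONE_DEMO";'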

Create Tombstone records and review the results

At this point we have ksqlDB tables, a MongoDB collection, and a Postgres table with the required data. Now let's create a tombstone record (a record with a key and a null value) and see the impact on the target objects.

  • Create a tombstone record by running the command below. Here leesteven is the key of an existing record in the topic, and the -Z flag makes kafkacat send the empty value as NULL rather than an empty string, which is what makes this a true tombstone.
echo "leesteven:" | kafkacat -P -Z -b entechlog-vm-01:9092 -t kafka.ksql.tombstone.demo -K:
  • Examine the records in ksqlDB. As you can see, the record for leesteven has now been deleted from the ksqlDB table.
ksql> SELECT * FROM TBL_TOMBSTONE_DEMO_0020 EMIT CHANGES;
+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|ROWTIME          |ROWKEY           |USERNAME         |NAME             |SEX              |MAIL             |BIRTHDATE        |
+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|1593285723143    |wcook            |wcook            |AMBER GAINES     |FEMALE           |nancyallen@gmail.|1973-02-01       |
|                 |                 |                 |                 |                 |com              |                 |
|1593285725048    |christopherwhite |christopherwhite |JAMES BENTLEY    |MALE             |kevin33@gmail.com|1985-03-27       |
|1593285725061    |pgrant           |pgrant           |JOSHUA GILL      |MALE             |amanda38@yahoo.co|1975-12-09       |
|                 |                 |                 |                 |                 |m                |                 |
|1593285725066    |michelleblackwell|michelleblackwell|STACY BARRETT    |FEMALE           |lindseyhurst@gmai|2000-09-10       |
|                 |                 |                 |                 |                 |l.com            |                 |
|1593285725069    |yblackburn       |yblackburn       |STACY AYALA      |FEMALE           |petersshawn@yahoo|1935-04-14       |
|                 |                 |                 |                 |                 |.com             |                 |
|1593285725075    |jamesdominguez   |jamesdominguez   |CHARLES BARTON   |MALE             |mcgeedenise@hotma|1992-07-27       |
|                 |                 |                 |                 |                 |il.com           |                 |
|1593285725080    |wphillips        |wphillips        |DANIEL JONES     |MALE             |milesmatthew@hotm|1918-07-16       |
|                 |                 |                 |                 |                 |ail.com          |                 |
|1593285725091    |hailey78         |hailey78         |NEIL MALONE      |MALE             |jakesantiago@gmai|1980-07-25       |
|                 |                 |                 |                 |                 |l.com            |                 |
|1593285725095    |sarah60          |sarah60          |RYAN GONZALEZ    |MALE             |rowlandrobert@yah|1922-08-05       |
|                 |                 |                 |                 |                 |oo.com           |                 |
  • Examine the records in MongoDB. As you can see, the record for leesteven has now been deleted from the MongoDB collection.
  • Examine the records in Postgres. As you can see, the record for leesteven has now been deleted from the Postgres table.

Here we saw that a tombstone record created in the parent topic is reflected in Kafka Streams, ksqlDB, and any target databases like MongoDB and Postgres.

Credits

Arvind Rajagopal



About The Authors
Siva Nadesan

Siva Nadesan is a Principal Data Engineer. His passions include data and blogging about technology. He is also the creator and maintainer of www.entechlog.com.
