
Script to drop ksqlDB objects

If you have worked with ksqlDB, you have probably run into this scenario more than once. To drop a ksqlDB table or stream, you first need to identify all related queries and terminate them, and there is no easy way to do that when multiple ksqlDB queries are chained together. To make this easier, I have created a script that accepts a list of table/stream names and takes care of the terminate and drop steps.

The source code for this script is on GitHub in the ksqldb-utilities repo.

Instructions

Let’s validate the script by creating some test streams and tables

  • Create a schema for your datagen. See https://github.com/entechlog/ksqlDB-datagen/blob/master/ordersForArrayDemo.avro for the schema I created to generate test data for this demo.

  • cd into the directory that contains the schema (in this case ordersForArrayDemo.avro) and issue the command below. This generates test data for our topic. In this example, it is random order data for a few customers.

    ksql-datagen schema=ordersForArrayDemo.avro format=avro topic='dev.etl.datagen.orders.src.0010' key=orderID maxInterval=5000 iterations=10
    
  • Start the ksqlDB CLI and set the offset to read from the beginning

    SET 'auto.offset.reset'='earliest';
    
  • Check the messages in the topic using

    PRINT 'dev.etl.datagen.orders.src.0010' FROM BEGINNING LIMIT 10;
    
  • Create the streams and tables for this demo

    -- Source stream over the datagen topic; columns come from the Avro schema
    CREATE STREAM STM_DEMO_DROP_KSQL_OBJECT_0010
      WITH (KAFKA_TOPIC = 'dev.etl.datagen.orders.src.0010', VALUE_FORMAT = 'Avro');

    -- Derived stream: packs the order fields into a map, repartitioned by customer
    CREATE STREAM STM_DEMO_DROP_KSQL_OBJECT_0020 AS
      SELECT CUSTOMERID, AS_MAP(AS_ARRAY('orderID', 'StorePickUpFlag', 'ItemName', 'OrderDate'), AS_ARRAY(ORDERID, STOREPICKUPFLAG, ITEMNAME, ORDERDATE)) AS orderData
      FROM STM_DEMO_DROP_KSQL_OBJECT_0010 PARTITION BY CUSTOMERID;

    -- Derived table: collects each customer's orders into a list
    CREATE TABLE TBL_DEMO_DROP_KSQL_OBJECT_0030 AS
      SELECT CUSTOMERID, COLLECT_LIST(CAST(ORDERDATA AS VARCHAR))
      FROM STM_DEMO_DROP_KSQL_OBJECT_0020
      GROUP BY CUSTOMERID EMIT CHANGES;
    
  • Now let’s create the control file that lists all the streams and tables we need to drop. I have listed them in the same order I would drop them manually, which is Last In, First Out (LIFO). Optionally, you can also specify a delete topic flag; yes and no are the valid options.

    TBL_DEMO_DROP_KSQL_OBJECT_0030,yes,no
    STM_DEMO_DROP_KSQL_OBJECT_0020,yes,no
    STM_DEMO_DROP_KSQL_OBJECT_0010,no,yes
    
  • Run the script to terminate the queries and drop the streams and tables

    ./dropKqlDBObjects.sh sed localhost:8088 /input/DEMO_DROP_KSQL_OBJECT.txt
    
  • Here is a sample output from the script

    **************************************************************************
    SCRIPT_NAME            : dropKqlDBObjects
    **************************************************************************
    INFO                   : logs directory exists
    INFO                   : temp directory exists
    INFO                   : reports directory exists
    INFO                   : Total arguments.. 3
    INFO                   : Received the correct number of input parameters
    INFO-MODE              : sed
    INFO-KSQL_URL          : localhost:8088
    INFO-FILE_NAME         : /opt/confluent/scripts/dropkqlobjects/input/DEMO_DROP_KSQL_OBJECT.txt
    INFO-LOGDIR            : /opt/confluent/scripts/dropkqlobjects/logs
    INFO-TEMPDIR           : /opt/confluent/scripts/dropkqlobjects/temp
    INFO-REPORTSDIR        : /opt/confluent/scripts/dropkqlobjects/reports
    INFO-INPUTDIR          : /opt/confluent/scripts/dropkqlobjects/input
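
The control file format from the steps above can be checked before a run. The following is a minimal sketch, not code from the ksqldb-utilities repo: `validate_control_line` is a hypothetical helper, and it assumes each line is an object name followed by optional comma-separated flags, each of which must be `yes` or `no`.

```shell
#!/bin/sh
# Hypothetical helper, not taken from the ksqldb-utilities repo.
# Assumes: field 1 is the stream/table name, every later field is yes or no.
validate_control_line() {
    line=$1
    name=${line%%,*}          # text before the first comma
    [ -n "$name" ] || { echo "invalid: missing object name"; return 1; }

    # No comma at all means no flags were given, which this sketch allows.
    case $line in
        *,*) flags=${line#*,} ;;
        *)   flags= ;;
    esac

    old_ifs=$IFS
    IFS=,
    for flag in $flags; do
        case $flag in
            yes|no) ;;
            *) IFS=$old_ifs
               echo "invalid: flag '$flag' for $name"
               return 1 ;;
        esac
    done
    IFS=$old_ifs
    echo "ok: $name"
}

# Example use against a control file (path is illustrative):
#   while IFS= read -r line; do
#       validate_control_line "$line" || exit 1
#   done < /input/DEMO_DROP_KSQL_OBJECT.txt
```

Failing fast on a malformed line is a deliberate choice here, since a typo in a flag could otherwise decide whether a topic is deleted.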
    

Successful completion of the script terminates all queries related to each table/stream, drops the table/stream, and deletes the topics based on the delete topic flag in the input file.

  • The logs directory will have an entry for this run with a more detailed log

  • The reports directory will have an HTML file with a summary of this execution
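
The terminate-then-drop sequence described above boils down to a handful of ksqlDB statements. The sketch below illustrates that sequence under stated assumptions and is not the script's actual code: `build_drop_statements` is a hypothetical helper, the query ID in the usage comment is made up, and the caller is assumed to already know the object type and the IDs of the related queries (for instance from `SHOW QUERIES;`).

```shell
#!/bin/sh
# Hypothetical helper, not taken from the ksqldb-utilities repo.
# Usage: build_drop_statements KIND NAME DELETE_TOPIC [QUERY_ID...]
#   KIND is STREAM or TABLE, DELETE_TOPIC is yes or no.
build_drop_statements() {
    [ "$#" -ge 3 ] || { echo "usage: build_drop_statements KIND NAME DELETE_TOPIC [QUERY_ID...]" >&2; return 1; }
    kind=$1
    name=$2
    delete_topic=$3
    shift 3

    case $kind in
        STREAM|TABLE) ;;
        *) echo "unknown object type: $kind" >&2; return 1 ;;
    esac

    # Queries that read from or write to the object must be stopped first.
    for query_id in "$@"; do
        printf 'TERMINATE %s;\n' "$query_id"
    done

    # DELETE TOPIC also removes the backing Kafka topic.
    if [ "$delete_topic" = yes ]; then
        printf 'DROP %s IF EXISTS %s DELETE TOPIC;\n' "$kind" "$name"
    else
        printf 'DROP %s IF EXISTS %s;\n' "$kind" "$name"
    fi
}

# Example (CTAS_EXAMPLE_1 is a made-up query ID):
#   build_drop_statements TABLE TBL_DEMO_DROP_KSQL_OBJECT_0030 yes CTAS_EXAMPLE_1
```

The generated statements can be pasted into the ksqlDB CLI or sent to the server; printing them first instead of executing them directly makes it easy to review what a run would do.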

If you find this useful, clone the Git repo and start using it. If you wish to add features, become a contributor and start enhancing it.

Notes

  • The jq option is not working yet; I will make changes in the next few days to support jq mode
  • Here are some known errors and solutions when running this script

Known Errors and solutions

Error: bash: ./dropKqlDBObjects.sh: /bin/sh^M: bad interpreter: No such file or directory

Solution: The ^M means the file has Windows-style line endings. Remove them with sed -i -e 's/\r$//' dropKqlDBObjects.sh
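
To confirm that Windows line endings are the cause before rewriting the file, a small check like the one below can help. It is only a sketch; `has_crlf` is a hypothetical helper name and is not part of the script.

```shell
#!/bin/sh
# Hypothetical helper: succeeds (exit status 0) when the file contains
# at least one carriage return, i.e. Windows-style (CRLF) line endings.
has_crlf() {
    cr=$(printf '\r')    # a literal carriage-return character
    grep -q "$cr" "$1"
}

# Example:
#   if has_crlf dropKqlDBObjects.sh; then
#       echo "CRLF line endings found"
#   fi
```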

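Error: ./dropKqlDBObjects.sh: Bad substitution

Solution: This error usually means the script is being run by a shell that does not support one of its parameter expansions. Change the first line of the script to #!/bin/sh or #!/bin/bash, depending on which shell is available on your operating system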
