Exploring ARRAY, MAP and LIST in ksqlDB

Confluent Kafka 22 Mar 2020

In this article we will go over a few scenarios to explore ARRAY, MAP and LIST in ksqlDB.

Prerequisites
  • Install Confluent Platform for your operating system using the instructions at “https://docs.confluent.io/current/installation/installing_cp/index.html”
Scenario

We want to create an array that lists the orders made by a user. The input used to build this array is a stream of individual order events.

Create a schema for your datagen. See “https://github.com/entechlog/ksqlDB-datagen/blob/master/ordersForArrayDemo.avro” for the schema I have created to generate test data for this demo.

cd into the directory that contains the schema (in this case ordersForArrayDemo.avro) and issue the command below. This will generate test data for our topic. In this example, it is random order data for a few customers.
ksql-datagen schema=ordersForArrayDemo.avro format=avro topic='dev.etl.datagen.orders.src.0010' key=orderID maxInterval=5000 iterations=10

If you need to purge the data in your test topic, temporarily set the retention time on the topic to one second, then wait for the purge to take effect (about one minute). Once the data is purged, restore the previous retention.ms value.

kafka-configs --zookeeper localhost:2181 --alter --entity-type topics --entity-name "dev.etl.datagen.orders.src.0010" --add-config retention.ms=1000

kafka-configs --zookeeper localhost:2181 --alter --entity-type topics --entity-name "dev.etl.datagen.orders.src.0010" --add-config retention.ms=604800000

Now start a KSQL session and issue this command to start reading data from the beginning of the topic:
SET 'auto.offset.reset'='earliest';

Check the messages in the topic using:

ksql> PRINT 'dev.etl.datagen.orders.src.0010' FROM BEGINNING LIMIT 10;
Format:AVRO
3/22/20 10:40:58 PM EDT, 800000, {"orderID": "800000", "CustomerID": "6454214343400548", "StorePickUpFlag": "No", "ItemName": "Apple iPad", "OrderDate": "09/01/2018"}
3/22/20 10:41:00 PM EDT, 799999, {"orderID": "799999", "CustomerID": "8484214343400844", "StorePickUpFlag": "No", "ItemName": "Apple TV 4K Streaming Console", "OrderDate": "09/01/2018"}
3/22/20 10:41:00 PM EDT, 799998, {"orderID": "799998", "CustomerID": "8484214343400844", "StorePickUpFlag": "Yes", "ItemName": "Nintendo Switch", "OrderDate": "01/28/2020"}
3/22/20 10:41:02 PM EDT, 799997, {"orderID": "799997", "CustomerID": "5759544343400808", "StorePickUpFlag": "Yes", "ItemName": "Arlo Pro", "OrderDate": "06/24/2019"}
3/22/20 10:41:05 PM EDT, 799996, {"orderID": "799996", "CustomerID": "8484214343400844", "StorePickUpFlag": "No", "ItemName": "Tile Mate Item Finder 4-Pack Combo", "OrderDate": "01/28/2020"}
3/22/20 10:41:05 PM EDT, 799995, {"orderID": "799995", "CustomerID": "8484214343400844", "StorePickUpFlag": "Yes", "ItemName": "Apple iPad", "OrderDate": "01/28/2020"}
3/22/20 10:41:08 PM EDT, 799994, {"orderID": "799994", "CustomerID": "6454214343400548", "StorePickUpFlag": "Yes", "ItemName": "Nintendo Switch", "OrderDate": "09/01/2018"}
3/22/20 10:41:09 PM EDT, 799993, {"orderID": "799993", "CustomerID": "6454214343400548", "StorePickUpFlag": "No", "ItemName": "Jelly Comb folding Bluetooth keyboard", "OrderDate": "06/24/2019"}
3/22/20 10:41:10 PM EDT, 799992, {"orderID": "799992", "CustomerID": "5759544343400808", "StorePickUpFlag": "No", "ItemName": "Apple TV 4K Streaming Console", "OrderDate": "01/28/2020"}
3/22/20 10:41:12 PM EDT, 799991, {"orderID": "799991", "CustomerID": "8484214343400844", "StorePickUpFlag": "No", "ItemName": "Jelly Comb folding Bluetooth keyboard", "OrderDate": "06/24/2019"}

Now let's create a STREAM on this topic using:

CREATE STREAM STM_ARRAY_DEMO_0010 WITH (KAFKA_TOPIC='dev.etl.datagen.orders.src.0010', VALUE_FORMAT='Avro');

Validate the data in the STREAM using the push query below.

ksql> SELECT * FROM STM_ARRAY_DEMO_0010 EMIT CHANGES LIMIT 3;
+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+
|ROWTIME               |ROWKEY                |ORDERID               |CUSTOMERID            |STOREPICKUPFLAG       |ITEMNAME              |ORDERDATE             |
+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+
|1584931258886         |800000                |800000                |6454214343400548      |No                    |Apple iPad            |09/01/2018            |
|1584931260502         |799999                |799999                |8484214343400844      |No                    |Apple TV 4K Streaming |09/01/2018            |
|                      |                      |                      |                      |                      |Console               |                      |
|1584931260691         |799998                |799998                |8484214343400844      |Yes                   |Nintendo Switch       |01/28/2020            |
Limit Reached
Query terminated
ksql> 

Create a STREAM that structures the data as a MAP and re-partitions it by the required key. Since we need the order data listed by customer, we partition by customer ID.

CREATE STREAM STM_ARRAY_DEMO_0020 AS
SELECT CUSTOMERID, AS_MAP(AS_ARRAY('orderID','StorePickUpFlag','ItemName','OrderDate'), AS_ARRAY(ORDERID,STOREPICKUPFLAG,ITEMNAME,ORDERDATE)) as orderData 
FROM STM_ARRAY_DEMO_0010
PARTITION BY CUSTOMERID;

Validate the data in this new stream

ksql> SELECT * FROM STM_ARRAY_DEMO_0020 EMIT CHANGES LIMIT 2;
+----------------------------------------+----------------------------------------+----------------------------------------+----------------------------------------+
|ROWTIME                                 |ROWKEY                                  |CUSTOMERID                              |ORDERDATA                               |
+----------------------------------------+----------------------------------------+----------------------------------------+----------------------------------------+
|1584931258886                           |6454214343400548                        |6454214343400548                        |{orderID=800000, ItemName=Apple iPad, St|
|                                        |                                        |                                        |orePickUpFlag=No, OrderDate=09/01/2018} |
|1584931260502                           |8484214343400844                        |8484214343400844                        |{orderID=799999, ItemName=Apple TV 4K St|
|                                        |                                        |                                        |reaming Console, StorePickUpFlag=No, Ord|
|                                        |                                        |                                        |erDate=09/01/2018}                      |
Limit Reached
Query terminated
ksql>
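
For readers less familiar with AS_MAP and AS_ARRAY, the expression used in STM_ARRAY_DEMO_0020 pairs a list of keys with a list of values, position by position. The Python sketch below is only a model of that pairing, not ksqlDB code. The helper name as_map is hypothetical, and the sample record is the first message shown in the PRINT output above.

```python
def as_map(keys, values):
    """Pair keys with values by position, modeling AS_MAP(AS_ARRAY(...), AS_ARRAY(...))."""
    # Assumption: both lists have the same length, as in the query above.
    return dict(zip(keys, values))


# First order event from the sample topic data.
order = {
    "orderID": "800000",
    "CustomerID": "6454214343400548",
    "StorePickUpFlag": "No",
    "ItemName": "Apple iPad",
    "OrderDate": "09/01/2018",
}

# Same key list as the first AS_ARRAY(...) in the CREATE STREAM statement.
keys = ["orderID", "StorePickUpFlag", "ItemName", "OrderDate"]
order_data = as_map(keys, [order[k] for k in keys])

# The customer ID stays outside the map because it becomes the partition key.
print(order["CustomerID"], order_data)
```

Note that CustomerID is deliberately left out of the map, mirroring the query, where it is selected as its own column and used in PARTITION BY.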

Create a TABLE that collects the data into the required LIST.

CREATE TABLE TBL_ARRAY_DEMO_0010 AS
SELECT CUSTOMERID, COLLECT_LIST(CAST(ORDERDATA as VARCHAR)) FROM STM_ARRAY_DEMO_0020
GROUP BY CUSTOMERID
EMIT CHANGES;

Validate the data in the final table using:

ksql> SELECT CUSTOMERID, KSQL_COL_1 FROM TBL_ARRAY_DEMO_0010 EMIT CHANGES;
+----------------------------------------------------------------------------------+----------------------------------------------------------------------------------+
|CUSTOMERID                                                                        |KSQL_COL_1                                                                        |
+----------------------------------------------------------------------------------+----------------------------------------------------------------------------------+
|6454214343400548                                                                  |[{orderID=800000, ItemName=Apple iPad, StorePickUpFlag=No, OrderDate=09/01/2018}, |
|                                                                                  |{orderID=799994, ItemName=Nintendo Switch, StorePickUpFlag=Yes, OrderDate=09/01/20|
|                                                                                  |18}, {orderID=799993, ItemName=Jelly Comb folding Bluetooth keyboard, StorePickUpF|
|                                                                                  |lag=No, OrderDate=06/24/2019}]                                                    |
|5759544343400808                                                                  |[{orderID=799997, ItemName=Arlo Pro, StorePickUpFlag=Yes, OrderDate=06/24/2019}, {|
|                                                                                  |orderID=799992, ItemName=Apple TV 4K Streaming Console, StorePickUpFlag=No, OrderD|
|                                                                                  |ate=01/28/2020}]                                                                  |
|8484214343400844                                                                  |[{orderID=799999, ItemName=Apple TV 4K Streaming Console, StorePickUpFlag=No, Orde|
|                                                                                  |rDate=09/01/2018}, {orderID=799998, ItemName=Nintendo Switch, StorePickUpFlag=Yes,|
|                                                                                  | OrderDate=01/28/2020}, {orderID=799996, ItemName=Tile Mate Item Finder 4-Pack Com|
|                                                                                  |bo, StorePickUpFlag=No, OrderDate=01/28/2020}, {orderID=799995, ItemName=Apple iPa|
|                                                                                  |d, StorePickUpFlag=Yes, OrderDate=01/28/2020}, {orderID=799991, ItemName=Jelly Com|
|                                                                                  |b folding Bluetooth keyboard, StorePickUpFlag=No, OrderDate=06/24/2019}]          |

As you can see, the data now arrives in the required format: all orders made by a user, listed as an array.
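
The grouping done by COLLECT_LIST with GROUP BY CUSTOMERID can be pictured as appending each incoming order to a per-customer list. The Python sketch below models that behavior under simplifying assumptions: the function name collect_list is a hypothetical stand-in, the order strings are abbreviated versions of the sample data, and the model ignores everything ksqlDB does around state stores and changelog updates.

```python
from collections import defaultdict


def collect_list(events):
    """Group (customer_id, order_data) events by customer, keeping arrival order."""
    table = defaultdict(list)
    for customer_id, order_data in events:
        # Each order is kept as a string, like CAST(ORDERDATA AS VARCHAR) in the query.
        table[customer_id].append(order_data)
    return dict(table)


# Abbreviated order strings in the order they arrive on the stream.
events = [
    ("6454214343400548", "{orderID=800000, ItemName=Apple iPad}"),
    ("8484214343400844", "{orderID=799999, ItemName=Apple TV 4K Streaming Console}"),
    ("8484214343400844", "{orderID=799998, ItemName=Nintendo Switch}"),
    ("5759544343400808", "{orderID=799997, ItemName=Arlo Pro}"),
]

orders_by_customer = collect_list(events)
for customer_id, orders in orders_by_customer.items():
    print(customer_id, orders)
```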

Some Known Issues

Cannot compare ARRAY values
ksql version ‘5.4.1’
Confluent version ‘5.4.1’
Operating system ‘Ubuntu 18.04’

An attempt to compare two ARRAY fields failed with “Cannot compare ARRAY values”. Let's go over the steps to recreate this issue.

  1. Create a new table with the same structure as the final table from the steps above.
CREATE TABLE TBL_ARRAY_DEMO_0030 (CUSTOMERID VARCHAR(STRING), KSQL_COL_1 ARRAY < VARCHAR(STRING) >)
	WITH (KAFKA_TOPIC = 'TBL_ARRAY_DEMO_0030', VALUE_FORMAT = 'AVRO', KEY = 'CUSTOMERID', PARTITIONS = 1, REPLICAS = 1);
  2. Insert the test records that we will use for the comparison.
INSERT INTO TBL_ARRAY_DEMO_0010 (CUSTOMERID, KSQL_COL_1)
VALUES ('6454214343400541', AS_ARRAY('{orderID=799997, ItemName=Arlo Pro, StorePickUpFlag=Yes, OrderDate=06/24/2019}', '{orderID=799992, ItemName=Apple TV 4K Streaming Console, StorePickUpFlag=No, OrderDate=01/28/2020}'));

INSERT INTO TBL_ARRAY_DEMO_0030 (CUSTOMERID, KSQL_COL_1)
VALUES ('6454214343400541', AS_ARRAY('{orderID=799997, ItemName=Arlo Pro, StorePickUpFlag=Yes, OrderDate=06/24/2019}', '{orderID=799992, ItemName=Apple TV 4K Streaming Console, StorePickUpFlag=No, OrderDate=01/28/2020}'));
  3. Run the query that compares the data in the arrays.
SELECT TIMESTAMPTOSTRING(CURR.ROWTIME, 'yyyy-MM-dd HH:mm:ss Z'), CURR.KSQL_COL_1, PREV.KSQL_COL_1
FROM TBL_ARRAY_DEMO_0030 CURR
INNER JOIN TBL_ARRAY_DEMO_0010 PREV ON CURR.ROWKEY = PREV.ROWKEY
WHERE TRIM(UCASE(CURR.CUSTOMERID)) = '6454214343400541'
	AND CURR.KSQL_COL_1 <> PREV.KSQL_COL_1 EMIT CHANGES;

As you can see, the query fails with the error “Caused by: Cannot compare ARRAY values”.
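
To make clear what the failing predicate was meant to compute, the Python sketch below models the comparison on the two inserted records. It compares a string rendering of each array instead of the arrays themselves. The render helper is hypothetical. Whether an equivalent cast of an ARRAY column to VARCHAR is accepted inside the ksqlDB 5.4.1 query is an assumption that would need to be verified, so treat this as an illustration of the idea rather than a confirmed workaround.

```python
def render(array):
    """Render a list of strings as one string, loosely modeling an ARRAY cast to VARCHAR."""
    return "[" + ", ".join(array) + "]"


# The same two order strings that were inserted into both tables.
curr = [
    "{orderID=799997, ItemName=Arlo Pro, StorePickUpFlag=Yes, OrderDate=06/24/2019}",
    "{orderID=799992, ItemName=Apple TV 4K Streaming Console, StorePickUpFlag=No, OrderDate=01/28/2020}",
]
prev = list(curr)

# Intended predicate: CURR.KSQL_COL_1 <> PREV.KSQL_COL_1
arrays_differ = render(curr) != render(prev)
print(arrays_differ)  # False, because both records hold identical arrays
```

A string comparison like this is sensitive to element order, so two arrays holding the same orders in a different sequence would be reported as different.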
