Exploring ksqlDB in Docker, headless ksqlDB and ksqlDB in Kubernetes

ksqlDB is built on top of Kafka Streams, a lightweight, powerful Java library for enriching, transforming, and processing real-time streams of data. In this article we will see how to run ksqlDB in Docker, in headless mode, and in Kubernetes. The code used in this article can be found here

Prerequisites

  • Download and install the Confluent Platform for your operating system. See here for instructions on how to install the Confluent Platform
  • Configure a Kafka producer. See How to generate test data for Kafka using Faker for instructions on how to set up a simple producer

ksqlDB in Docker

This section is based on the Confluent documentation at https://ksqldb.io/quickstart-platform.html#quickstart-content. Here we will spin up the ksqlDB server and the ksqlDB CLI in Docker and connect them to a Kafka cluster that is hosted elsewhere.

  • Create the docker-compose.yml file with the content below

  • Make sure to update KSQL_BOOTSTRAP_SERVERS with your Kafka bootstrap server details
  • Use the version of the ksqldb-server image required for your use case. In certain scenarios, running different versions with the same ksql.service.id will be problematic.

    ---
    version: '2'

    services:
      ksqldb-server:
        image: confluentinc/ksqldb-server:latest
        hostname: ksqldb-server
        container_name: ksqldb-server
        ports:
          - "8088:8088"
        environment:
          KSQL_LISTENERS: http://0.0.0.0:8088
          KSQL_BOOTSTRAP_SERVERS: 192.168.1.12:39092
          KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
          KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"

      ksqldb-cli:
        image: confluentinc/ksqldb-cli:latest
        container_name: ksqldb-cli
        depends_on:
          - ksqldb-server
        entrypoint: /bin/sh
        tty: true

  • cd into the directory with docker-compose.yml and bring up ksqlDB with the command

    docker-compose up
    
  • Now you can start ksqlDB’s CLI and work on it using the command

    docker exec -it ksqldb-cli ksql http://ksqldb-server:8088
    

  • You can see the list of all containers by running docker container ls -a
  • You can bring down the containers by running docker-compose down
  • You can start the container again by running docker start -a container-name
  • You can delete all exited containers by running docker rm $(docker ps -q -f status=exited)
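
Once the containers are up, one quick way to confirm that the server is reachable from the host is to call its REST API. The sketch below is a minimal example and not part of the original setup. It assumes the 8088 port mapping from the compose file above, and that the /info endpoint returns a JSON object with a KsqlServerInfo key, as described in the ksqlDB REST documentation (verify the field names against your server version). The function names and SAMPLE_BODY are hypothetical.

```python
import json
import urllib.request


def parse_server_info(body: str) -> dict:
    """Extract the KsqlServerInfo object from the JSON body of GET /info."""
    payload = json.loads(body)
    return payload["KsqlServerInfo"]


def fetch_server_info(base_url: str = "http://localhost:8088", timeout: float = 5.0) -> dict:
    """Call the server's /info endpoint and return the parsed server details.

    Assumes the server is published on localhost:8088, as in the compose file.
    """
    with urllib.request.urlopen(f"{base_url}/info", timeout=timeout) as response:
        return parse_server_info(response.read().decode("utf-8"))


# Sample body in the assumed shape of an /info response; the values are made up.
SAMPLE_BODY = (
    '{"KsqlServerInfo": {"version": "0.0.0", "kafkaClusterId": "example", '
    '"ksqlServiceId": "default_", "serverStatus": "RUNNING"}}'
)
```

With the containers running, calling fetch_server_info() from the host should return a dictionary describing the server. If the call fails with a connection error, the server is most likely still starting or the port mapping is different.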

ksqlDB in Headless mode

After developing queries in interactive mode, the next step is to deploy them in headless mode. In headless mode, you write all of your queries in a SQL file and then start the ksqlDB server instances with this file as an argument. You can read more about headless mode here
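
Because the server only reads the SQL file at startup, it can help to list what the file contains before deploying it. The following is a rough sketch under stated assumptions, not a ksqlDB tool: split_statements and statement_kinds are hypothetical helpers, and the splitter is deliberately naive (it does not handle semicolons or "--" inside string literals).

```python
def split_statements(sql_text: str) -> list[str]:
    """Split a queries file into statements, dropping `--` line comments.

    Naive on purpose: semicolons or `--` inside string literals are not handled.
    """
    code_lines = []
    for line in sql_text.splitlines():
        code = line.split("--", 1)[0].strip()
        if code:
            code_lines.append(code)
    joined = " ".join(code_lines)
    return [part.strip() + ";" for part in joined.split(";") if part.strip()]


def statement_kinds(sql_text: str) -> list[str]:
    """Return the leading keyword(s) of each statement, e.g. 'SET' or 'CREATE TABLE'."""
    kinds = []
    for statement in split_statements(sql_text):
        words = statement.rstrip(";").split()
        head = words[0].upper()
        if head == "CREATE" and len(words) > 1:
            head = f"{head} {words[1].upper()}"
        kinds.append(head)
    return kinds


# A tiny made-up file in the same style as the queries used in this article.
EXAMPLE = """
-- Set offset to earliest
SET 'auto.offset.reset'='earliest';

CREATE TABLE T1 (ID VARCHAR PRIMARY KEY)
  WITH (KAFKA_TOPIC = 't1', VALUE_FORMAT = 'JSON');
"""
```

Running statement_kinds on the contents of your queries file gives a quick overview of the statements the server will be asked to execute.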

  • Create the ksqlDB queries for your application and save them in a file, say queries.sql. In this example the path of this SQL file, relative to the directory containing docker-compose.yml, is ./app/queries/queries.sql

    -- Set offset to earliest
    SET 'auto.offset.reset'='earliest';
    
    -- Create RAW table
    CREATE TABLE TBL_HEADLESS_DEMO_0010 (
    	ROWKEY VARCHAR PRIMARY KEY
    	,NAME VARCHAR
    	,SEX VARCHAR
    	,ADDRESS VARCHAR
    	,MAIL VARCHAR
    	,BIRTHDATE VARCHAR
    	)
    	WITH (
    			KAFKA_TOPIC = 'kafka.ksql.headless.demo'
    			,VALUE_FORMAT = 'JSON'
    			,REPLICAS = 1
    			,PARTITIONS = 1
    			);
    
    -- Apply transformation
    CREATE TABLE TBL_HEADLESS_DEMO_0020 AS
    SELECT ROWKEY AS USERNAME
    	,UCASE(NAME) AS NAME
    	,CASE
    		WHEN SEX = 'M'
    			THEN 'MALE'
    		WHEN SEX = 'F'
    			THEN 'FEMALE'
    		ELSE 'OTHERS'
    		END AS SEX
    	,LCASE(MAIL) AS MAIL
    	,BIRTHDATE
    FROM TBL_HEADLESS_DEMO_0010 EMIT CHANGES;
    
  • Create the docker-compose.yml file. Here KSQL_BOOTSTRAP_SERVERS and KSQL_KSQL_SERVICE_ID are parameters that take their values from the .env file.

    ---
    version: '2'
    
    services:
      ksqldb-server:
        image: confluentinc/ksqldb-server:latest
        volumes:
          - ./app/queries/queries.sql:/opt/queries/queries.sql
        environment:
          KSQL_BOOTSTRAP_SERVERS: ${KSQL_BOOTSTRAP_SERVERS}
          KSQL_KSQL_SERVICE_ID: ${KSQL_KSQL_SERVICE_ID}
          KSQL_KSQL_QUERIES_FILE: /opt/queries/queries.sql
        restart: unless-stopped
    
  • Create the .env file with the values for these parameters. In this example the .env file looks like this

    KSQL_BOOTSTRAP_SERVERS=192.168.1.8:39092
    KSQL_KSQL_SERVICE_ID=demo_02_
    
  • Start ksqlDB in headless mode by running docker-compose up. You can also run docker-compose up -d if you would like to run in detached mode (no output on screen)

  • Generate some test data by running our faker module

    docker run -it --rm  -e bootstrap_servers=192.168.1.8:39092 -e topic_name=kafka.ksql.headless.demo -e no_of_records=10 entechlog/faker-kafka-profile-datagen
    
  • Review the result in the target topic by using a Kafka consumer or Control Center
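
The environment variables in the compose files above follow a visible pattern: the ksqlDB property name is upper-cased, dots become underscores, and KSQL_ is added as a prefix (for example ksql.service.id becomes KSQL_KSQL_SERVICE_ID). The sketch below reproduces that pattern for the properties used in this article. The helper names are hypothetical, and property names containing underscores or hyphens are not handled.

```python
def to_env_var(property_name: str) -> str:
    """Map a ksqlDB property name to the environment variable name used in compose."""
    return "KSQL_" + property_name.upper().replace(".", "_")


def render_env_file(properties: dict[str, str]) -> str:
    """Render properties as .env lines (NAME=value), one per line, in input order."""
    return "\n".join(f"{to_env_var(name)}={value}" for name, value in properties.items())


# The two parameters kept in the .env file of the headless example.
HEADLESS_PROPERTIES = {
    "bootstrap.servers": "192.168.1.8:39092",
    "ksql.service.id": "demo_02_",
}
```

Calling render_env_file(HEADLESS_PROPERTIES) produces the same two lines as the .env file shown earlier.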

ksqlDB in Kubernetes

At this point we have a working docker-compose.yml from our headless mode example. We will use that to create Kubernetes manifest files.
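
To make the conversion less of a black box, here is a small sketch of one part of it: how the environment mapping of a compose service corresponds to the env list of a Kubernetes container. This is an illustration only, not how kompose is implemented. The function name is hypothetical, and sorting by name is an assumption chosen to match the order seen in the Deployment later in this section.

```python
def compose_env_to_k8s(environment: dict[str, str]) -> list[dict[str, str]]:
    """Turn a compose `environment` mapping into a Kubernetes container `env` list."""
    return [
        {"name": name, "value": str(value)}
        for name, value in sorted(environment.items())
    ]


# Environment of the ksqldb-server service, with the values used in the Kubernetes example.
COMPOSE_ENVIRONMENT = {
    "KSQL_BOOTSTRAP_SERVERS": "192.168.1.8:39092",
    "KSQL_KSQL_SERVICE_ID": "demo_03_",
    "KSQL_KSQL_QUERIES_FILE": "/data/opt/queries/queries.sql",
}
```

Each entry of the returned list corresponds to one name/value pair under env in the Deployment manifest.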

  • Install and configure kompose. kompose is a tool to help users who are familiar with docker-compose move to Kubernetes. You can read more about kompose here

    curl -L https://github.com/kubernetes/kompose/releases/download/v1.21.0/kompose-linux-amd64 -o kompose
    
    chmod +x kompose
    sudo mv ./kompose /usr/local/bin/kompose
    
  • cd into the directory which has the required docker-compose.yml and run the command below

    kompose convert
    
  • kompose convert will generate two Kubernetes manifest files: ksqldb-server-claim0-persistentvolumeclaim.yaml (renamed to ksqldb-server-persistentvolume-claim.yaml for this demo) and ksqldb-server-deployment.yaml. We will edit both of them and also create a third file named ksqldb-server-persistentvolume.yaml. Here is what the final files look like.

    # ksqldb-server-persistentvolume.yaml
    apiVersion: v1
    kind: PersistentVolume
    metadata:
      name: ksqldb-server-pv
    spec:
      storageClassName: manual
      accessModes:
        - ReadWriteOnce
      capacity:
        storage: 1Gi
      hostPath:
        path: /data

    # ksqldb-server-persistentvolume-claim.yaml
    apiVersion: v1
    kind: PersistentVolumeClaim
    metadata:
      name: ksqldb-server-claim
    spec:
      storageClassName: manual
      accessModes:
        - ReadWriteOnce
      resources:
        requests:
          storage: 1Gi

    # ksqldb-server-deployment.yaml
    apiVersion: apps/v1
    kind: Deployment
    metadata:
      labels:
        io.kompose.service: ksqldb-server
      name: ksqldb-server
    spec:
      replicas: 5
      selector:
        matchLabels:
          io.kompose.service: ksqldb-server
      strategy:
        type: Recreate
      template:
        metadata:
          labels:
            io.kompose.service: ksqldb-server
        spec:
          containers:
          - env:
            - name: KSQL_BOOTSTRAP_SERVERS
              value: 192.168.1.8:39092
            - name: KSQL_KSQL_QUERIES_FILE
              value: /data/opt/queries/queries.sql
            - name: KSQL_KSQL_SERVICE_ID
              value: demo_03_
            image: confluentinc/ksqldb-server:latest
            imagePullPolicy: ""
            name: ksqldb-server
            resources: {}
            volumeMounts:
            - mountPath: /data/
              name: ksqldb-server-claim
          restartPolicy: Always
          serviceAccountName: ""
          volumes:
          - name: ksqldb-server-claim
            persistentVolumeClaim:
              claimName: ksqldb-server-claim
    status: {}

  • Create the folder for the volume on the host (/data in this example) and copy the required files into it, for example /data/opt/queries/queries.sql

  • Create the PV, PVC and Pods by running

    kubectl apply -f ksqldb-server-persistentvolume.yaml
    kubectl apply -f ksqldb-server-persistentvolume-claim.yaml
    kubectl apply -f ksqldb-server-deployment.yaml
    
  • Verify the status of PV, PVC and Pods

    kubectl get pv; kubectl get pvc; kubectl get pods;
    
    NAME               CAPACITY   ACCESS MODES   RECLAIM POLICY   STATUS   CLAIM                         STORAGECLASS   REASON   AGE
    ksqldb-server-pv   1Gi        RWO            Retain           Bound    default/ksqldb-server-claim   manual                  153m
    NAME                  STATUS   VOLUME             CAPACITY   ACCESS MODES   STORAGECLASS   AGE
    ksqldb-server-claim   Bound    ksqldb-server-pv   1Gi        RWO            manual         150m
    NAME                             READY   STATUS    RESTARTS   AGE
    ksqldb-server-774c794fcb-cg9gq   1/1     Running   0          4m12s
    ksqldb-server-774c794fcb-gzg97   1/1     Running   0          4m12s
    ksqldb-server-774c794fcb-j8922   1/1     Running   0          4m12s
    ksqldb-server-774c794fcb-qcw5g   1/1     Running   0          4m12s
    ksqldb-server-774c794fcb-t28j2   1/1     Running   0          4m12s
    

  • You can see the history of events by running kubectl get events
  • You can see the logs of a pod by running kubectl logs pod-name
  • You can open a shell in the pod by running kubectl exec --stdin --tty pod-name -- /bin/bash

  • Review the result in the target topic by using a Kafka consumer or Control Center

Running ksqlDB in Kubernetes makes it easy to run multiple instances of the same queries. This helps when the source topics carry a large data volume across multiple partitions and a matching number of ksqlDB consumers is needed to keep up.
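
How many replicas actually do work depends on the partition count, because within a Kafka consumer group each partition is read by at most one consumer. The sketch below is a rough rule of thumb for a query that reads a single source topic. It is a simplification that ignores stream threads, standby replicas, and queries with repartition steps, and the function name is hypothetical.

```python
def busy_instances(source_partitions: int, replicas: int) -> int:
    """Upper bound on the replicas that can be assigned partitions of one source topic."""
    if source_partitions < 1 or replicas < 1:
        raise ValueError("source_partitions and replicas must be positive")
    # A partition is consumed by at most one group member, so extra replicas sit idle.
    return min(source_partitions, replicas)
```

In this article's example, the source topic is declared with PARTITIONS = 1 while the Deployment runs 5 replicas, so busy_instances(1, 5) suggests only one replica would process data. Adding partitions to the source topic is what lets the other replicas share the load.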
