
Install Kafka using Ansible, monitor using Prometheus and Grafana
In this article we will see how to install Confluent Kafka using Ansible and to monitor the metrics using Prometheus and …
ksqlDB is built on top of Kafka Streams, a lightweight, powerful Java library for enriching, transforming, and processing real-time streams of data. In this article we will see how to use ksqlDB in Docker and in Kubernetes. The code used in this article can be found here
This section is based on the Confluent documentation at https://ksqldb.io/quickstart-platform.html#quickstart-content. Here we will spin up the ksqlDB server and the ksqlDB CLI in Docker, connecting to a Kafka cluster that is hosted elsewhere.
Create a docker-compose.yml file with the content below, and replace the value of KSQL_BOOTSTRAP_SERVERS with your Kafka bootstrap server details.
ksql.service.id will be problematic.
---
version: '2'
services:
  ksqldb-server:
    image: confluentinc/ksqldb-server:latest
    hostname: ksqldb-server
    container_name: ksqldb-server
    ports:
      - "8088:8088"
    environment:
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_BOOTSTRAP_SERVERS: 192.168.1.12:39092
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"

  ksqldb-cli:
    image: confluentinc/ksqldb-cli:latest
    container_name: ksqldb-cli
    depends_on:
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true
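The KSQL_-prefixed variables above are how ksqlDB server properties are passed to the confluentinc/ksqldb-server image: the KSQL_ prefix is dropped and the remainder is lower-cased with underscores turned into dots, which is why the property ksql.service.id is written KSQL_KSQL_SERVICE_ID. Below is a minimal Python sketch of that simple naming rule. env_to_property is a hypothetical helper, and the sketch assumes property names without dashes or underscores of their own (such names may need extra escaping, which is not modelled here).

```python
from typing import Dict


def env_to_property(env_name: str, prefix: str = "KSQL_") -> str:
    """Translate a KSQL_* environment variable name into a ksqlDB property name.

    Simplified rule only: drop the prefix, lower-case, replace '_' with '.'.
    """
    if not env_name.startswith(prefix):
        raise ValueError(f"{env_name!r} does not start with {prefix!r}")
    return env_name[len(prefix):].lower().replace("_", ".")


if __name__ == "__main__":
    # The environment block of the ksqldb-server service above.
    compose_env: Dict[str, str] = {
        "KSQL_LISTENERS": "http://0.0.0.0:8088",
        "KSQL_BOOTSTRAP_SERVERS": "192.168.1.12:39092",
        "KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE": "true",
        "KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE": "true",
    }
    for name, value in compose_env.items():
        print(f"{env_to_property(name)}={value}")
```

Run as a script, it prints lines such as bootstrap.servers=192.168.1.12:39092. The same rule turns KSQL_KSQL_QUERIES_FILE, used later in headless mode, into ksql.queries.file.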
cd into the directory containing docker-compose.yml and bring up ksqlDB with the command
docker-compose up
Now you can start ksqlDB’s CLI and work on it using the command
docker exec -it ksqldb-cli ksql http://ksqldb-server:8088
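Some other useful commands:
# List all containers, including stopped ones
docker container ls -a
# Stop and remove the containers and network created by docker-compose up
docker-compose down
# Start a stopped container again and attach to its output (replace container-name)
docker start -a container-name
# Remove all exited containers
docker rm $(docker ps -q -f status=exited)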
After developing the queries in interactive mode, the next step is to deploy them in headless mode. In headless mode, you write all of your queries in a SQL file and then start ksqlDB server instances with this file as an argument. You can read more about headless mode here
Create the ksqlDB queries for your application and save them in a file, say queries.sql. In this example, the file is at ./app/queries/queries.sql, relative to the directory containing docker-compose.yml.
-- Set offset to earliest
SET 'auto.offset.reset'='earliest';

-- Create RAW table
CREATE TABLE TBL_HEADLESS_DEMO_0010 (
     ROWKEY VARCHAR PRIMARY KEY
    ,NAME VARCHAR
    ,SEX VARCHAR
    ,ADDRESS VARCHAR
    ,MAIL VARCHAR
    ,BIRTHDATE VARCHAR
)
WITH (
     KAFKA_TOPIC = 'kafka.ksql.headless.demo'
    ,VALUE_FORMAT = 'JSON'
    ,REPLICAS = 1
    ,PARTITIONS = 1
);

-- Apply transformation
CREATE TABLE TBL_HEADLESS_DEMO_0020 AS
SELECT ROWKEY AS USERNAME
    ,UCASE(NAME) AS NAME
    ,CASE
        WHEN SEX = 'M'
            THEN 'MALE'
        WHEN SEX = 'F'
            THEN 'FEMALE'
        ELSE 'OTHERS'
     END AS SEX
    ,LCASE(MAIL) AS MAIL
    ,BIRTHDATE
FROM TBL_HEADLESS_DEMO_0010 EMIT CHANGES;
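To see what the second statement does to each row, here is a plain-Python sketch that applies the same projection to a single record: the key becomes USERNAME, NAME is upper-cased, the SEX codes are expanded, MAIL is lower-cased, BIRTHDATE is copied and ADDRESS is dropped. It only mirrors the SQL for illustration and is not how ksqlDB executes queries. The sample row is made up, and None stands in for SQL NULL (UCASE/LCASE of NULL give NULL, and a NULL SEX falls through to the ELSE branch).

```python
from typing import Dict, Optional

Row = Dict[str, Optional[str]]

# CASE WHEN SEX = 'M' THEN 'MALE' WHEN SEX = 'F' THEN 'FEMALE' ELSE 'OTHERS' END
SEX_LABELS = {"M": "MALE", "F": "FEMALE"}


def transform(rowkey: str, row: Row) -> Row:
    """Apply the TBL_HEADLESS_DEMO_0020 projection to one TBL_HEADLESS_DEMO_0010 row.

    None plays the role of SQL NULL.
    """
    name, mail = row.get("NAME"), row.get("MAIL")
    return {
        "USERNAME": rowkey,                                   # ROWKEY AS USERNAME
        "NAME": name.upper() if name is not None else None,   # UCASE(NAME)
        "SEX": SEX_LABELS.get(row.get("SEX"), "OTHERS"),      # NULL or any other code -> OTHERS
        "MAIL": mail.lower() if mail is not None else None,   # LCASE(MAIL)
        "BIRTHDATE": row.get("BIRTHDATE"),                    # copied as is; ADDRESS is not selected
    }


if __name__ == "__main__":
    sample = {"NAME": "Jane Doe", "SEX": "F", "ADDRESS": "1 Main St",
              "MAIL": "Jane.Doe@Example.com", "BIRTHDATE": "1990-01-01"}
    print(transform("jdoe", sample))
    # {'USERNAME': 'jdoe', 'NAME': 'JANE DOE', 'SEX': 'FEMALE', 'MAIL': 'jane.doe@example.com', 'BIRTHDATE': '1990-01-01'}
```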
Create the docker-compose.yml file. Here, KSQL_BOOTSTRAP_SERVERS and KSQL_KSQL_SERVICE_ID are parameters that take their values from the .env file.
---
version: '2'
services:
  ksqldb-server:
    image: confluentinc/ksqldb-server:latest
    volumes:
      - ./app/queries/queries.sql:/opt/queries/queries.sql
    environment:
      KSQL_BOOTSTRAP_SERVERS: ${KSQL_BOOTSTRAP_SERVERS}
      KSQL_KSQL_SERVICE_ID: ${KSQL_KSQL_SERVICE_ID}
      KSQL_KSQL_QUERIES_FILE: /opt/queries/queries.sql
    restart: unless-stopped
Create the .env file with the values of these parameters. In this example, the .env file looks like this:
KSQL_BOOTSTRAP_SERVERS=192.168.1.8:39092
KSQL_KSQL_SERVICE_ID=demo_02_
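docker-compose reads the .env file from the project directory and substitutes each ${VAR} reference in docker-compose.yml with its value. The Python sketch below reproduces only that basic behaviour, under simplifying assumptions: one KEY=VALUE per line, # comments, no quoting rules, no ${VAR:-default} forms, and no shell environment variables (which take precedence over .env in the real tool). parse_env_file and interpolate are hypothetical helpers; running docker-compose config shows the file as the real tool resolves it.

```python
import re
from typing import Dict

# Matches ${NAME}; the ${NAME:-default} and bare $NAME forms are deliberately not handled.
_VAR = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}")


def parse_env_file(text: str) -> Dict[str, str]:
    """Parse simple KEY=VALUE lines, skipping blank lines and # comments."""
    env: Dict[str, str] = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if sep:
            env[key.strip()] = value.strip()
    return env


def interpolate(template: str, env: Dict[str, str]) -> str:
    """Replace every ${NAME} with env[NAME]; unknown names become empty strings."""
    return _VAR.sub(lambda m: env.get(m.group(1), ""), template)


if __name__ == "__main__":
    dot_env = "KSQL_BOOTSTRAP_SERVERS=192.168.1.8:39092\nKSQL_KSQL_SERVICE_ID=demo_02_\n"
    env = parse_env_file(dot_env)
    print(interpolate("KSQL_KSQL_SERVICE_ID: ${KSQL_KSQL_SERVICE_ID}", env))
    # KSQL_KSQL_SERVICE_ID: demo_02_
```

Unset variables become empty strings here, as docker-compose also does (it prints a warning first).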
Start ksqlDB in headless mode by running docker-compose up. You can also run docker-compose up -d to start it in detached mode (in the background, without printing the logs to the screen).
Generate some test data by running our faker data generator image:
docker run -it --rm -e bootstrap_servers=192.168.1.8:39092 -e topic_name=kafka.ksql.headless.demo -e no_of_records=10 entechlog/faker-kafka-profile-datagen
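Review the result in the target topic using a Kafka consumer or Confluent Control Center. Unless KAFKA_TOPIC is set, ksqlDB writes the output of a CREATE TABLE ... AS SELECT statement to a topic named after the table, here TBL_HEADLESS_DEMO_0020.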
At this point we have a working docker-compose.yml from the headless mode example. We will use it to create the Kubernetes manifest files.
Install and configure kompose. kompose is a tool to help users who are familiar with docker-compose move to Kubernetes. You can read more about kompose here
curl -L https://github.com/kubernetes/kompose/releases/download/v1.21.0/kompose-linux-amd64 -o kompose
chmod +x kompose
sudo mv ./kompose /usr/local/bin/kompose
cd into the directory that contains the required docker-compose.yml and run the command below
kompose convert
kompose convert generates two Kubernetes manifest files: ksqldb-server-claim0-persistentvolumeclaim.yaml (renamed to ksqldb-server-persistentvolume-claim.yaml for this demo) and ksqldb-server-deployment.yaml. We will also create a third file, ksqldb-server-persistentvolume.yaml, for the PersistentVolume, and edit the generated claim and deployment files. Here is how the final files look.
# ksqldb-server-persistentvolume.yaml
apiVersion: v1
kind: PersistentVolume
metadata:
  name: ksqldb-server-pv
spec:
  storageClassName: manual
  accessModes:
    - ReadWriteOnce
  capacity:
    storage: 1Gi
  hostPath:
    path: /data

# ksqldb-server-persistentvolume-claim.yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: ksqldb-server-claim
spec:
  storageClassName: manual
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 1Gi

# ksqldb-server-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    io.kompose.service: ksqldb-server
  name: ksqldb-server
spec:
  replicas: 5
  selector:
    matchLabels:
      io.kompose.service: ksqldb-server
  strategy:
    type: Recreate
  template:
    metadata:
      labels:
        io.kompose.service: ksqldb-server
    spec:
      containers:
        - env:
            - name: KSQL_BOOTSTRAP_SERVERS
              value: 192.168.1.8:39092
            - name: KSQL_KSQL_QUERIES_FILE
              value: /data/opt/queries/queries.sql
            - name: KSQL_KSQL_SERVICE_ID
              value: demo_03_
          image: confluentinc/ksqldb-server:latest
          imagePullPolicy: ""
          name: ksqldb-server
          resources: {}
          volumeMounts:
            - mountPath: /data/
              name: ksqldb-server-claim
      restartPolicy: Always
      serviceAccountName: ""
      volumes:
        - name: ksqldb-server-claim
          persistentVolumeClaim:
            claimName: ksqldb-server-claim
status: {}
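The env section of the Deployment is the Kubernetes form of the compose environment mapping: each KEY: value pair becomes a name/value entry in the container's env list. Kubernetes requires those values to be strings, so a value such as true must be quoted in YAML. A minimal Python sketch of that translation follows; render_env_yaml is a hypothetical helper, not kompose's code, and it sorts entries by name only to give stable output.

```python
import json
from typing import Dict


def render_env_yaml(environment: Dict[str, str], indent: int = 12) -> str:
    """Render a compose `environment` mapping as a Kubernetes container `env:` list.

    Values are emitted as JSON strings (valid YAML double-quoted scalars), so values
    like true or 8088 stay strings instead of becoming YAML booleans or integers.
    """
    pad = " " * indent
    lines = []
    for key, value in sorted(environment.items()):
        lines.append(f"{pad}- name: {key}")
        lines.append(f"{pad}  value: {json.dumps(str(value))}")
    return "\n".join(lines)


if __name__ == "__main__":
    print(render_env_yaml({
        "KSQL_BOOTSTRAP_SERVERS": "192.168.1.8:39092",
        "KSQL_KSQL_QUERIES_FILE": "/data/opt/queries/queries.sql",
        "KSQL_KSQL_SERVICE_ID": "demo_03_",
    }, indent=0))
```

With the three variables used in the Deployment, it reproduces the same env entries, with every value quoted.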
Create the volume folder (/data) on the host and copy the required files into it, for example the queries file to /data/opt/queries/queries.sql.
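The file has to live at /data/opt/queries/queries.sql on the host because the PersistentVolume exposes the host directory /data (hostPath) and the Deployment mounts the claim at /data/ inside the container. As a result, the container path in KSQL_KSQL_QUERIES_FILE maps one-to-one onto the host. Below is a small Python sketch of that path mapping; host_path_for is a hypothetical helper. The sketch also assumes the file exists on every node that may run a pod, since hostPath always refers to the local node's filesystem (on a single-node cluster, that is just one directory).

```python
import posixpath


def host_path_for(container_path: str, mount_path: str, host_path: str) -> str:
    """Map a path seen inside the container to the matching path on the node.

    mount_path is volumeMounts[].mountPath; host_path is the PV's spec.hostPath.path.
    """
    mount = posixpath.normpath(mount_path)
    target = posixpath.normpath(container_path)
    prefix = mount if mount.endswith("/") else mount + "/"
    if target != mount and not target.startswith(prefix):
        raise ValueError(f"{container_path!r} is not under mount point {mount_path!r}")
    relative = posixpath.relpath(target, mount)   # "." when target is the mount point itself
    return posixpath.normpath(posixpath.join(host_path, relative))


if __name__ == "__main__":
    # Values from the PV (hostPath /data) and the Deployment (mountPath /data/).
    print(host_path_for("/data/opt/queries/queries.sql", "/data/", "/data"))
    # /data/opt/queries/queries.sql
```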
Create the PV, PVC and Pods by running
kubectl apply -f ksqldb-server-persistentvolume.yaml
kubectl apply -f ksqldb-server-persistentvolume-claim.yaml
kubectl apply -f ksqldb-server-deployment.yaml
Verify the status of PV, PVC and Pods
kubectl get pv; kubectl get pvc; kubectl get pods;
NAME               CAPACITY   ACCESS MODES   RECLAIM POLICY   STATUS   CLAIM                         STORAGECLASS   REASON   AGE
ksqldb-server-pv   1Gi        RWO            Retain           Bound    default/ksqldb-server-claim   manual                  153m

NAME                  STATUS   VOLUME             CAPACITY   ACCESS MODES   STORAGECLASS   AGE
ksqldb-server-claim   Bound    ksqldb-server-pv   1Gi        RWO            manual         150m

NAME                             READY   STATUS    RESTARTS   AGE
ksqldb-server-774c794fcb-cg9gq   1/1     Running   0          4m12s
ksqldb-server-774c794fcb-gzg97   1/1     Running   0          4m12s
ksqldb-server-774c794fcb-j8922   1/1     Running   0          4m12s
ksqldb-server-774c794fcb-qcw5g   1/1     Running   0          4m12s
ksqldb-server-774c794fcb-t28j2   1/1     Running   0          4m12s
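With several replicas, it is easy to miss a single pod that is not ready. The Python sketch below parses the default kubectl get pods table and reports any pod whose READY count is incomplete or whose STATUS is not Running. unhealthy_pods is a hypothetical helper that assumes the default column layout shown here. For scripting, kubectl get pods -o json or kubectl wait --for=condition=Ready pod -l io.kompose.service=ksqldb-server are sturdier options.

```python
from typing import List


def unhealthy_pods(kubectl_output: str) -> List[str]:
    """Names of pods whose READY count is incomplete or whose STATUS is not Running.

    Expects the default `kubectl get pods` table: a header row, then one row per pod.
    """
    lines = [line for line in kubectl_output.strip().splitlines() if line.strip()]
    if not lines:
        return []
    header = lines[0].split()
    name_i, ready_i, status_i = (header.index(col) for col in ("NAME", "READY", "STATUS"))
    bad = []
    for row in lines[1:]:
        cols = row.split()
        ready, total = cols[ready_i].split("/")   # e.g. "1/1" -> ("1", "1")
        if ready != total or cols[status_i] != "Running":
            bad.append(cols[name_i])
    return bad


if __name__ == "__main__":
    output = """
NAME READY STATUS RESTARTS AGE
ksqldb-server-774c794fcb-cg9gq 1/1 Running 0 4m12s
ksqldb-server-774c794fcb-gzg97 1/1 Running 0 4m12s
"""
    print(unhealthy_pods(output))   # [] because every pod is 1/1 Running
```

For the five pods listed above, it returns an empty list.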
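Useful commands for troubleshooting:
# List recent events (scheduling, image pulls, volume binding, ...)
kubectl get events
# Print the logs of a pod (replace pod-name with a name from kubectl get pods)
kubectl logs pod-name
# Open an interactive shell inside a pod
kubectl exec --stdin --tty pod-name -- /bin/bash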
Review the result in the target topic using a Kafka consumer or Confluent Control Center.
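Running ksqlDB in Kubernetes makes it easy to run several instances of the same queries. When the data volume is large and the source topics have multiple partitions, instances that share the same ksql.service.id (KSQL_KSQL_SERVICE_ID) form one ksqlDB cluster and split the partitions among themselves.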
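The partition count caps how much parallelism you get. Within one consumer group, each partition is read by at most one member. In this demo, the source topic was created with PARTITIONS = 1 while the Deployment runs replicas: 5, so only one replica actually processes the query and the other four sit idle for it. Using all five would need a source topic with at least five partitions. The Python sketch below uses a simple round-robin assignment to illustrate this. It is a simplification: Kafka Streams, which ksqlDB runs on, uses its own partition assignor. The helper names are hypothetical.

```python
from typing import Dict, List


def assign_round_robin(partitions: int, instances: List[str]) -> Dict[str, List[int]]:
    """Spread partitions 0..partitions-1 over the instances in round-robin order."""
    if not instances:
        raise ValueError("need at least one instance")
    assignment: Dict[str, List[int]] = {name: [] for name in instances}
    for partition in range(partitions):
        assignment[instances[partition % len(instances)]].append(partition)
    return assignment


def active_instances(partitions: int, instances: List[str]) -> int:
    """Number of instances that received at least one partition, i.e. do any work."""
    return sum(1 for owned in assign_round_robin(partitions, instances).values() if owned)


if __name__ == "__main__":
    replicas = [f"replica-{i}" for i in range(5)]   # hypothetical names for the 5 pods
    print(active_instances(1, replicas))    # 1: PARTITIONS = 1 leaves four replicas idle
    print(active_instances(10, replicas))   # 5: every replica owns two partitions
```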