In this article we will see how to build a simple weather alert application using Python, Airflow, Kafka, ksqlDB, Faust and Docker. Of course, you could download your favorite weather alert application, or even make a simple API call to OpenWeather, to do what is done in this blog. The idea here is to explore these systems and build a decoupled yet integrated demo. The code used in this article can be found here.
Download and install Docker for your platform. Click here for instructions.
Download and install Telegram for your platform.
Create a Telegram bot by sending the new bot request /newbot to BotFather. We now have the bot and its access details. See here for detailed instructions.
Test your bot by sending a test message. To send messages, you will need a chat_id, which can be retrieved by sending /start to https://telegram.me/userinfobot.
curl -s -X POST https://api.telegram.org/bot<BOT-ACCESS-TOKEN>/sendMessage \
-d chat_id=<CHAT-ID> \
-d text="It's raining at home now"
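The same test message can be sent from Python using only the standard library. This is a minimal sketch that calls the Bot API `sendMessage` method used in the curl command above; the helper names `build_send_message_request` and `send_telegram_message` are hypothetical and not part of the article's code.

```python
import json
import urllib.parse
import urllib.request

TELEGRAM_API = "https://api.telegram.org"


def build_send_message_request(token: str, chat_id: str, text: str) -> urllib.request.Request:
    """Build a POST request for the Telegram Bot API sendMessage method."""
    url = f"{TELEGRAM_API}/bot{token}/sendMessage"
    # Form-encode the body, the equivalent of curl's -d flags.
    body = urllib.parse.urlencode({"chat_id": chat_id, "text": text}).encode("utf-8")
    return urllib.request.Request(url, data=body, method="POST")


def send_telegram_message(token: str, chat_id: str, text: str) -> dict:
    """Send the message and return Telegram's decoded JSON response."""
    request = build_send_message_request(token, chat_id, text)
    with urllib.request.urlopen(request, timeout=10) as response:
        return json.load(response)
```

Calling `send_telegram_message("<BOT-ACCESS-TOKEN>", "<CHAT-ID>", "It's raining at home now")` with real values should return a JSON object whose `ok` field is `true` when the message is delivered. Splitting request construction from sending keeps the request easy to inspect without touching the network.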
Create a Twilio account and get a Twilio phone number. See here for details.
Create Twilio API Keys from the Twilio Console. See here for details.
Test your Twilio account by sending a test message.
curl -X POST https://api.twilio.com/2010-04-01/Accounts/$TWILIO_ACCOUNT_SID/Messages.json \
--data-urlencode "From=<TWILIO_PHONE_NUMBER>" \
--data-urlencode "Body=It's raining at home now" \
--data-urlencode "To=<PHONE_NUMBER>" \
-u $TWILIO_ACCOUNT_SID:$TWILIO_AUTH_TOKEN
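For comparison, here is the same Twilio call in Python with the standard library: a POST to the account's `Messages.json` resource with HTTP Basic auth built from the account SID and auth token, mirroring curl's `-u` flag. This is a sketch, not the Faust application's code; `build_twilio_request` and `send_twilio_sms` are hypothetical names.

```python
import base64
import json
import urllib.parse
import urllib.request


def build_twilio_request(account_sid: str, auth_token: str,
                         from_number: str, to_number: str, body: str) -> urllib.request.Request:
    """Build the POST request for Twilio's Messages resource (mirrors the curl call)."""
    url = f"https://api.twilio.com/2010-04-01/Accounts/{account_sid}/Messages.json"
    form = urllib.parse.urlencode({"From": from_number, "To": to_number, "Body": body}).encode("utf-8")
    # HTTP Basic auth with "SID:token", as curl's -u flag does.
    credentials = base64.b64encode(f"{account_sid}:{auth_token}".encode("utf-8")).decode("ascii")
    request = urllib.request.Request(url, data=form, method="POST")
    request.add_header("Authorization", f"Basic {credentials}")
    return request


def send_twilio_sms(account_sid: str, auth_token: str,
                    from_number: str, to_number: str, body: str) -> dict:
    """Send the SMS and return Twilio's decoded JSON response."""
    request = build_twilio_request(account_sid, auth_token, from_number, to_number, body)
    with urllib.request.urlopen(request, timeout=10) as response:
        return json.load(response)
```

Phone numbers in E.164 form (for example `+15551234567`) contain a `+`, which `urlencode` escapes as `%2B`; this is the same job `--data-urlencode` does in the curl command.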
Download docker-compose.yml and all the required components from here.
The docker-compose.yml will bring up the following services:
/weather-alert-app/docker/app/
/weather-alert-app/docker/faust/
Complete the following steps before bringing up the Docker services.
Create a copy of /weather-alert-app/kafka-connect/secrets/connect-secrets.properties.template as /weather-alert-app/kafka-connect/secrets/connect-secrets.properties and update it with the Telegram and Twilio details.
TODO: The HTTP connector for Twilio is not working yet and needs some research on POST operations with the HTTP connector. Telegram alerts come from the Kafka connector, and Twilio alerts come from the Faust streaming application.
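Before starting the stack, it can help to check that no template placeholders are left in the copied secrets file. The sketch below assumes that unfilled values are marked with angle brackets such as `<BOT-ACCESS-TOKEN>`, the convention used in the curl examples above; the real key names and placeholder style in `connect-secrets.properties.template` may differ, and the keys used in the example are hypothetical.

```python
import re
from pathlib import Path

# Assumption: unfilled values look like <SOMETHING>, as in the curl examples.
PLACEHOLDER = re.compile(r"<[^<>]+>")
SEPARATOR = re.compile(r"\s*[=:]\s*")


def parse_properties(text: str) -> dict:
    """Parse simple 'key=value' or 'key: value' lines, skipping blanks and comments.

    Covers the common case only: no line continuations or escape sequences.
    """
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        # Split on the first '=' or ':' only, so values such as URLs stay intact.
        key, *rest = SEPARATOR.split(line, maxsplit=1)
        props[key] = rest[0] if rest else ""
    return props


def unfilled_keys(text: str) -> list:
    """Return keys whose values are empty or still contain a <PLACEHOLDER>."""
    return sorted(key for key, value in parse_properties(text).items()
                  if not value or PLACEHOLDER.search(value))


def check_file(path: str) -> list:
    """Convenience wrapper for a file on disk, e.g. connect-secrets.properties."""
    return unfilled_keys(Path(path).read_text(encoding="utf-8"))
```

An empty list from `check_file(".../connect-secrets.properties")` means every value has been filled in.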
Create a copy of .env.template as .env in /weather-alert-app/docker/faust and update it with the Twilio details.
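On the Faust side, the Twilio details are only useful if they are present when the worker starts. How the article's setup loads `.env` isn't shown, so this sketch assumes the values reach the Faust process as environment variables and fails fast if any are missing. `TWILIO_ACCOUNT_SID` and `TWILIO_AUTH_TOKEN` match the shell variables in the Twilio curl test; `TWILIO_FROM_NUMBER` and `ALERT_TO_NUMBER` are hypothetical names, so align them with whatever `.env.template` defines.

```python
import os
from dataclasses import dataclass
from typing import Mapping

# TWILIO_ACCOUNT_SID / TWILIO_AUTH_TOKEN follow the curl example;
# the two phone-number variable names are hypothetical.
REQUIRED = ("TWILIO_ACCOUNT_SID", "TWILIO_AUTH_TOKEN", "TWILIO_FROM_NUMBER", "ALERT_TO_NUMBER")


@dataclass(frozen=True)
class TwilioSettings:
    account_sid: str
    auth_token: str
    from_number: str
    to_number: str


def load_twilio_settings(environ: Mapping[str, str] = os.environ) -> TwilioSettings:
    """Read the Twilio settings, raising a clear error if any are missing or empty."""
    missing = [name for name in REQUIRED if not environ.get(name)]
    if missing:
        raise RuntimeError(f"Missing Twilio settings: {', '.join(missing)}")
    # Field order in TwilioSettings matches REQUIRED.
    return TwilioSettings(*(environ[name] for name in REQUIRED))
```

Failing at startup surfaces a misconfigured `.env` in `docker logs weather-alert-app-faust` immediately, rather than at the first rain alert.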
Edit weather-alert-dag.py in /weather-alert-app/dags/ to update lat, lon and OPEN_WEATHER_API_KEY in the DockerOperator environment for the location which you want to track.
environment={
'bootstrap_servers': "broker:9092",
'schema_registry_url': "http://schema-registry:8081",
'topic_name': "weather.alert.app.source",
'lat': "",
'lon': "",
'OPEN_WEATHER_API_KEY': ""
}
TODO: Switch to environment variables to avoid editing the DAG manually.
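One way to address this TODO is to read the location and API key from the environment when the DAG file is parsed, instead of hard-coding them. A minimal sketch, assuming hypothetical variables `WEATHER_LAT`, `WEATHER_LON` and `OPEN_WEATHER_API_KEY` are set on the Airflow containers; the returned dict would be passed as `environment=` to the DockerOperator. Airflow Variables are an alternative place to keep these values.

```python
import os
from typing import Mapping

# Hypothetical variable names; set them on the Airflow containers.
LOCATION_VARS = ("WEATHER_LAT", "WEATHER_LON", "OPEN_WEATHER_API_KEY")


def build_operator_environment(environ: Mapping[str, str] = os.environ) -> dict:
    """Build the DockerOperator environment without hard-coding location or key."""
    missing = [name for name in LOCATION_VARS if not environ.get(name)]
    if missing:
        raise ValueError(f"Set {', '.join(missing)} before enabling the DAG")
    lat, lon = float(environ["WEATHER_LAT"]), float(environ["WEATHER_LON"])
    if not (-90 <= lat <= 90 and -180 <= lon <= 180):
        raise ValueError(f"Coordinates out of range: lat={lat}, lon={lon}")
    return {
        # Static values copied from the hard-coded dict in weather-alert-dag.py.
        "bootstrap_servers": "broker:9092",
        "schema_registry_url": "http://schema-registry:8081",
        "topic_name": "weather.alert.app.source",
        # Container environment values are strings, so pass the original text through.
        "lat": environ["WEATHER_LAT"],
        "lon": environ["WEATHER_LON"],
        "OPEN_WEATHER_API_KEY": environ["OPEN_WEATHER_API_KEY"],
    }
```

Raising at parse time shows up as a DAG import error in the Airflow UI, which makes a missing setting hard to overlook.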
Set the permissions on the Kafka Connect scripts so they can be executed:
chmod -R 764 /weather-alert-app/kafka-connect/scripts
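For reference, `764` is an octal mode: the owner gets read, write and execute (7), the group gets read and write (6), and everyone else gets read only (4). Python's standard `stat.filemode` renders such a mode in `ls -l` notation, which is a quick way to double-check a value before applying it. `describe_mode` is just an illustrative helper.

```python
import stat


def describe_mode(octal_mode: str) -> str:
    """Render an octal permission string such as "764" as ls -l shows a regular file."""
    # stat.S_IFREG marks a regular file, so filemode() prints a leading "-".
    return stat.filemode(stat.S_IFREG | int(octal_mode, 8))


print(describe_mode("764"))  # -rwxrw-r--
```

Because `-R` also applies `764` to any subdirectories, group and others get no execute (traverse) bit on those directories; only the owner can enter them.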
Now we have all the required artifacts, and the next step is to start the Kafka services.
Start the application by running docker-compose up --remove-orphans -d --build
Validate the status of docker containers by running docker-compose ps
Name                      Command                          State    Ports
-------------------------------------------------------------------------------------------------------------
airflow-postgres          docker-entrypoint.sh postgres    Up       5432/tcp
airflow-webserver         /entrypoint.sh webserver         Up       5555/tcp, 0.0.0.0:8080->8080/tcp, 8793/tcp
broker                    /etc/confluent/docker/run        Up       0.0.0.0:29092->29092/tcp, 9092/tcp
control-center            /etc/confluent/docker/run        Up       0.0.0.0:9021->9021/tcp
kafka-client              bash -c -a echo Waiting fo ...   Exit 1
kafka-connect             bash -c echo "===> Install ...   Up       0.0.0.0:8083->8083/tcp, 9092/tcp
kafkacat                  /bin/sh -c apk add jq; wh ...    Up
ksqldb-server             /usr/bin/docker/run              Up       0.0.0.0:8088->8088/tcp
schema-registry           /etc/confluent/docker/run        Up       0.0.0.0:8081->8081/tcp
weather-alert-app         bash -c echo "Launching we ...   Up
weather-alert-app-faust   faust -A app worker -l info      Up
zookeeper                 /etc/confluent/docker/run        Up       0.0.0.0:2181->2181/tcp, 2888/tcp, 3888/tcp
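Rather than reading the status table by eye, a small script can compare the running services against the ones you expect. This sketch shells out to `docker-compose ps --services --filter status=running` (run it from the directory containing `docker-compose.yml`). It assumes the service names in the compose file match the container names in the table above, and it deliberately leaves out `kafka-client`, which is shown as `Exit 1`; check `docker logs kafka-client` to see whether that exit is expected.

```python
import subprocess

# Taken from the docker-compose ps output above; assumes service names match
# the container names. kafka-client is excluded because it shows "Exit 1".
EXPECTED = {
    "airflow-postgres", "airflow-webserver", "broker", "control-center",
    "kafka-connect", "kafkacat", "ksqldb-server", "schema-registry",
    "weather-alert-app", "weather-alert-app-faust", "zookeeper",
}


def missing_services(running_output: str, expected=EXPECTED) -> set:
    """Return expected services that do not appear in the running list (one name per line)."""
    running = {line.strip() for line in running_output.splitlines() if line.strip()}
    return set(expected) - running


def running_services_output() -> str:
    """Ask docker-compose for the services currently in the running state."""
    result = subprocess.run(
        ["docker-compose", "ps", "--services", "--filter", "status=running"],
        capture_output=True, text=True, check=True,
    )
    return result.stdout
```

`sorted(missing_services(running_services_output()))` evaluates to an empty list when every expected service is up.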
Since we are starting ksqlDB in headless mode, there is no CLI or REST endpoint available. The ksqlDB logs can be checked by running docker logs -f ksqldb-server
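Now we can turn on the Airflow DAG by clicking the ON button in the Airflow web UI, which is published on port 8080 (see airflow-webserver in the table above). This will trigger the pipeline to source data from OpenWeather.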
Data can also be generated manually for testing by opening a shell in the weather-alert-app container and running the commands below:
docker exec -it weather-alert-app /bin/bash
export OPEN_WEATHER_API_KEY=<your-open-weather-api-key>
python weather-alert-app.py --bootstrap_servers=broker:9092 --topic_name=weather.alert.app.source --schema_registry_url=http://schema-registry:8081 --lat=8.270272 --lon=77.177274
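The command above passes the connection details as flags and reads the API key from the environment. The script's own argument handling isn't shown in the article, so the following is a hypothetical sketch of a parser that accepts the same flags, plus the OpenWeather URL it might request. The endpoint (`/data/2.5/weather` with `lat`, `lon` and `appid` query parameters) is an assumption about which OpenWeather API the app uses.

```python
import argparse
import urllib.parse

# Assumption: the app queries OpenWeather's current-weather endpoint.
OPEN_WEATHER_URL = "https://api.openweathermap.org/data/2.5/weather"


def parse_args(argv=None) -> argparse.Namespace:
    """Accept the same flags as the weather-alert-app.py command above."""
    parser = argparse.ArgumentParser(description="Fetch weather and publish it to Kafka")
    parser.add_argument("--bootstrap_servers", required=True)
    parser.add_argument("--topic_name", required=True)
    parser.add_argument("--schema_registry_url", required=True)
    parser.add_argument("--lat", type=float, required=True)
    parser.add_argument("--lon", type=float, required=True)
    return parser.parse_args(argv)


def weather_url(lat: float, lon: float, api_key: str) -> str:
    """Build the request URL; api_key would come from OPEN_WEATHER_API_KEY."""
    query = urllib.parse.urlencode({"lat": lat, "lon": lon, "appid": api_key})
    return f"{OPEN_WEATHER_URL}?{query}"
```

In a script, `args = parse_args()` followed by `weather_url(args.lat, args.lon, os.environ["OPEN_WEATHER_API_KEY"])` would tie the two together; parsing `--lat`/`--lon` as floats rejects malformed coordinates before any request is made.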
## Data in the source topic is in Avro format
docker exec -it kafkacat kafkacat -C -b broker:9092 -t weather.alert.app.source -s value=avro -r http://schema-registry:8081 -o beginning
docker exec -it kafkacat kafkacat -C -b broker:9092 -t STM_WEATHER_ALERT_APP_0020_SEL_ATTRIBUTES -o beginning -f 'Key: %k, Message: %s \n'
docker exec -it kafkacat kafkacat -C -b broker:9092 -t STM_WEATHER_ALERT_APP_0030_RAIN_IS_HERE -o beginning -f 'Key: %k, Message: %s \n'
docker exec -it kafkacat kafkacat -C -b broker:9092 -t STM_WEATHER_ALERT_APP_0040_UNIQUE_KEY -o beginning -f 'Key: %k, Message: %s \n'
docker exec -it kafkacat kafkacat -C -b broker:9092 -t TBL_WEATHER_ALERT_APP_0050_UNIQUE_KEY -o beginning -f 'Key: %k, Message: %s \n'
docker exec -it kafkacat kafkacat -C -b broker:9092 -t STM_WEATHER_ALERT_APP_9000_NOTIFY -o beginning -f 'Key: %k, Message: %s \n'
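The source topic is read with `-s value=avro -r http://schema-registry:8081` because its values are Avro records written through the Schema Registry. Such messages use the Confluent wire format: a zero magic byte, a 4-byte big-endian schema ID, then the Avro-encoded body, and kafkacat uses the schema ID to fetch the schema from the registry. A small sketch that splits that header; decoding the Avro body itself would need an Avro library and is left out.

```python
import struct

MAGIC_BYTE = 0


def split_confluent_header(message_value: bytes):
    """Split a Schema Registry-framed value into (schema_id, avro_payload)."""
    if len(message_value) < 5:
        raise ValueError("value too short for the 5-byte wire-format header")
    # ">bI": big-endian signed byte (magic) followed by an unsigned 4-byte int (schema ID).
    magic, schema_id = struct.unpack(">bI", message_value[:5])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte {magic}; not Schema Registry framed")
    return schema_id, message_value[5:]
```

The schema for a given ID can then be fetched from the registry at `http://schema-registry:8081/schemas/ids/<schema_id>`.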
The ksqlDB queries contain logic to emit only one event for a given day.
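The ksqlDB statements aren't reproduced here, but the topic names above suggest the stages: select attributes (0020), keep rainy readings (0030), derive a unique key (0040), collapse to a table (0050) and notify (9000). The following Python sketch is an in-memory analogue of that idea, not the article's ksql: it keys each rainy reading by location and UTC calendar day and emits only the first event per key. The record fields (`lat`, `lon`, `dt`, `weather_main`) are hypothetical.

```python
from datetime import datetime, timezone
from typing import Iterable, Iterator


def day_key(record: dict) -> str:
    """Unique key per location per UTC calendar day (hypothetical field names)."""
    day = datetime.fromtimestamp(record["dt"], tz=timezone.utc).date().isoformat()
    return f"{record['lat']}|{record['lon']}|{day}"


def rain_alerts(records: Iterable[dict]) -> Iterator[dict]:
    """Yield at most one alert per key: the first rainy reading wins."""
    seen = set()
    for record in records:
        # Filter stage: keep only rainy readings.
        if record.get("weather_main") != "Rain":
            continue
        key = day_key(record)
        # Dedup stage: skip keys that have already produced an alert.
        if key in seen:
            continue
        seen.add(key)
        yield {"key": key, **record}
```

Unlike a ksqlDB table, which is backed by a Kafka changelog topic, this `seen` set lives only in memory, so it would forget which days were already alerted on after a restart.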
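To clean up, list all containers, bring the services down (add --volumes to also remove their volumes) and remove any exited containers:
docker container ls -a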
docker-compose down
docker-compose down --volumes
docker rm $(docker ps -q -f status=exited)
Here are some common errors and their solutions.
Solution: Delete the old volume and bring up the services again.
docker ps -a
docker inspect -f '{{ .Mounts }}' <container-id>
docker volume ls
docker volume rm <volume-name>
docker-compose down --volumes
docker-compose up --remove-orphans -d --build
To inspect the network and force-disconnect an endpoint from it:
docker network inspect {network}
docker network inspect weather-alert-app_default
docker network disconnect -f {network} {endpoint-name}
docker network disconnect -f weather-alert-app_default ffd95c0068e1e82b13f66fe578f93990b19c5fd757ed8201652b2f14a3d85377
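`docker network inspect` prints a JSON array, and each network's `Containers` object maps an attached container ID to details such as its `Name` and `EndpointID`. A small sketch that lists those IDs and names, which are the values you would pass to `docker network disconnect -f`; `attached_endpoints` and `inspect_network` are illustrative helpers.

```python
import json
import subprocess


def attached_endpoints(inspect_json: str) -> list:
    """Return (container_id, name) pairs from `docker network inspect` output."""
    networks = json.loads(inspect_json)  # the command prints a JSON array
    pairs = []
    for network in networks:
        # "Containers" can be empty or null when nothing is attached.
        for container_id, info in (network.get("Containers") or {}).items():
            pairs.append((container_id, info.get("Name", "")))
    return pairs


def inspect_network(name: str) -> str:
    """Run `docker network inspect <name>` and return its JSON output."""
    result = subprocess.run(["docker", "network", "inspect", name],
                            capture_output=True, text=True, check=True)
    return result.stdout
```

For example, `attached_endpoints(inspect_network("weather-alert-app_default"))` lists what is still attached to the compose network.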
To list containers and stop all running ones:
docker ps -a
docker ps -qa
docker stop $(docker ps -q)