
Building Weather Alert Application using Python, Airflow, Kafka, ksqlDB, Faust and Docker


In this article, we will see how to build a simple weather alert application using Python, Airflow, Kafka, ksqlDB, Faust and Docker. Of course, you could download your favorite weather alert app, or even make a simple API call to OpenWeather to do what this blog does. The idea here is to explore these systems and build a decoupled yet integrated demo. The code used in this article can be found here.

Prerequisites

Docker

Download and install Docker for your platform. Click here for instructions.

Telegram

  • Download and install Telegram for your platform

  • Create a Telegram bot by sending a new bot request to BotFather

    • Send /newbot to BotFather
    • Choose a name for your bot
    • Choose a username for your bot
  • We now have the bot and its access details. See here for detailed instructions.

  • Test your bot by sending a test message. To send messages, you need a chat_id, which can be retrieved by sending /start to https://telegram.me/userinfobot

curl -s -X POST https://api.telegram.org/bot<BOT-ACCESS-TOKEN>/sendMessage \
    -d chat_id=<CHAT-ID> \
    -d text="It's raining at home now"
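The same test can be sketched in Python with only the standard library. This is a minimal sketch, not code from the project repository: it posts the same form fields (chat_id and text) to the Bot API sendMessage method as the curl command, and the helper names are hypothetical.

```python
import json
import urllib.parse
import urllib.request

TELEGRAM_API = "https://api.telegram.org"


def build_send_message_request(token: str, chat_id: str, text: str) -> urllib.request.Request:
    """Build the same form-encoded POST request as the curl test."""
    url = f"{TELEGRAM_API}/bot{token}/sendMessage"
    body = urllib.parse.urlencode({"chat_id": chat_id, "text": text}).encode("utf-8")
    return urllib.request.Request(url, data=body, method="POST")


def send_message(token: str, chat_id: str, text: str) -> dict:
    """Send the message and return Telegram's decoded JSON reply."""
    req = build_send_message_request(token, chat_id, text)
    with urllib.request.urlopen(req, timeout=10) as resp:
        return json.loads(resp.read().decode("utf-8"))
```

For example, send_message("<BOT-ACCESS-TOKEN>", "<CHAT-ID>", "It's raining at home now") returns Telegram's JSON reply, whose ok field is true when the message was accepted.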

Twilio

  • Create a Twilio account and a phone number. See here for details.

  • Create Twilio API Keys from the Twilio Console. See here for details.

  • Test your Twilio account by sending a test message.

curl -X POST https://api.twilio.com/2010-04-01/Accounts/$TWILIO_ACCOUNT_SID/Messages.json \
--data-urlencode "From=<TWILIO_PHONE_NUMBER>" \
--data-urlencode "Body=It's raining at home now" \
--data-urlencode "To=<PHONE_NUMBER>" \
-u $TWILIO_ACCOUNT_SID:$TWILIO_AUTH_TOKEN
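A standard-library Python sketch of the same Twilio test, with hypothetical helper names. Like the curl command, it form-encodes From, To and Body and authenticates with HTTP basic auth using the account SID and auth token (which is what curl's -u option does).

```python
import base64
import json
import urllib.parse
import urllib.request


def build_twilio_sms_request(account_sid: str, auth_token: str, from_number: str,
                             to_number: str, body: str) -> urllib.request.Request:
    """Build the same POST request as the curl test (form body + basic auth)."""
    url = f"https://api.twilio.com/2010-04-01/Accounts/{account_sid}/Messages.json"
    data = urllib.parse.urlencode({"From": from_number, "To": to_number, "Body": body}).encode("utf-8")
    credentials = base64.b64encode(f"{account_sid}:{auth_token}".encode("utf-8")).decode("ascii")
    req = urllib.request.Request(url, data=data, method="POST")
    req.add_header("Authorization", f"Basic {credentials}")
    return req


def send_sms(account_sid: str, auth_token: str, from_number: str, to_number: str, body: str) -> dict:
    """Send the SMS and return Twilio's JSON description of the created message."""
    req = build_twilio_sms_request(account_sid, auth_token, from_number, to_number, body)
    with urllib.request.urlopen(req, timeout=10) as resp:
        return json.loads(resp.read().decode("utf-8"))
```

Building the request separately from sending it keeps the credentials and encoding easy to check without actually sending an SMS.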

Prepare docker-compose

  • Download docker-compose.yml and all required components from here

  • The docker-compose.yml will bring up the following services:

    • Kafka (Zookeeper, Broker, Kafka Connect, Schema Registry, ksqlDB)
    • Airflow (Postgres, Webserver and Scheduler)
    • Python Kafka producer module to source data from OpenWeather. This is a custom image built from the Dockerfile located in /weather-alert-app/docker/app/
    • Faust Kafka consumer to stream-process the events and send Twilio notifications. This is a custom image built from the Dockerfile located in /weather-alert-app/docker/faust/
  • Complete the following steps before bringing up the Docker services:

    • Create a copy of /weather-alert-app/kafka-connect/secrets/connect-secrets.properties.template as /weather-alert-app/kafka-connect/secrets/connect-secrets.properties and update it with the Telegram and Twilio details.

    TODO: The HTTP connector for Twilio is not working yet; sending POST requests with the HTTP connector needs more research. For now, Telegram alerts are sent by the Kafka connector and Twilio alerts by the Faust streaming application.

    • Create a copy of .env.template as .env in /weather-alert-app/docker/faust and update it with the Twilio details.

    • Edit weather-alert-dag.py in /weather-alert-app/dags/ to set lat, lon and OPEN_WEATHER_API_KEY in the DockerOperator environment for the location you want to track.

    environment={
        'bootstrap_servers': "broker:9092",
        'schema_registry_url': "http://schema-registry:8081",
        'topic_name': "weather.alert.app.source",
        'lat': "",
        'lon': "",
        'OPEN_WEATHER_API_KEY': ""
    }
    

    TODO: Read these values from environment variables to avoid editing the DAG manually.

    • Grant execute permission on the scripts
    chmod -R 764 /weather-alert-app/kafka-connect/scripts
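One way to address the TODO about editing the DAG manually is to build the DockerOperator environment from environment variables instead of literals. This is a minimal sketch, not the project's DAG code: WEATHER_ALERT_LAT and WEATHER_ALERT_LON are hypothetical variable names, while OPEN_WEATHER_API_KEY reuses the name the app already reads. The DAG could then pass environment=build_operator_environment() to the operator, provided the variables are visible to the Airflow processes that parse the DAG file.

```python
import os
from typing import Mapping

# Fixed values copied from the DockerOperator environment shown above.
DEFAULTS = {
    "bootstrap_servers": "broker:9092",
    "schema_registry_url": "http://schema-registry:8081",
    "topic_name": "weather.alert.app.source",
}

# Hypothetical mapping: operator environment key -> variable read from os.environ.
ENV_NAMES = {
    "lat": "WEATHER_ALERT_LAT",
    "lon": "WEATHER_ALERT_LON",
    "OPEN_WEATHER_API_KEY": "OPEN_WEATHER_API_KEY",
}


def build_operator_environment(env: Mapping[str, str] = os.environ) -> dict:
    """Return the DockerOperator environment, failing fast on missing or bad values."""
    environment = dict(DEFAULTS)
    missing = []
    for key, var_name in ENV_NAMES.items():
        value = env.get(var_name, "").strip()
        if not value:
            missing.append(var_name)
        environment[key] = value
    if missing:
        raise ValueError(f"Missing environment variables: {', '.join(missing)}")
    # float() raises ValueError for non-numeric coordinates.
    lat, lon = float(environment["lat"]), float(environment["lon"])
    if not (-90 <= lat <= 90 and -180 <= lon <= 180):
        raise ValueError("lat must be in [-90, 90] and lon in [-180, 180]")
    return environment
```

Raising while the DAG file is parsed means a misconfiguration shows up as a broken DAG in the Airflow UI rather than as a failed OpenWeather call later.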
    

Now we have all the required artifacts, and the next step is to start the services.
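Before starting, it may help to check that nothing was left unfilled in the files copied from templates (connect-secrets.properties and .env). This is a minimal sketch under the assumption that the templates use empty values or <PLACEHOLDER>-style markers; the template contents are not shown in this article, and the parser only handles simple key=value or key:value lines.

```python
import re
from pathlib import Path

# Assumed placeholder style, e.g. <BOT-ACCESS-TOKEN>.
PLACEHOLDER = re.compile(r"<[^>]*>")


def parse_properties(text: str) -> dict:
    """Minimal .properties/.env parser: key=value or key:value, '#' and '!' comments.
    Line continuations and escape sequences are not supported."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line[0] in "#!":
            continue
        # Split on the first '=' or ':' so values such as URLs keep their colons.
        match = re.match(r"([^=:]+?)\s*[=:]\s*(.*)", line)
        if match:
            props[match.group(1).strip()] = match.group(2).strip()
        else:
            props[line] = ""
    return props


def unfilled_keys(text: str) -> list:
    """Return the sorted keys whose values are empty or still contain a placeholder."""
    return sorted(k for k, v in parse_properties(text).items() if not v or PLACEHOLDER.search(v))


def unfilled_keys_in_file(path: str) -> list:
    return unfilled_keys(Path(path).read_text(encoding="utf-8"))
```

For example, unfilled_keys_in_file("/weather-alert-app/kafka-connect/secrets/connect-secrets.properties") should return an empty list once the Telegram and Twilio details are filled in.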

Start and validate the application

  • Start the application by running docker-compose up --remove-orphans -d --build

  • Validate the status of docker containers by running docker-compose ps

         Name                        Command               State                      Ports
--------------------------------------------------------------------------------------------------------------
airflow-postgres          docker-entrypoint.sh postgres    Up       5432/tcp
airflow-webserver         /entrypoint.sh webserver         Up       5555/tcp, 0.0.0.0:8080->8080/tcp, 8793/tcp
broker                    /etc/confluent/docker/run        Up       0.0.0.0:29092->29092/tcp, 9092/tcp
control-center            /etc/confluent/docker/run        Up       0.0.0.0:9021->9021/tcp
kafka-client              bash -c -a echo Waiting fo ...   Exit 1
kafka-connect             bash -c echo "===> Install ...   Up       0.0.0.0:8083->8083/tcp, 9092/tcp
kafkacat                  /bin/sh -c apk add jq;           Up
                          wh ...
ksqldb-server             /usr/bin/docker/run              Up       0.0.0.0:8088->8088/tcp
schema-registry           /etc/confluent/docker/run        Up       0.0.0.0:8081->8081/tcp
weather-alert-app         bash -c echo "Launching we ...   Up
weather-alert-app-faust   faust -A app worker -l info      Up
zookeeper                 /etc/confluent/docker/run        Up       0.0.0.0:2181->2181/tcp, 2888/tcp, 3888/tcp
  • Since we are starting ksqlDB in headless mode, interactive access through the CLI or REST API is not available. ksqlDB can be validated by following its logs with docker logs -f ksqldb-server

  • Now we can turn on the Airflow DAG by clicking its ON toggle in the Airflow web UI (http://localhost:8080). This triggers the pipeline that sources data from OpenWeather.
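Besides docker-compose ps, the REST endpoints of Schema Registry and Kafka Connect can confirm that those services are actually responding. This sketch uses the host ports from the docker-compose ps output above; GET /subjects (Schema Registry) and GET /connectors (Kafka Connect) both return JSON lists. The fetch function is a parameter so the check can be exercised without a running stack.

```python
import json
import urllib.request
from typing import Callable

# Host ports taken from the docker-compose ps output above.
ENDPOINTS = {
    "schema-registry": "http://localhost:8081/subjects",
    "kafka-connect": "http://localhost:8083/connectors",
}


def http_get_json(url: str):
    with urllib.request.urlopen(url, timeout=5) as resp:
        return json.loads(resp.read().decode("utf-8"))


def check_services(fetch: Callable[[str], object] = http_get_json) -> dict:
    """Return {service: ("ok", response) or ("error", message)} for each endpoint."""
    results = {}
    for name, url in ENDPOINTS.items():
        try:
            results[name] = ("ok", fetch(url))
        except Exception as exc:  # connection refused, timeout, invalid JSON, ...
            results[name] = ("error", str(exc))
    return results
```

An ("error", ...) entry right after docker-compose up may simply mean the container is still starting, so it is worth retrying before digging into the logs.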

For testing, data can also be generated manually by opening a shell in the weather-alert-app container and running the commands below:

docker exec -it weather-alert-app /bin/bash

export OPEN_WEATHER_API_KEY=<your-open-weather-api-key>

python weather-alert-app.py --bootstrap_servers=broker:9092 --topic_name=weather.alert.app.source --schema_registry_url=http://schema-registry:8081 --lat=8.270272 --lon=77.177274
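The flags in the command above show how the producer is invoked. The following sketch parses the same flags and builds an OpenWeather request URL. It is not the project's weather-alert-app.py: using the current-weather endpoint (/data/2.5/weather) is an assumption, since the article does not say which OpenWeather API the app calls, and the Avro/Kafka producing step is left out.

```python
import argparse
import urllib.parse

# Assumed endpoint: OpenWeather current weather data.
OPEN_WEATHER_URL = "https://api.openweathermap.org/data/2.5/weather"


def parse_args(argv=None) -> argparse.Namespace:
    """Parse the same flags used in the manual test command."""
    parser = argparse.ArgumentParser(description="Fetch weather and publish it to Kafka (sketch)")
    parser.add_argument("--bootstrap_servers", required=True)
    parser.add_argument("--topic_name", required=True)
    parser.add_argument("--schema_registry_url", required=True)
    parser.add_argument("--lat", type=float, required=True)
    parser.add_argument("--lon", type=float, required=True)
    return parser.parse_args(argv)


def weather_url(lat: float, lon: float, api_key: str) -> str:
    """Build the OpenWeather request URL for one location."""
    query = urllib.parse.urlencode({"lat": lat, "lon": lon, "appid": api_key})
    return f"{OPEN_WEATHER_URL}?{query}"
```

Inside the container, the key would come from the exported variable, e.g. weather_url(args.lat, args.lon, os.environ["OPEN_WEATHER_API_KEY"]).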
  • We can see the data in each intermediate topic by running the following commands.
## Data in the source topic is in Avro format
docker exec -it kafkacat kafkacat -C -b broker:9092 -t weather.alert.app.source -s value=avro -r http://schema-registry:8081 -o beginning

docker exec -it kafkacat kafkacat -C -b broker:9092 -t STM_WEATHER_ALERT_APP_0020_SEL_ATTRIBUTES -o beginning -f 'Key: %k, Message: %s \n'

docker exec -it kafkacat kafkacat -C -b broker:9092 -t STM_WEATHER_ALERT_APP_0030_RAIN_IS_HERE -o beginning -f 'Key: %k, Message: %s \n'

docker exec -it kafkacat kafkacat -C -b broker:9092 -t STM_WEATHER_ALERT_APP_0040_UNIQUE_KEY -o beginning -f 'Key: %k, Message: %s \n'

docker exec -it kafkacat kafkacat -C -b broker:9092 -t TBL_WEATHER_ALERT_APP_0050_UNIQUE_KEY -o beginning -f 'Key: %k, Message: %s \n'

docker exec -it kafkacat kafkacat -C -b broker:9092 -t STM_WEATHER_ALERT_APP_9000_NOTIFY -o beginning -f 'Key: %k, Message: %s \n'
  • Since the ksqlDB, Kafka Connect and Faust services are all running, they will process the events and send a notification as soon as a RAIN event arrives.
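The source topic is read with -s value=avro -r http://schema-registry:8081 because its values are Avro records registered in Schema Registry. Messages serialized this way start with a zero magic byte followed by a 4-byte big-endian schema ID, which kafkacat uses to look up the schema before decoding the Avro payload. The sketch below only splits off that header; decoding the payload itself would need an Avro library.

```python
import struct

MAGIC_BYTE = 0


def split_confluent_avro(message: bytes):
    """Split a Schema Registry-framed message into (schema_id, avro_payload)."""
    if len(message) < 5 or message[0] != MAGIC_BYTE:
        raise ValueError("not in Schema Registry wire format")
    # Bytes 1-4 hold the schema ID as a big-endian integer.
    (schema_id,) = struct.unpack(">I", message[1:5])
    return schema_id, message[5:]
```

A message that fails this check (for example, plain JSON starting with "{") cannot be decoded with -s value=avro.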

The ksqlDB queries contain logic to emit only one event per day.
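The rain-detection and once-per-day logic can be illustrated in plain Python. This is not the project's ksqlDB code: the stages are loosely modelled on the topic names above (select attributes, keep rain events, derive a unique key, notify), the input is assumed to look like an OpenWeather current-weather response (weather[0].main, dt, coord), and the day is computed in UTC, which may differ from what the real queries use.

```python
from datetime import datetime, timezone
from typing import Iterable, Iterator


def rain_alerts(events: Iterable[dict]) -> Iterator[dict]:
    """Yield at most one alert per (location, UTC day) for rain events."""
    seen = set()
    for event in events:
        # Select only the attributes needed downstream.
        weather = event.get("weather") or [{}]
        condition = weather[0].get("main")
        # Keep rain events only.
        if condition != "Rain":
            continue
        # Derive a unique key per location and day.
        day = datetime.fromtimestamp(event["dt"], tz=timezone.utc).date().isoformat()
        key = f'{event["coord"]["lat"]},{event["coord"]["lon"]}|{day}'
        if key in seen:
            continue
        seen.add(key)
        # Emit the notification event.
        yield {"key": key, "city": event.get("name", ""), "condition": condition}
```

Unlike this in-memory set, which grows without bound and is lost on restart, a ksqlDB table keeps its state in Kafka, so deduplication survives restarts.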

Notes

  • You can see the list of all containers by running docker container ls -a
  • You can bring down the containers by running docker-compose down
  • You can bring down the containers and related volumes by running docker-compose down --volumes
  • You can delete all exited containers by running docker rm $(docker ps -q -f status=exited)

Error and Solution

Here are some common errors and their solutions.

  1. Error: 2020-10-27 17:28:19.757 UTC [1] DETAIL: The data directory was initialized by PostgreSQL version 9.6, which is not compatible with this version 13.0

Solution: Delete the old volume and bring up the services again

docker ps -a

docker inspect -f '{{ .Mounts }}' <container-id>

docker volume ls

docker volume rm <volume-name>

docker-compose down --volumes

docker-compose up --remove-orphans -d --build
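  2. ERROR: error while removing network: network weather-alert-app_default id 1c0d12eac307d89c85cd924dfceeb28cbba4955ee5ede59076d5eda8cf1ccaec has active endpoints

Solution: Inspect the network to find the endpoints still attached to it, force-disconnect them, and stop the running containers.
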
docker network inspect {network}
docker network inspect weather-alert-app_default

docker network disconnect -f {network} {endpoint-name}
docker network disconnect -f weather-alert-app_default ffd95c0068e1e82b13f66fe578f93990b19c5fd757ed8201652b2f14a3d85377

docker ps -a
docker ps -qa
docker stop $(docker ps -q)