How to generate test data for Kafka using Faker

Kafka 20 Jun 2020 Siva Nadesan

In this article we will see how to generate test data for Kafka using Faker. Test data can also be generated with the Confluent ksql-datagen tool; to read more about ksql-datagen, see https://docs.ksqldb.io/en/latest/developer-guide/test-and-debug/generate-custom-test-data/#generate-records-from-an-avro-schema

  • Create a Python module that uses Faker to generate test data. Here we use faker.simple_profile() to generate random user profile information.
#!/usr/bin/env python

from faker import Faker
from confluent_kafka import Producer
import socket
import json
import sys, time, os
import argparse

def main():
    # Display the program start time
    print('-' * 40)
    print(os.path.basename(sys.argv[0]) + " started at ", time.ctime())
    print('-' * 40)

    print('Number of arguments          :', len(sys.argv))
    print('Argument List:', str(sys.argv))

    parser = argparse.ArgumentParser(description="""
    This script generates sample data for specified kafka topic. 
    """)
    parser.add_argument("--bootstrap_servers", help="Bootstrap servers", required=True)
    parser.add_argument("--topic_name", help="Topic name", required=True)
    parser.add_argument("--no_of_records", help="Number of records", type=int, required=True)

    args = parser.parse_args()

    global bootstrap_servers
    bootstrap_servers = args.bootstrap_servers

    global topic_name
    topic_name = args.topic_name

    global no_of_records
    no_of_records = args.no_of_records

    print("Bootstrap servers            : " + bootstrap_servers)
    print("Topic name                   : " + topic_name)
    print("Number of records            : " + str(no_of_records))

class DatetimeEncoder(json.JSONEncoder):
    # Fall back to str() for values json cannot serialize, such as the date in 'birthdate'
    def default(self, obj):
        try:
            return super(DatetimeEncoder, self).default(obj)
        except TypeError:
            return str(obj)

def faker_datagen():
    conf = {'bootstrap.servers': bootstrap_servers,
        'client.id': socket.gethostname()}
        
    producer = Producer(conf)
    faker = Faker()
    count = 0
    while count < no_of_records:
        profile = faker.simple_profile()
        message = json.dumps(profile, cls=DatetimeEncoder)
        key = str(profile['username'])
        producer.produce(topic=topic_name, value=message, key=key)
        # Serve delivery callbacks and free space in the local queue
        producer.poll(0)
        count += 1

    # Block until every queued message has been delivered
    producer.flush()
        
if __name__ == "__main__":
    main()
    faker_datagen()
    sys.exit()
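
The custom encoder in the script is needed because faker.simple_profile() returns the birthdate as a datetime.date, which the standard json encoder rejects. The following is a minimal sketch of that behaviour using only the standard library; the sample_profile dictionary is a hand-written stand-in with made-up values, not real Faker output.

```python
import datetime
import json


class DatetimeEncoder(json.JSONEncoder):
    """Fall back to str() for values the default encoder cannot serialize."""

    def default(self, obj):
        try:
            return super().default(obj)
        except TypeError:
            return str(obj)


# Hand-written stand-in for one faker.simple_profile() result (values are made up).
sample_profile = {
    "username": "rharris",
    "name": "Robert Harris",
    "sex": "M",
    "address": "1 Example Street",
    "mail": "rharris@example.com",
    "birthdate": datetime.date(1987, 6, 3),
}

# Without cls=DatetimeEncoder this call raises TypeError because of the date value.
message = json.dumps(sample_profile, cls=DatetimeEncoder)
decoded = json.loads(message)
print(decoded["birthdate"])  # the date arrives as the string 1987-06-03
```

Because the date is written as a plain string, a consumer has to treat BIRTHDATE as text unless it parses the value itself.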

  • Create a “requirements.txt” file with the following contents
Faker
confluent-kafka
  • Create a “Dockerfile” with the following commands
FROM python:3

COPY . /usr/src/
WORKDIR /usr/src/

COPY requirements.txt ./

RUN pip install --no-cache-dir -r requirements.txt

RUN apt-get update && apt-get install -y \
    iputils-ping \
    iproute2 \
    curl \
    dos2unix \
    netcat-openbsd \
    net-tools \
 && rm -rf /var/lib/apt/lists/*

COPY . .

ENV bootstrap_servers=localhost:9092
ENV topic_name=datagen.user.profile
ENV no_of_records=1

CMD ["sh", "-c", "python /usr/src/app/faker-kafka-profile-datagen.py --bootstrap_servers $bootstrap_servers --topic_name $topic_name --no_of_records $no_of_records"]
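
The CMD line above forwards the three environment variables to the script as command-line flags. A possible alternative, sketched here as an assumption and not as the approach the script takes, is to let argparse use the environment as its source of defaults, so the flags become optional. The build_parser helper and the container_env dictionary are hypothetical names used only for illustration.

```python
import argparse


def build_parser(env):
    """Build the CLI parser, taking defaults from the mapping `env` when present."""
    parser = argparse.ArgumentParser(
        description="Generate sample data for the specified Kafka topic.")
    parser.add_argument("--bootstrap_servers", help="Bootstrap servers",
                        default=env.get("bootstrap_servers", "localhost:9092"))
    parser.add_argument("--topic_name", help="Topic name",
                        default=env.get("topic_name", "datagen.user.profile"))
    parser.add_argument("--no_of_records", help="Number of records", type=int,
                        default=int(env.get("no_of_records", "1")))
    return parser


# Simulated container environment; a real script would pass os.environ instead.
container_env = {"bootstrap_servers": "192.168.1.9:39092", "no_of_records": "5"}

args = build_parser(container_env).parse_args([])
print(args.bootstrap_servers, args.topic_name, args.no_of_records)
```

An explicit flag still wins over the environment, so the same image can be driven either way.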

  • Place all files in a folder as shown below, then build the Docker image with the build command that follows. Tag the image for your use case so that you can identify it later.
│   Dockerfile
│   requirements.txt
│
└───app
        faker-kafka-profile-datagen.py
docker build --tag entechlog/faker-kafka-profile-datagen .
  • Verify the image by running
docker images
  • Run the Docker image with the command below. You can change bootstrap_servers, topic_name and no_of_records to suit your needs.
docker run -it --rm  -e bootstrap_servers=192.168.1.9:39092 -e topic_name=datagen.user.profile -e no_of_records=1 entechlog/faker-kafka-profile-datagen

Before running docker run, update the settings below in the Kafka broker so that a producer running in Docker can talk to a Kafka broker on a different server.

listeners=PLAINTEXT://:9092,DOCKER_LISTENER://:19092,DNS_LISTENER://:29092,IP_LISTENER://:39092
advertised.listeners=PLAINTEXT://localhost:9092,DOCKER_LISTENER://host.docker.internal:19092,DNS_LISTENER://entechlog-vm-01:29092,IP_LISTENER://192.168.1.9:39092
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,DOCKER_LISTENER:PLAINTEXT,DNS_LISTENER:PLAINTEXT,IP_LISTENER:PLAINTEXT
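
The docker run command above bootstraps against 192.168.1.9:39092, which corresponds to IP_LISTENER in advertised.listeners. The sketch below parses the setting and looks up the listener that matches a bootstrap address, which can help when checking that the address given to the container is one the broker actually advertises. parse_listeners and find_listener are hypothetical helpers written for this illustration; they are not part of Kafka or confluent-kafka.

```python
def parse_listeners(value):
    """Turn 'NAME://host:port,...' into a dict of {NAME: (host, port)}."""
    listeners = {}
    for entry in value.split(","):
        name, address = entry.strip().split("://", 1)
        host, port = address.rsplit(":", 1)
        listeners[name] = (host, int(port))
    return listeners


def find_listener(bootstrap, listeners):
    """Return the listener name advertised at 'host:port', or None if there is none."""
    host, port = bootstrap.rsplit(":", 1)
    for name, address in listeners.items():
        if address == (host, int(port)):
            return name
    return None


advertised = parse_listeners(
    "PLAINTEXT://localhost:9092,"
    "DOCKER_LISTENER://host.docker.internal:19092,"
    "DNS_LISTENER://entechlog-vm-01:29092,"
    "IP_LISTENER://192.168.1.9:39092"
)

print(find_listener("192.168.1.9:39092", advertised))
```

A result of None for the address you plan to use suggests the producer would be pointed at an endpoint the broker does not advertise.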
  • Validate the records in the Kafka topic. You can do this with a Kafka consumer, by running PRINT datagen.user.profile from a ksql session, or by browsing the messages in Control Center.
  • The test data is now in the Kafka topic, and we can run this image any time we need to generate test data in a Kafka topic.
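
For the Kafka consumer option, a minimal sketch in Python could look like the following. The read_messages helper is a hypothetical name; it accepts any object with the subscribe() and poll() methods of confluent_kafka.Consumer, so the logic can be tried without a broker. The group.id in the commented usage is a made-up value.

```python
import json


def read_messages(consumer, topic, max_messages, timeout=5.0):
    """Poll `consumer` until `max_messages` records arrive or one poll times out."""
    consumer.subscribe([topic])
    records = []
    while len(records) < max_messages:
        msg = consumer.poll(timeout)
        if msg is None:
            # Nothing arrived within the timeout; stop waiting.
            break
        if msg.error():
            # Skip events that the client reports as errors.
            continue
        key = msg.key()
        decoded_key = key.decode("utf-8") if key is not None else None
        records.append((decoded_key, json.loads(msg.value())))
    return records


# With confluent-kafka installed and a reachable broker:
#   from confluent_kafka import Consumer
#   consumer = Consumer({"bootstrap.servers": "192.168.1.9:39092",
#                        "group.id": "datagen-validate",
#                        "auto.offset.reset": "earliest"})
#   for key, profile in read_messages(consumer, "datagen.user.profile", 5):
#       print(key, profile["birthdate"])
#   consumer.close()
```

Setting auto.offset.reset to earliest lets a new consumer group read records that were produced before the consumer started.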

  • Use this topic and its data to create a STREAM or TABLE in ksqlDB

CREATE STREAM STM_DATAGENUSER_PROFILE (
	USERNAME VARCHAR
	,NAME VARCHAR
	,SEX VARCHAR
	,ADDRESS VARCHAR
	,MAIL VARCHAR
	,BIRTHDATE VARCHAR
	)
	WITH (
			KAFKA_TOPIC = 'datagen.user.profile'
			,VALUE_FORMAT = 'JSON'
			,KEY='USERNAME'
			);
 Message
----------------
 Stream created
----------------
  • Query the data in the STREAM using the query below.
ksql> SELECT ROWKEY, BIRTHDATE FROM STM_DATAGENUSER_PROFILE EMIT CHANGES;
+----------------------------------------------------------------------------------+----------------------------------------------------------------------------------+
|ROWKEY                                                                            |BIRTHDATE                                                                         |
+----------------------------------------------------------------------------------+----------------------------------------------------------------------------------+
|rharris                                                                           |1987-06-03                                                                        |
|amydavidson                                                                       |1955-08-15                                                                        |
|williamzimmerman                                                                  |1918-02-18                                                                        |
|ashleygutierrez                                                                   |1915-10-16                                                                        |
|aharris                                                                           |2004-08-01                                                                        |
|smithmelissa                                                                      |1999-10-29                                                                        |
|mjones                                                                            |2005-06-29                                                                        |
|matthew54                                                                         |1928-08-04                                                                        |
|raymondgarrison                                                                   |1928-07-24                                                                        |
|zachary13                                                                         |1951-10-27                                                                        |
|alan32                                                                            |1974-08-06                                                                        |
|paynechristopher                                                                  |1914-12-21                                                                        |
|jacksondunn                                                                       |1924-08-03                                                                        |

Source code for this faker datagen docker image can be found here

Docker Notes
Stop container: docker stop faker-kafka-profile-datagen
Remove container: docker rm faker-kafka-profile-datagen
SSH into container: docker exec -it faker-kafka-profile-datagen /bin/bash
Find Docker Gateway: /sbin/ip route|awk '/default/ { print $3 }'
Netcat: docker run -it --rm --entrypoint "/bin/nc" faker-kafka-profile-datagen -vz 192.168.1.9 39092
Remove containers: docker container rm $(docker container ls -aq)
Remove images: docker image prune -a
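
The Netcat check above can also be approximated from Python, which is already available in the image. This is a small sketch using only the standard library; port_is_open is a hypothetical helper name. It only confirms that a TCP connection can be opened, not that the Kafka listener behind the port is configured correctly.

```python
import socket


def port_is_open(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and name resolution failures.
        return False


# Example call for the broker address used in this article:
#   port_is_open("192.168.1.9", 39092)
```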

About The Authors
Siva Nadesan

Siva Nadesan is a Principal Data Engineer. His passion includes data and blogging about technologies. He is also the creator and maintainer of www.entechlog.com
