
How to generate test data for Kafka using Faker


In this article, we will see how to generate test data for Kafka using Faker. You could also generate test data using the Confluent ksql-datagen tool; if you want to use ksql-datagen, you can read more about it here.

Instructions

  • Create a Python module that uses Faker to generate the test data. Here we are going to use faker.simple_profile() to generate some random user profile information (the fields it returns are shown in a quick check after the script).

    #!/usr/bin/env python
    
    from faker import Faker
    from confluent_kafka import Producer
    import socket
    import json
    import sys, getopt, time, os
    import argparse
    
    def main():
        # Display the program start time
        print('-' * 40)
        print(os.path.basename(sys.argv[0]) + " started at ", time.ctime())
        print('-' * 40)
    
        print('Number of arguments          :', len(sys.argv))
        print('Argument List:', str(sys.argv))
    
        parser = argparse.ArgumentParser(description="""
        This script generates sample data for specified kafka topic. 
        """)
        parser.add_argument("--bootstrap_servers", help="Bootstrap servers", required=True)
        parser.add_argument("--topic_name", help="Topic name", required=True)
        parser.add_argument("--no_of_records", help="Number of records", type=int, required=True)
    
        args = parser.parse_args()
    
        global bootstrap_servers
        bootstrap_servers = args.bootstrap_servers
    
        global topic_name
        topic_name = args.topic_name
    
        global no_of_records
        no_of_records = args.no_of_records
    
        print("Bootstrap servers            : " + bootstrap_servers)
        print("Topic name                   : " + topic_name)
        print("Number of records            : " + str(no_of_records))
    
    class DatetimeEncoder(json.JSONEncoder):
        # Fall back to str() for values json cannot serialize natively,
        # such as the datetime.date in the profile's birthdate field
        def default(self, obj):
            try:
                return super(DatetimeEncoder, self).default(obj)
            except TypeError:
                return str(obj)
    
    def faker_datagen():
        # Producer configuration; bootstrap_servers is populated from the CLI args in main()
        conf = {'bootstrap.servers': bootstrap_servers,
                'client.id': socket.gethostname()}
    
        producer = Producer(conf)
        faker = Faker()
        count = 0
        while count < no_of_records:
            # Each profile is a dict with username, name, sex, address, mail and birthdate
            profile = faker.simple_profile()
            message = json.dumps(profile, cls=DatetimeEncoder)
            key = str(profile['username'])
            # Key the message by username and flush so each record is delivered immediately
            producer.produce(topic=topic_name, value=message, key=key)
            producer.flush()
            count += 1
    
    if __name__ == "__main__":
        main()
        faker_datagen()
        sys.exit()
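
To see which fields faker.simple_profile() produces (these are the fields we will later map in the ksqlDB stream), you can run a quick check like the one below. The values are random on every call; only the keys are stable.

    #!/usr/bin/env python
    # Quick look at the fields returned by faker.simple_profile()
    from faker import Faker
    
    faker = Faker()
    profile = faker.simple_profile()
    print(sorted(profile.keys()))
    # Typically: ['address', 'birthdate', 'mail', 'name', 'sex', 'username']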
    
  • Create a “requirements.txt” file with the following content

    Faker
    confluent-kafka
    
  • Create a “Dockerfile” with the following commands. This will be used to build a Docker image with the Python code

    FROM python:3
    
    COPY . /usr/src/
    WORKDIR /usr/src/
    
    COPY requirements.txt ./
    
    RUN pip install --no-cache-dir -r requirements.txt
    
    RUN apt-get update && apt-get install -y \
        iputils-ping \
        iproute2 \
        curl \
        dos2unix \
        netcat \
        net-tools \
     && rm -rf /var/lib/apt/lists/*
    
    COPY . .
    
    ENV bootstrap_servers localhost:9092
    ENV topic_name datagen.user.profile 
    ENV no_of_records 1
    
    CMD ["sh", "-c", "python /usr/src/app/faker-kafka-profile-datagen.py --bootstrap_servers $bootstrap_servers --topic_name $topic_name --no_of_records $no_of_records"]
    
  • Place all files in a folder as shown below

    │   Dockerfile
    │   requirements.txt
    │
    └───app
            faker-kafka-profile-datagen.py
    
  • Create the Docker image by running the below build command. Tag/name the image as per your use case so that you can identify it in the future

    docker build --tag entechlog/faker-kafka-profile-datagen .
    
  • Verify the image by running

    docker images
    
  • Run the Docker image using the below command. Here you can change the bootstrap_servers, topic_name and no_of_records based on your needs

    docker run -it --rm  -e bootstrap_servers=192.168.1.9:39092 -e topic_name=datagen.user.profile -e no_of_records=1 entechlog/faker-kafka-profile-datagen
    

Before running docker run, update the below config in the Kafka broker so that the producer running inside Docker can talk to a Kafka broker on a different server. A quick connectivity check is shown after the config.

listeners=PLAINTEXT://:9092,DOCKER_LISTENER://:19092,DNS_LISTENER://:29092,IP_LISTENER://:39092
advertised.listeners=PLAINTEXT://localhost:9092,DOCKER_LISTENER://host.docker.internal:19092,DNS_LISTENER://entechlog-vm-01:29092,IP_LISTENER://192.168.1.9:39092
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,DOCKER_LISTENER:PLAINTEXT,DNS_LISTENER:PLAINTEXT,IP_LISTENER:PLAINTEXT
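
If you want to confirm that the advertised listener is reachable before generating data, a quick sketch using the same confluent-kafka client is shown below; the broker address is only an example and should match your IP_LISTENER.

    #!/usr/bin/env python
    # Connectivity check (sketch): list_topics() fails fast if the advertised
    # listener is not reachable from where this script runs
    from confluent_kafka import Producer
    
    # Example address only; use the host/port of your IP_LISTENER
    producer = Producer({'bootstrap.servers': '192.168.1.9:39092'})
    metadata = producer.list_topics(timeout=10)
    print(sorted(metadata.topics.keys()))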

  • Validate the records in the Kafka topic. You can do this with a Kafka consumer, by running PRINT 'datagen.user.profile'; from a ksql session, or by simply browsing the messages in Control Center. A minimal Python consumer is sketched below.
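
For a quick check from code, here is a minimal consumer sketch using the same confluent-kafka package; the bootstrap server and group id below are placeholders for this example.

    #!/usr/bin/env python
    # Sketch: read a handful of records from the datagen topic and print them
    from confluent_kafka import Consumer
    
    conf = {'bootstrap.servers': '192.168.1.9:39092',  # adjust to your broker
            'group.id': 'datagen-validation',          # example group id
            'auto.offset.reset': 'earliest'}
    
    consumer = Consumer(conf)
    consumer.subscribe(['datagen.user.profile'])
    
    try:
        for _ in range(10):
            msg = consumer.poll(timeout=5.0)
            if msg is None or msg.error():
                continue
            print(msg.key(), msg.value())
    finally:
        consumer.close()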

  • As shown in the output above, we now have test data in the Kafka topic, and we can run this image anytime we need to generate test data in a Kafka topic

  • Use this topic and data to create a STREAM or TABLE in ksqlDB

    CREATE STREAM STM_DATAGENUSER_PROFILE (
    	USERNAME VARCHAR
    	,NAME VARCHAR
    	,SEX VARCHAR
    	,ADDRESS VARCHAR
    	,MAIL VARCHAR
    	,BIRTHDATE VARCHAR
    	)
    	WITH (
    			KAFKA_TOPIC = 'datagen.user.profile'
    			,VALUE_FORMAT = 'JSON'
    			,KEY='USERNAME'
    			);
    
     Message
    ----------------
     Stream created
    ----------------
    
  • Query the data in the STREAM using the below query

    ksql> SELECT ROWKEY, BIRTHDATE FROM STM_DATAGENUSER_PROFILE EMIT CHANGES;
    +------------------------------------------+--------------------------------+
    |ROWKEY                                    |BIRTHDATE                       |
    +------------------------------------------+--------------------------------+
    |rharris                                   |1987-06-03                      |
    |amydavidson                               |1955-08-15                      |
    |williamzimmerman                          |1918-02-18                      |
    |ashleygutierrez                           |1915-10-16                      |
    |aharris                                   |2004-08-01                      |
    |smithmelissa                              |1999-10-29                      |
    |mjones                                    |2005-06-29                      |
    |matthew54                                 |1928-08-04                      |
    |raymondgarrison                           |1928-07-24                      |
    |zachary13                                 |1951-10-27                      |
    |alan32                                    |1974-08-06                      |
    |paynechristopher                          |1914-12-21                      |
    |jacksondunn                               |1924-08-03                      |
    

The source code for this Faker datagen Docker image can be found here

Docker Notes

  • Stop the container: docker stop faker-kafka-profile-datagen
  • Remove the container: docker rm faker-kafka-profile-datagen
  • SSH into the container: docker exec -it faker-kafka-profile-datagen /bin/bash
  • Find the Docker gateway: /sbin/ip route
  • Netcat: docker run -it --rm --entrypoint "/bin/nc" faker-kafka-profile-datagen -vz 192.168.1.9 39092
  • Remove all containers: docker container rm $(docker container ls -aq)
  • Remove unused images: docker image prune -a

