
Kafka commands


Overview

This article shows the usage of some common Kafka commands. These commands are executed from Kafka’s command-line interface and ship with the Kafka distribution.

Prerequisites

  • An environment with the Kafka bin directory, which is where the Kafka scripts are located. You can download the latest version of Kafka from the Apache Kafka downloads page
  • The environment should also have Java/JRE installed, since most of these commands use Java

Assumptions

Some of these assumptions date from an older Hadoop (HDP) based Kafka environment and may not be true for your environment; just make sure you have a Kafka bin directory with the necessary scripts, as mentioned in the prerequisites.

  • Kafka home directory is /usr/hdp/current/kafka OR /usr/hdp/current/kafka-broker
  • All Kafka scripts are located in /usr/hdp/current/kafka/bin OR /usr/hdp/current/kafka-broker/bin
  • The .sh extension is not needed when running these commands in a Confluent Kafka distribution

Commands

Kafka Cluster

Status

Command to check the status of the local Kafka cluster

kafka status
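
The kafka status wrapper is only available in packaged distributions such as HDP. On a plain Apache Kafka install, a quick way to confirm that a broker is reachable and responding is the kafka-broker-api-versions script (a minimal sketch, reusing the broker address from the examples below):

./kafka-broker-api-versions.sh --bootstrap-server 192.168.0.101:31806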

Describe brokers

The first command describes the cluster-wide default dynamic configs; the second describes the configs of each individual broker.

./kafka-configs.sh --bootstrap-server 192.168.0.101:31806 --entity-type brokers --entity-default --describe
./kafka-configs.sh --bootstrap-server 192.168.0.101:31806 --entity-type brokers --describe

Alter brokers

./kafka-configs.sh --bootstrap-server 192.168.0.101:31806 --entity-type brokers --entity-default --alter --add-config log.retention.ms=604800000
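
The --entity-default flag applies the change to all brokers in the cluster. To target a single broker instead, pass its broker id with --entity-name (a sketch; broker id 1001 is a placeholder, use the id of your broker):

./kafka-configs.sh --bootstrap-server 192.168.0.101:31806 --entity-type brokers --entity-name 1001 --alter --add-config log.retention.ms=604800000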

Test Connection to Kafka broker

  • Test using the netcat command: nc -vz 192.168.0.101 31806
  • Test using the curl command: curl -v 192.168.0.101:31806
  • Test an unauthenticated HTTPS endpoint using curl: curl -v https://192.168.0.101
  • Test a secured endpoint using curl: curl -v -u "<API_KEY>:<TOKEN>" https://192.168.0.101
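
If the broker listener is TLS-enabled, nc and plain curl only prove that the port is open; openssl can additionally verify the TLS handshake (a sketch, reusing the broker address and port from above):

openssl s_client -connect 192.168.0.101:31806 </dev/null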

Kafka Topic

Create topics

Command to create a Kafka topic

./kafka-topics.sh --create --bootstrap-server 192.168.0.101:31806 --replication-factor 1 --partitions 13 --topic entechlog-test-0001
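
kafka-topics also accepts per-topic config overrides at creation time via --config. A sketch that creates a topic with a 7-day retention (the topic name entechlog-test-0002 is a placeholder):

./kafka-topics.sh --create --bootstrap-server 192.168.0.101:31806 --replication-factor 1 --partitions 13 --topic entechlog-test-0002 --config retention.ms=604800000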

List topics

Command to list all Kafka topics in the Kafka cluster

./kafka-topics.sh --list --bootstrap-server 192.168.0.101:31806

You can also use the kafka-topics command with a properties file when connecting to a secured Kafka broker. To do this, first create a properties file like the one below and then issue the kafka-topics command.

retries=3
retry.backoff.ms=500
batch.size=65536
bootstrap.servers=192.168.0.101:31806
ssl.endpoint.identification.algorithm=https
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="your-user-name" password="your-password";

Specify the command config when running the kafka-topics command

./kafka-topics.sh --command-config config.properties --list --bootstrap-server 192.168.0.101:31806

List topics with overrides

./kafka-topics.sh --bootstrap-server 192.168.0.101:31806 --describe --topics-with-overrides

Describe topics

./kafka-topics.sh --describe --bootstrap-server 192.168.0.101:31806 --topic "entechlog-test-0001"

You can also describe the dynamic configs of a topic using the kafka-configs command

./kafka-configs.sh --bootstrap-server 192.168.0.101:31806 --describe --entity-type topics --entity-name "entechlog-test-0001"
./kafka-configs.sh --command-config /path/dev-entechlog-kafka.properties --bootstrap-server 192.168.0.101:31806 --describe --entity-type topics --entity-name entechlog-test-0001

Here is an example of how the command config properties file would look

ssl.endpoint.identification.algorithm=HTTPS
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="userName" password="PassWord";
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
request.timeout.ms=20000

Delete topics

./kafka-topics.sh --delete --topic entechlog-test-0001 --bootstrap-server 192.168.0.101:31806

If delete.topic.enable=true is not set on the brokers, the delete will have no effect. (It defaults to true in Kafka 1.0 and later.)
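
To confirm the value of delete.topic.enable, you can describe the broker configs and filter for it. A sketch, assuming a Unix shell; broker id 1001 is a placeholder, and --all (which includes static configs in the output) requires a reasonably recent Kafka version:

./kafka-configs.sh --bootstrap-server 192.168.0.101:31806 --entity-type brokers --entity-name 1001 --describe --all | grep delete.topic.enable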

Alter topics

./kafka-configs.sh --bootstrap-server 192.168.0.101:31806 --alter --entity-type topics --entity-name "entechlog-test-0001" --add-config retention.ms=-1
./kafka-configs.sh --command-config /path/dev-entechlog-kafka.properties --bootstrap-server 192.168.0.101:31806 --alter --entity-type topics --entity-name entechlog-test-0001 --add-config retention.ms=100
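
Note that retention.ms=-1 disables time-based retention, so records are kept indefinitely. To remove a per-topic override and fall back to the broker default, use --delete-config (a sketch under the same assumptions as the commands above):

./kafka-configs.sh --bootstrap-server 192.168.0.101:31806 --alter --entity-type topics --entity-name "entechlog-test-0001" --delete-config retention.ms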

Find the size of a Kafka topic

./kafka-log-dirs.sh --describe --bootstrap-server 192.168.0.101:31806 --topic-list entechlog-test-0001
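
kafka-log-dirs prints a JSON document after a couple of status lines. If jq is available, you can sum the per-partition sizes into a single byte count (a sketch; the grep keeps only the JSON line, and the jq path assumes the tool's usual output structure):

./kafka-log-dirs.sh --describe --bootstrap-server 192.168.0.101:31806 --topic-list entechlog-test-0001 | grep '^{' | jq '[ .brokers[].logDirs[].partitions[].size ] | add'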

Count the number of records in a topic

./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.0.101:31806 --topic entechlog-test-0001 --time -1 | tr ":" " " | awk '{ sum += $3 } END { print sum }'
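
The --time -1 flag returns the latest offsets, so this sum counts every record ever produced, including records already removed by retention. For the currently retained count, fetch the earliest offsets with --time -2 the same way and subtract that sum from the one above (a sketch under the same broker and topic assumptions):

./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.0.101:31806 --topic entechlog-test-0001 --time -2 | tr ":" " " | awk '{ sum += $3 } END { print sum }'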

Producer

Console producer

Simple Kafka console producer

./kafka-console-producer.sh --broker-list 192.168.0.101:31806 --topic entechlog-test-0001

Kafka console producer with security protocol

./kafka-console-producer.sh --broker-list 192.168.0.101:31806 --topic entechlog-test-0001 --security-protocol SASL_PLAINTEXT

Kafka console producer with config

./kafka-console-producer.sh --broker-list 192.168.0.101:31806 --topic test-topic-console-producer-01 --producer.config /path/producer.properties --property "parse.key=true" --property "key.separator=:"
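
With parse.key=true and key.separator=:, each line you type is split at the first separator into a record key and value. For example, a hypothetical keyed JSON message would be entered as:

user-1:{"event":"login"}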

Avro console producer

Kafka Avro console producer

./kafka-avro-console-producer.sh --broker-list 192.168.0.101:31806 --topic test-topic-avro-producer --producer.config /path/producer.properties --property "schema.registry.url=https://localhost" --property schema.registry.basic.auth.user.info="your-user-name:your-password" --property basic.auth.credentials.source="USER_INFO" --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"product", "type": "string"}, {"name":"quantity", "type": "int"}, {"name":"price","type": "float"}]}'

This example uses a schema, so you should pass a message that is compatible with the schema, like the one below

{"id": 999, "product": "foo", "quantity": 100, "price": 50}

Consumer

Console consumer

Consume from beginning

./kafka-console-consumer.sh --bootstrap-server 192.168.0.101:31806 --from-beginning --topic entechlog-test-0001

Consume from current offset

./kafka-console-consumer.sh --bootstrap-server 192.168.0.101:31806 --topic entechlog-test-0001

Kafka console consumer with security protocol

./kafka-console-consumer.sh --bootstrap-server 192.168.0.101:31806 --topic entechlog-test-0001 --from-beginning --security-protocol SASL_PLAINTEXT
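
To print record keys alongside values and cap the number of records consumed, the console consumer also accepts formatter properties and --max-messages (a sketch, reusing the broker address and topic from above):

./kafka-console-consumer.sh --bootstrap-server 192.168.0.101:31806 --topic entechlog-test-0001 --from-beginning --property print.key=true --property key.separator=":" --max-messages 10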

Consumer Groups

Describe consumer groups

./kafka-consumer-groups.sh --command-config /path/dev-entechlog-kafka.properties --bootstrap-server 192.168.0.101:31806 --group connect-CONNECTOR-NAME --describe
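
If you don't know the group name, list all consumer groups first (a sketch, reusing the command config and broker address from above):

./kafka-consumer-groups.sh --command-config /path/dev-entechlog-kafka.properties --bootstrap-server 192.168.0.101:31806 --list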

Change a consumer group's offset to a specific value

./kafka-consumer-groups.sh --command-config /path/dev-entechlog-kafka.properties --bootstrap-server 192.168.0.101:31806 --group connect-CONNECTOR-NAME --to-offset 1 --topic TOPIC_NAME --reset-offsets --dry-run

Replace --dry-run with --execute to actually apply the changes

Change a consumer group's offset to the earliest offset

./kafka-consumer-groups.sh --command-config /path/dev-entechlog-kafka.properties --bootstrap-server 192.168.0.101:31806 --reset-offsets --group connect-CONNECTOR-NAME --topic TOPIC_NAME --to-earliest --dry-run
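
Once the dry-run output looks right, rerun the command with --execute to apply the reset (same group and topic placeholders as above):

./kafka-consumer-groups.sh --command-config /path/dev-entechlog-kafka.properties --bootstrap-server 192.168.0.101:31806 --reset-offsets --group connect-CONNECTOR-NAME --topic TOPIC_NAME --to-earliest --execute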

Hope this was helpful. Did I miss something? Let me know in the comments or in the forum section.
