Kafka commands

Article showing the usage of some common Kafka commands. These commands are run from Kafka's command line interface and ship with the Kafka distribution.


Prerequisites

  • An environment with Kafka bin, this is where the Kafka scripts are located. You can download the latest version of Kafka from the Apache Kafka website
  • The environment should also have Java/JRE installed since most of these commands use Java


Assumptions

Some of these assumptions date from a Hadoop-based Kafka environment and may not be true for your environment; just make sure you have a Kafka bin directory with the necessary scripts as mentioned in the prerequisites.

  • Kafka home directory is /usr/hdp/current/kafka OR /usr/hdp/current/kafka-broker
  • All Kafka scripts are located in /usr/hdp/current/kafka/bin OR /usr/hdp/current/kafka-broker/bin
  • The .sh extension is not needed when running these commands in a Confluent Kafka distribution
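Since the bin directory can live in either of the two paths above, a small shell helper can resolve whichever one exists. A minimal sketch; the candidate paths are just the HDP-style defaults assumed above, so pass your own if your install differs:

```shell
# Return the first existing directory from a list of candidate Kafka bin paths.
# The example paths below are the HDP-style defaults assumed in this article.
find_kafka_bin() {
  for d in "$@"; do
    if [ -d "$d" ]; then
      echo "$d"
      return 0
    fi
  done
  return 1   # nothing found
}

find_kafka_bin /usr/hdp/current/kafka/bin /usr/hdp/current/kafka-broker/bin \
  || echo "no Kafka bin directory found"
```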


Kafka Cluster


Command to check the status of local Kafka cluster

kafka status

Describe brokers

./kafka-configs.sh --bootstrap-server <BROKER_HOST>:<PORT> --entity-type brokers --entity-default --describe
./kafka-configs.sh --bootstrap-server <BROKER_HOST>:<PORT> --entity-type brokers --describe

Alter brokers

./kafka-configs.sh --bootstrap-server <BROKER_HOST>:<PORT> --entity-type brokers --entity-default --alter --add-config <config-name>=<config-value>

Test Connection to Kafka broker

  • Test using netcat command nc -vz <BROKER_HOST> 31806
  • Test using curl command curl -v <BROKER_HOST>:<PORT>
  • Test un-secured endpoint using curl command curl -v <ENDPOINT_URL>
  • Test secured endpoint using curl command curl -v -u "<API_KEY>:<TOKEN>" <ENDPOINT_URL>
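If netcat is not installed, the same reachability check can be sketched with bash's built-in /dev/tcp pseudo-device. The host and port below are placeholders for your broker endpoint:

```shell
# Attempt a TCP connection to host:port via bash's /dev/tcp and report the
# result. /dev/tcp is a bashism; under other shells the redirection simply
# fails and the port is reported as closed.
check_port() {
  host=$1
  port=$2
  if (exec 3<>"/dev/tcp/$host/$port") 2>/dev/null; then
    echo "open"
  else
    echo "closed"
  fi
}

check_port localhost 9092   # replace with your broker host and port
```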

Kafka Topic

Create topics

Command to create a Kafka topic

./kafka-topics.sh --create --bootstrap-server <BROKER_HOST>:<PORT> --replication-factor 1 --partitions 13 --topic entechlog-test-0001

List topics

Command to list all Kafka topics in the Kafka cluster

./kafka-topics.sh --list --bootstrap-server <BROKER_HOST>:<PORT>

You can also use the kafka-topics command with a properties file when connecting to a secured Kafka broker. To do this, first create a properties file like the one below and then issue the kafka-topics command.

sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="your-user-name" password="your-password";

Specify the command config when running the kafka-topics command

./kafka-topics.sh --command-config <PROPERTIES_FILE> --list --bootstrap-server <BROKER_HOST>:<PORT>
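The properties file can be created in one step with a heredoc. A sketch; the file name client.properties and the credentials are placeholders, and security.protocol depends on whether your listener uses TLS:

```shell
# Write a SASL/PLAIN client properties file for --command-config.
# File name, credentials, and security.protocol are placeholder assumptions.
cat > client.properties <<'EOF'
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="your-user-name" password="your-password";
EOF

grep -c '^sasl' client.properties   # prints 2: the two sasl.* lines
```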

List topics with overrides

./kafka-topics.sh --bootstrap-server <BROKER_HOST>:<PORT> --describe --topics-with-overrides

Describe topics

./kafka-topics.sh --describe --bootstrap-server <BROKER_HOST>:<PORT> --topic "entechlog-test-0001"

You can also describe the dynamic configs of a topic using the kafka-configs command

./kafka-configs.sh --bootstrap-server <BROKER_HOST>:<PORT> --describe --entity-type topics --entity-name "entechlog-test-0001"
./kafka-configs.sh --command-config /path/ --bootstrap-server <BROKER_HOST>:<PORT> --describe --entity-type topics --entity-name entechlog-test-0001

Here is an example of how the config would look

ssl.endpoint.identification.algorithm=HTTPS
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="userName" password="PassWord";

Delete topics

./kafka-topics.sh --delete --topic entechlog-test-0001 --bootstrap-server <BROKER_HOST>:<PORT>

If delete.topic.enable=true is not set on the brokers, the delete will have no effect.

Alter topics

./kafka-configs.sh --bootstrap-server <BROKER_HOST>:<PORT> --alter --entity-type topics --entity-name "entechlog-test-0001" --add-config <config-name>=<config-value>
./kafka-configs.sh --command-config /path/ --bootstrap-server <BROKER_HOST>:<PORT> --alter --entity-type topics --entity-name entechlog-test-0001 --add-config <config-name>=<config-value>

Find the size of Kafka topic

./kafka-log-dirs.sh --describe --bootstrap-server <BROKER_HOST>:<PORT> --topic-list entechlog-test-0001

Count the number of records in a topic

./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list <BROKER_HOST>:<PORT> --topic entechlog-test-0001 --time -1 | tr ":" " " | awk '{ sum += $3 } END { print sum }'
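GetOffsetShell prints one topic:partition:offset line per partition, and with --time -1 the offset is the log-end offset, so summing the third field gives the record count. A sketch of the same tr/awk pipeline over sample output (the offsets are made-up values):

```shell
# Simulated GetOffsetShell output: topic:partition:log-end-offset, one line
# per partition. The offsets below are made-up sample values.
printf 'entechlog-test-0001:0:120\nentechlog-test-0001:1:80\nentechlog-test-0001:2:100\n' \
  | tr ":" " " \
  | awk '{ sum += $3 } END { print sum }'
# prints 300
```

Note that this sums log-end offsets, so it overcounts if records have been removed by retention or compaction.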


Console producer

Simple Kafka console producer

./kafka-console-producer.sh --broker-list <BROKER_HOST>:<PORT> --topic entechlog-test-0001

Kafka console producer with security protocol

./kafka-console-producer.sh --broker-list <BROKER_HOST>:<PORT> --topic entechlog-test-0001 --security-protocol SASL_PLAINTEXT

Kafka console producer with config

./kafka-console-producer.sh --broker-list <BROKER_HOST>:<PORT> --topic test-topic-console-producer-01 --producer.config /path/ --property "parse.key=true" --property "key.separator=:"

Avro console producer

Kafka Avro console producer

./kafka-avro-console-producer --broker-list <BROKER_HOST>:<PORT> --topic test-topic-avro-producer --producer.config /path/ --property "schema.registry.url=https://localhost" --property basic.auth.user.info="your-user-name:your-password" --property basic.auth.credentials.source="USER_INFO" --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"product", "type": "string"}, {"name":"quantity", "type": "int"}, {"name":"price","type": "float"}]}'

This example uses a schema, so you should pass a message which is compatible with the schema, like the one below

{"id": 999, "product": "foo", "quantity": 100, "price": 50}
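The console producers read one record per line from stdin, so a batch of messages can be kept in a file and piped in. A sketch; the file name records.json and the second record are made up for illustration:

```shell
# Each line of this file is one record for the console producer to send.
# The second record is a made-up example in the same shape as the first.
cat > records.json <<'EOF'
{"id": 999, "product": "foo", "quantity": 100, "price": 50}
{"id": 1000, "product": "bar", "quantity": 5, "price": 12.5}
EOF

wc -l < records.json   # count the records to be produced
```

The file can then be piped into the producer, e.g. ./kafka-avro-console-producer ... < records.json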


Console consumer

Consume from beginning

./kafka-console-consumer.sh --bootstrap-server <BROKER_HOST>:<PORT> --from-beginning --topic entechlog-test-0001

Consume from current offset

./kafka-console-consumer.sh --bootstrap-server <BROKER_HOST>:<PORT> --topic entechlog-test-0001

Kafka console consumer with security protocol

./kafka-console-consumer.sh --bootstrap-server <BROKER_HOST>:<PORT> --topic entechlog-test-0001 --from-beginning --security-protocol SASL_PLAINTEXT

Consumer Groups

Describe consumer groups

./kafka-consumer-groups.sh --command-config /path/ --bootstrap-server <BROKER_HOST>:<PORT> --group connect-CONNECTOR-NAME --describe

Change consumer groups offset to specific value

./kafka-consumer-groups.sh --command-config /path/ --bootstrap-server <BROKER_HOST>:<PORT> --group connect-CONNECTOR-NAME --reset-offsets --to-offset 1 --topic TOPIC_NAME --dry-run

Replace --dry-run with --execute to actually make the changes

Change consumer groups offset to earliest offset

./kafka-consumer-groups.sh --command-config /path/ --bootstrap-server <BROKER_HOST>:<PORT> --reset-offsets --group connect-CONNECTOR-NAME --topic TOPIC_NAME --to-earliest --dry-run

Hope this was helpful. Did I miss something? Let me know in the comments or in the forum section.
