This article shows the usage of some common Kafka commands. These commands are run from Kafka’s command line interface and ship with the Kafka distribution.
Some of the assumptions below come from an older Hadoop-based Kafka environment and may not hold for yours. Just make sure you have a Kafka bin directory that contains the necessary scripts.
Kafka home directory: /usr/hdp/current/kafka OR /usr/hdp/current/kafka-broker
Kafka bin directory: /usr/hdp/current/kafka/bin OR /usr/hdp/current/kafka-broker/bin
The .sh extension is not needed when running these commands in a Confluent Kafka distribution.
Command to check the status of local Kafka cluster
kafka status
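Command to describe the cluster-wide default dynamic broker configs
./kafka-configs.sh --bootstrap-server 192.168.0.101:31806 --entity-type brokers --entity-default --describe
Command to describe the dynamic configs of the brokers
./kafka-configs.sh --bootstrap-server 192.168.0.101:31806 --entity-type brokers --describe
Command to set a cluster-wide default broker config (here log.retention.ms to 7 days)
./kafka-configs.sh --bootstrap-server 192.168.0.101:31806 --entity-type brokers --entity-default --alter --add-config log.retention.ms=604800000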
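The value 604800000 used for log.retention.ms is seven days expressed in milliseconds. A minimal sketch for deriving such values in the shell follows; days_to_ms is a hypothetical helper introduced here for illustration and is not part of Kafka.

```shell
#!/usr/bin/env bash
# days_to_ms is a hypothetical helper (not part of Kafka): it converts a
# whole number of days into milliseconds for settings such as log.retention.ms.
days_to_ms() {
  local days="$1"
  echo $(( days * 24 * 60 * 60 * 1000 ))
}

days_to_ms 7   # prints 604800000
```

The result can be substituted into the --add-config argument, for example log.retention.ms=$(days_to_ms 7).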
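Commands to check network connectivity to the broker
nc -vz 192.168.0.101 31806
curl -v 192.168.0.101:31806
Commands to check an HTTPS endpoint, without and with basic authentication
curl -v https://192.168.0.101
curl -v -u "<API_KEY>:<TOKEN>" https://192.168.0.101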
Command to create a Kafka topic
./kafka-topics.sh --create --bootstrap-server 192.168.0.101:31806 --replication-factor 1 --partitions 13 --topic entechlog-test-0001
Command to list all Kafka topics in the Kafka cluster
./kafka-topics.sh --list --bootstrap-server 192.168.0.101:31806
You can also use the kafka-topics command with a properties file when connecting to a secured Kafka broker. First create a properties file like the one below, then pass it to the kafka-topics command.
retries=3
retry.backoff.ms=500
batch.size=65536
bootstrap.servers=192.168.0.101:31806
ssl.endpoint.identification.algorithm=https
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="your-user-name" password="your-password";
Specify the command config when running the kafka-topics command
./kafka-topics.sh --command-config config.properties --list --bootstrap-server 192.168.0.101:31806
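Because the properties file holds a password, it may be worth creating it so that only the current user can read it. The sketch below is one way to do that, under stated assumptions: the temporary directory, the CONFIG_FILE variable, and the credentials are placeholders, and the listing step only runs if kafka-topics.sh is present in the current directory.

```shell
#!/usr/bin/env bash
# Sketch: write a client properties file readable by its owner only.
# CONFIG_FILE and the credentials are placeholders, not real values.
umask 077                      # new files are created without group/other access
CONFIG_DIR="$(mktemp -d)"
CONFIG_FILE="$CONFIG_DIR/config.properties"

# Quoted heredoc delimiter: the content is written literally, nothing is expanded.
cat > "$CONFIG_FILE" <<'EOF'
bootstrap.servers=192.168.0.101:31806
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
ssl.endpoint.identification.algorithm=https
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="your-user-name" password="your-password";
EOF

# Run the listing only when the script exists in the current directory.
if [ -x ./kafka-topics.sh ]; then
  ./kafka-topics.sh --command-config "$CONFIG_FILE" --list --bootstrap-server 192.168.0.101:31806
else
  echo "kafka-topics.sh not found; config written to $CONFIG_FILE"
fi
```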
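Command to describe only the topics that have config overrides
./kafka-topics.sh --bootstrap-server 192.168.0.101:31806 --describe --topics-with-overrides
Command to describe a Kafka topic
./kafka-topics.sh --describe --bootstrap-server 192.168.0.101:31806 --topic "entechlog-test-0001"
You can also describe the dynamic configs of a topic using the kafka-configs command, with or without a command config file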
./kafka-configs.sh --bootstrap-server 192.168.0.101:31806 --describe --entity-type topics --entity-name "entechlog-test-0001"
./kafka-configs.sh --command-config /path/dev-entechlog-kafka.properties --bootstrap-server 192.168.0.101:31806 --describe --entity-type topics --entity-name entechlog-test-0001
Here is an example of what the command config file could look like
ssl.endpoint.identification.algorithm=HTTPS
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="userName" password="PassWord";
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
request.timeout.ms=20000
Command to delete a Kafka topic
./kafka-topics.sh --delete --topic entechlog-test-0001 --bootstrap-server 192.168.0.101:31806
If delete.topic.enable=true is not set on the brokers, the delete will have no effect.
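Command to set the retention of a topic to unlimited (retention.ms=-1)
./kafka-configs.sh --bootstrap-server 192.168.0.101:31806 --alter --entity-type topics --entity-name "entechlog-test-0001" --add-config retention.ms=-1
Command to set a very short retention on a topic using a command config file, for example to purge its data
./kafka-configs.sh --command-config /path/dev-entechlog-kafka.properties --bootstrap-server 192.168.0.101:31806 --alter --entity-type topics --entity-name entechlog-test-0001 --add-config retention.ms=100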
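A retention.ms of 100 is typically a temporary setting: records are deleted asynchronously by the broker's periodic retention check, and the override should be removed afterwards so the topic falls back to its default retention. The sketch below only prints the two kafka-configs.sh commands involved rather than running them; purge_plan is a hypothetical helper, and treating the short retention as a purge is an assumption about intent.

```shell
#!/usr/bin/env bash
# purge_plan is a hypothetical helper: it prints (does not run) the commands
# for temporarily shortening a topic's retention and then removing the override.
purge_plan() {
  local topic="$1"
  local base="./kafka-configs.sh --bootstrap-server 192.168.0.101:31806 --alter --entity-type topics --entity-name $topic"
  echo "$base --add-config retention.ms=100"
  echo "# wait until the broker's retention check has deleted the old segments"
  echo "$base --delete-config retention.ms"
}

purge_plan entechlog-test-0001
```

How long to wait depends on the broker's log.retention.check.interval.ms, so it is worth confirming the topic is empty before removing the override.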
Command to describe the log directories and on-disk size of a topic
./kafka-log-dirs.sh --describe --bootstrap-server 192.168.0.101:31806 --topic-list entechlog-test-0001
Command to sum the latest offsets across all partitions of a topic
./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.0.101:31806 --topic entechlog-test-0001 --time -1 | tr ":" " " | awk '{ sum += $3 } END { print sum }'
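GetOffsetShell prints one topic:partition:offset line per partition, and the pipeline above adds up the third field. The sum of the latest offsets matches the number of retained messages only if every partition still starts at offset 0, so a closer approximation subtracts the earliest offsets (obtained with --time -2). The sketch below shows the arithmetic on made-up sample output; sum_offsets is a hypothetical helper, and the sample lines are not from a real cluster.

```shell
#!/usr/bin/env bash
# sum_offsets is a hypothetical helper: it reads "topic:partition:offset"
# lines on stdin and prints the sum of the offset column.
sum_offsets() {
  awk -F: '{ sum += $3 } END { print sum + 0 }'
}

# Made-up sample of what "--time -1" (latest offsets) could print.
latest_sample=$(printf '%s\n' \
  'entechlog-test-0001:0:10' \
  'entechlog-test-0001:1:20' \
  'entechlog-test-0001:2:12' | sum_offsets)

# Made-up sample of what "--time -2" (earliest offsets) could print.
earliest_sample=$(printf '%s\n' \
  'entechlog-test-0001:0:2' \
  'entechlog-test-0001:1:0' \
  'entechlog-test-0001:2:0' | sum_offsets)

echo "$latest_sample"                             # prints 42
echo $(( latest_sample - earliest_sample ))       # prints 40
```

On compacted topics the offset difference still overstates the record count, so treat it as an upper bound.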
Simple Kafka console producer
./kafka-console-producer.sh --broker-list 192.168.0.101:31806 --topic entechlog-test-0001
Kafka console producer with security protocol
./kafka-console-producer.sh --broker-list 192.168.0.101:31806 --topic entechlog-test-0001 --security-protocol SASL_PLAINTEXT
Kafka console producer with config
./kafka-console-producer.sh --broker-list 192.168.0.101:31806 --topic test-topic-console-producer-01 --producer.config /path/producer.properties --property "parse.key=true" --property "key.separator=:"
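With parse.key=true and key.separator=: set, each input line is expected in key:value form. The sketch below generates a few such lines for testing; make_keyed_records is a hypothetical helper, and the key and value names are made up.

```shell
#!/usr/bin/env bash
# make_keyed_records is a hypothetical helper: it prints N sample records in
# the "key:value" form used with parse.key=true and key.separator=:
make_keyed_records() {
  local count="$1"
  local i=1
  while [ "$i" -le "$count" ]; do
    printf 'key-%d:value-%d\n' "$i" "$i"
    i=$(( i + 1 ))
  done
}

make_keyed_records 3
# key-1:value-1
# key-2:value-2
# key-3:value-3
```

The output can be piped into the console producer command above instead of typing records by hand.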
Kafka Avro console producer
./kafka-avro-console-producer.sh --broker-list 192.168.0.101:31806 --topic test-topic-avro-producer --producer.config /path/producer.properties --property "schema.registry.url=https://localhost" --property schema.registry.basic.auth.user.info="your-user-name:your-password" --property basic.auth.credentials.source="USER_INFO" --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"product", "type": "string"}, {"name":"quantity", "type": "int"}, {"name":"price","type": "float"}]}'
This example uses a schema, so the message you send must be compatible with it, like the one below
{"id": 999, "product": "foo", "quantity": 100, "price": 50}
Consume from beginning
./kafka-console-consumer.sh --bootstrap-server 192.168.0.101:31806 --from-beginning --topic entechlog-test-0001
Consume from current offset
./kafka-console-consumer.sh --bootstrap-server 192.168.0.101:31806 --topic entechlog-test-0001
Kafka console consumer with security protocol
./kafka-console-consumer.sh --bootstrap-server 192.168.0.101:31806 --topic entechlog-test-0001 --from-beginning --security-protocol SASL_PLAINTEXT
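Command to describe a consumer group
./kafka-consumer-groups.sh --command-config /path/dev-entechlog-kafka.properties --bootstrap-server 192.168.0.101:31806 --group connect-CONNECTOR-NAME --describe
Command to preview resetting a consumer group's offsets to a specific offset
./kafka-consumer-groups.sh --command-config /path/dev-entechlog-kafka.properties --bootstrap-server 192.168.0.101:31806 --group connect-CONNECTOR-NAME --to-offset 1 --topic TOPIC_NAME --reset-offsets --dry-run
Replace --dry-run with --execute to actually make the changes. In current Kafka versions a reset without --execute only prints the planned offsets, and offsets can only be reset while the group has no active members.
Command to preview resetting a consumer group's offsets to the earliest offset
./kafka-consumer-groups.sh --command-config /path/dev-entechlog-kafka.properties --bootstrap-server 192.168.0.101:31806 --reset-offsets --group connect-CONNECTOR-NAME --topic TOPIC_NAME --to-earliest --dry-run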
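Since an offset reset is hard to undo, it can help to make the preview the default and require an explicit opt-in before the change is applied; kafka-consumer-groups.sh uses --execute for the latter. The sketch below only builds and prints the command; build_reset_cmd is a hypothetical helper, and the group and topic names are the placeholders used above.

```shell
#!/usr/bin/env bash
# build_reset_cmd is a hypothetical helper: it prints (does not run) a
# kafka-consumer-groups.sh reset command. It defaults to --dry-run and only
# emits --execute when the third argument is the literal word "execute".
build_reset_cmd() {
  local group="$1"
  local topic="$2"
  local mode="--dry-run"
  if [ "${3:-}" = "execute" ]; then
    mode="--execute"
  fi
  echo "./kafka-consumer-groups.sh --command-config /path/dev-entechlog-kafka.properties" \
       "--bootstrap-server 192.168.0.101:31806 --group $group --topic $topic" \
       "--reset-offsets --to-earliest $mode"
}

build_reset_cmd connect-CONNECTOR-NAME TOPIC_NAME            # command ends with --dry-run
build_reset_cmd connect-CONNECTOR-NAME TOPIC_NAME execute    # command ends with --execute
```

Printing the command first gives a chance to review the group and topic before running it.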
Hope this was helpful. Did I miss something? Let me know in the comments or in the forum section.