1 kafka-topics.sh

1.1 Create a topic

Create a topic named topic-demo with a replication factor of 3 and 3 partitions:

$ kafka-topics.sh --zookeeper localhost:2181 --create --topic topic-demo --replication-factor 3 --partitions 3
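
Since Kafka 2.2 the --zookeeper flag is deprecated (it was removed in 3.0), and the same topic can be created by talking to a broker directly. A sketch, assuming a broker listening on localhost:9092:

# Create the topic via a broker instead of ZooKeeper (Kafka >= 2.2);
# --if-not-exists makes the command safe to re-run
$ kafka-topics.sh --bootstrap-server localhost:9092 --create --topic topic-demo --replication-factor 3 --partitions 3 --if-not-exists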

1.2 Describe a topic

$ kafka-topics.sh --zookeeper localhost:2181 --describe --topic topic-demo

Topic:topic-demo	PartitionCount:3	ReplicationFactor:3	Configs:
	Topic: topic-demo	Partition: 0	Leader: 0	Replicas: 0,1,2	Isr: 0,1,2
	Topic: topic-demo	Partition: 1	Leader: 1	Replicas: 1,2,0	Isr: 1,2,0
	Topic: topic-demo	Partition: 2	Leader: 2	Replicas: 2,0,1	Isr: 2,0,1
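
Each output line shows, for one partition, the broker currently acting as leader, the full replica assignment, and the in-sync replica set (Isr). A quick cluster health check is to describe only the problem partitions:

# Show only partitions whose ISR has fallen behind the replica assignment
$ kafka-topics.sh --zookeeper localhost:2181 --describe --under-replicated-partitions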

1.3 List topics

$ kafka-topics.sh --zookeeper localhost:2181 --list

topic-demo

1.4 Increase topic partitions

# Increase the partition count to 4
$ kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic-demo --partitions 4

WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!

$ kafka-topics.sh --zookeeper localhost:2181 --describe --topic topic-demo

Topic:topic-demo	PartitionCount:4	ReplicationFactor:3	Configs:
	Topic: topic-demo	Partition: 0	Leader: 0	Replicas: 0,1,2	Isr: 0,1,2
	Topic: topic-demo	Partition: 1	Leader: 1	Replicas: 1,2,0	Isr: 1,2,0
	Topic: topic-demo	Partition: 2	Leader: 2	Replicas: 2,0,1	Isr: 2,0,1
	Topic: topic-demo	Partition: 3	Leader: 0	Replicas: 0,2,1	Isr: 0,2,1
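
Note that the partition count can only grow; kafka-topics.sh refuses to shrink a topic (the exact error text varies by version):

# Reducing the partition count is not supported; this command fails
$ kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic-demo --partitions 2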

1.5 Delete a topic

# Delete a topic; if delete.topic.enable is false, the deletion will fail
$ kafka-topics.sh --zookeeper localhost:2181 --delete --topic topic-demo

Topic topic-demo is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

2 kafka-console-consumer.sh

2.1 Consume messages from a topic

$ kafka-console-consumer.sh --bootstrap-server 192.168.0.1:9092 --topic topic-demo
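
By default the console consumer starts at the end of the log and only shows newly produced messages; to replay everything already in the topic, add --from-beginning:

$ kafka-console-consumer.sh --bootstrap-server 192.168.0.1:9092 --topic topic-demo --from-beginning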

2.2 Consume from a specific partition

$ kafka-console-consumer.sh --bootstrap-server 192.168.0.1:9092 --topic topic-demo --partition 1

2.3 Consume as part of a consumer group

$ kafka-console-consumer.sh --bootstrap-server 192.168.0.1:9092 --topic topic-demo --group group-demo

2.4 Consume from a specific offset

# --offset requires --partition
$ kafka-console-consumer.sh --bootstrap-server 192.168.0.1:9092 --topic topic-demo --partition 1 --offset 1
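
Besides a non-negative number, --offset also accepts the keywords 'earliest' and 'latest'. For example, to read a single partition from its start:

$ kafka-console-consumer.sh --bootstrap-server 192.168.0.1:9092 --topic topic-demo --partition 1 --offset earliest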

3 kafka-console-producer.sh

3.1 View help

$ kafka-console-producer.sh
Read data from standard input and publish it to Kafka.
Option                                   Description
------                                   -----------
--batch-size <Integer: size>
    Number of messages to send in a single batch if they are not being sent synchronously. (default: 200)
--broker-list <String: broker-list>
    REQUIRED: The broker list string in the form HOST1:PORT1,HOST2:PORT2.
--compression-codec [String: compression-codec]
    The compression codec: either 'none', 'gzip', 'snappy', or 'lz4'. If specified without value, then it defaults to 'gzip'
--line-reader <String: reader_class>
    The class name of the class to use for reading lines from standard in. By default each line is read as a separate message. (default: kafka.tools.ConsoleProducer$LineMessageReader)
--max-block-ms <Long: max block on send>
    The max time that the producer will block for during a send request (default: 60000)
--max-memory-bytes <Long: total memory in bytes>
    The total memory used by the producer to buffer records waiting to be sent to the server. (default: 33554432)
--max-partition-memory-bytes <Long: memory in bytes per partition>
    The buffer size allocated for a partition. When records are received which are smaller than this size the producer will attempt to optimistically group them together until this size is reached. (default: 16384)
--message-send-max-retries <Integer>
    Brokers can fail receiving the message for multiple reasons, and being unavailable transiently is just one of them. This property specifies the number of retries before the producer give up and drop this message. (default: 3)
--metadata-expiry-ms <Long: metadata expiration interval>
    The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any leadership changes. (default: 300000)
--producer-property <String: producer_prop>
    A mechanism to pass user-defined properties in the form key=value to the producer.
--producer.config <String: config file>
    Producer config properties file. Note that [producer-property] takes precedence over this config.
--property <String: prop>
    A mechanism to pass user-defined properties in the form key=value to the message reader. This allows custom configuration for a user-defined message reader.
--request-required-acks <String: request required acks>
    The required acks of the producer requests (default: 1)
--request-timeout-ms <Integer: request timeout ms>
    The ack timeout of the producer requests. Value must be non-negative and non-zero (default: 1500)
--retry-backoff-ms <Integer>
    Before each retry, the producer refreshes the metadata of relevant topics. Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata. (default: 100)
--socket-buffer-size <Integer: size>
    The size of the tcp RECV size. (default: 102400)
--sync
    If set message send requests to the brokers are synchronously, one at a time as they arrive.
--timeout <Integer: timeout_ms>
    If set and the producer is running in asynchronous mode, this gives the maximum amount of time a message will queue awaiting sufficient batch size. The value is given in ms. (default: 1000)
--topic <String: topic>
    REQUIRED: The topic id to produce messages to.

3.2 Send messages

# Newer Kafka versions accept --bootstrap-server; older versions (matching the help output above) require --broker-list instead
$ kafka-console-producer.sh --bootstrap-server 192.168.0.1:9092 --topic topic-demo

> hello kafka
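
To send keyed messages from the console, the default LineMessageReader can split each line into a key and a value; enable this with reader properties (the ':' separator and the sample line below are arbitrary illustrations):

$ kafka-console-producer.sh --bootstrap-server 192.168.0.1:9092 --topic topic-demo --property parse.key=true --property key.separator=:

> user1:hello kafka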