系统 启动 Kafka -daemon 参数可以让 Kafka 在后台运行。
kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
 
指定 JMX 端口启动 JMX 的全称为 Java Management Extensions。顾名思义,是管理 Java 的一种扩展,通过 JMX 可以方便我们监控 Kafka 的内存,线程,CPU 的使用情况,以及生产和消费消息的指标。
JMX_PORT=9999 kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
 
停止 Kafka 
kafka-server-stop.sh 
 
Topic 创建Topic 
kafka-topics.sh –create  –bootstrap-server :9092 –replication-factor 3 –partitions 3 –topic  
 
列出所有 Topic 
kafka-topics.sh  –bootstrap-server :9092 –list 
 
查看指定 Topic 
kafka-topics.sh –bootstrap-server :9092 –describe –topic  
 
删除指定 Topic 
kafka-topics.sh –bootstrap-server  :9092 –delete –topic  
 
扩展 Topic 的 Partition 数量 artition 数量只能扩大不能缩小。
kafka-topics.sh –bootstrap-server :9092 –topic app –alter –partitions 30 
 
扩展 topic 每个 partition 的副本数量 replication factor 可以扩大也可以缩小,最多不能超过 broker 数量。先创建一个文件名为 increace-factor.json,这里要扩展的是 mysql-audit-log 这个 topic 的 partition 到 15 个:0,1,2 为 broker id。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 {"version" :1 ,"partitions" :["topic" :"mysql-audit-log" ,"partition" :0 ,"replicas" :[0 ,1 ,2 ]},"topic" :"mysql-audit-log" ,"partition" :1 ,"replicas" :[0 ,1 ,2 ]},"topic" :"mysql-audit-log" ,"partition" :2 ,"replicas" :[0 ,1 ,2 ]},"topic" :"mysql-audit-log" ,"partition" :3 ,"replicas" :[0 ,1 ,2 ]},"topic" :"mysql-audit-log" ,"partition" :4 ,"replicas" :[0 ,1 ,2 ]},"topic" :"mysql-audit-log" ,"partition" :5 ,"replicas" :[0 ,1 ,2 ]},"topic" :"mysql-audit-log" ,"partition" :6 ,"replicas" :[0 ,1 ,2 ]},"topic" :"mysql-audit-log" ,"partition" :7 ,"replicas" :[0 ,1 ,2 ]},"topic" :"mysql-audit-log" ,"partition" :8 ,"replicas" :[0 ,1 ,2 ]},"topic" :"mysql-audit-log" ,"partition" :9 ,"replicas" :[0 ,1 ,2 ]},"topic" :"mysql-audit-log" ,"partition" :10 ,"replicas" :[0 ,1 ,2 ]},"topic" :"mysql-audit-log" ,"partition" :11 ,"replicas" :[0 ,1 ,2 ]},"topic" :"mysql-audit-log" ,"partition" :12 ,"replicas" :[0 ,1 ,2 ]},"topic" :"mysql-audit-log" ,"partition" :13 ,"replicas" :[0 ,1 ,2 ]},"topic" :"mysql-audit-log" ,"partition" :14 ,"replicas" :[0 ,1 ,2 ]}
kafka-reassign-partitions.sh –zookeeper :2181 –reassignment-json-file  increace-factor.json –execute  
 
查看 Topic 数据大小 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 \ 192.168 .1.87 :9092  \ list  mytopic \ \ '(?<=size":)\d+'   \ '{ sum += $1 } END { print sum }' 648 \ 192.168 .1.87 :9092  \ list  mytopic \ \ '^{'  \ '[ ..|.size? | numbers ] | add' 648 
消费者组 Consumer Group 列出所有的 Consumer Group 
 kafka-consumer-groups.sh –bootstrap-server :9092 –list 
 
查看指定 Consumer Group 详情 GROUP:消费者 group ip  地址
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 #这里查看的是 logstash_mysql 这个消费者 group 的消费情况10.37 .62 .20 :9092  --describe --group logstash_mysql11          1312115          1312857          742              logstash-5 -0545 a8a7-f7bd-430 c-b619-7 a2b206addd2  /10.37 .62 .24     logstash-5 1           1312593          1313345          752              logstash-0 -d86bd51a-d010-45 de-aa6f-f6da8542b779  /10.37 .62 .23     logstash-0 2           1309548          1310317          769              logstash-1 -496340 ea-935 d-444 d-a184-51 d42e225054  /10.37 .62 .24     logstash-1 12          1313083          1313194          111              logstash-6 -806 b20cb-a9af-49 c1-b37d-ccb33a646ab2  /10.37 .62 .24     logstash-6 6           1310984          1311192          208              logstash-13 -8 d474bf6-e8d0-4 b8a-b319-cf5e2e6cc078 /10.37 .62 .24     logstash-13 9           1312998          1313768          770              logstash-3 -29863f b0-6708 -4f b1-9e28 -bd81c30ce8ef  /10.37 .62 .24     logstash-3 4           1315150          1315276          126              logstash-11 -6 d66a188-85 b7-476 b-bd89-5423 ef48cd01 /10.37 .62 .24     logstash-11 0           22770935522      22770935650      128              logstash-0 -7 be475d6-a49e-4f f9-bf83-6 b83f6067306  /10.37 .62 .24     logstash-0 8           1309956          1310103          147              logstash-2 -3 c313c6f-8 c98-4333 -8 bad-2f 9696457d7d  /10.37 .62 .24     logstash-2 13          1314659          1314775          116              logstash-7 -e98fd14e-e7f6-45e5 -8 ccf-2442058f 0bc9  /10.37 .62 .24     logstash-7 14          1313145          1313250          105              logstash-8 -2 c3345a8-f8f1-4f 08-a18e-333 dff2f0d65  /10.37 .62 .24     logstash-8 5           1314037          1314297          260              logstash-12 -ce018227-9e59 -4137 -a23f-5 ccc0c7d4f6a /10.37 .62 .24     logstash-12 10          1312883          1312962          79               logstash-4 -9 eb84ae4-3351 -4083 -9 b1f-288910 a6c3b8  /10.37 .62 .24     logstash-4 7           1312476          1313200          724              logstash-14 -680 c982e-5 cf3-406 b-810 a-4 d5c96b5bdee /10.37 .62 .24     logstash-14 3           1313227          1313328          101 
删除指定 Consumer Group 
kafka-topics.sh –bootstrap-server :9092 –delete –topic pgw-nginx 
 
消息 生产消息 kafka-console-producer.sh --broker-list 11.8 .36 .125 :9092  --topic mytopicthis  is  my topic
生产消息指定 Key key.separator=, 指定以逗号作为 key 和 value 的分隔符。
kafka-console-producer.sh --broker-list kafka1:9092  --topic cr7-topic --property   parse.key= true  --property   key.separator= ,"orderAmount" :1000 ,"orderId" :1 ,"productId" :101 ,"productNum" :1 }
消费消息 从头开始消费 从头开始消费是可以消费到之前的消息的,通过 --from-beginning 指定:
kafka-console-consumer.sh --bootstrap-server 11.8 .36 .125 :9092  --topic mytopic --from -beginningthis  is  my topic
从尾部开始消费 --offset latest 指定从尾部开始消费,另外还需要指定 partition,可以指定多个:
kafka-console-consumer.sh --bootstrap-server 11.8 .36 .125 :9092  --topic mytopic  --offset latest  --partition 0  1  2 
消费指定条数的消息 --max-messages 指定取的个数:
kafka-console-consumer.sh --bootstrap-server 11.8 .36 .125 :9092  --topic mytopic  --offset latest  --partition 0  1  2  --max-messages 2 1111 2  messages
指定消费组进行消费 --consumer-property group.id=<消费者组名>执行消费者组进行消费:
kafka-console-consumer.sh –bootstrap-server  kafka1:9092 –topic test_partition –consumer-property  group.id=test_group –from-beginning 
 
查看消息具体内容 kafka-dump-log.sh --files cr7-topic-0 /00000000000000000000. log  -deep-iteration --print-data-log1080  CreateTime: 1615020877664  keysize: 1  valuesize: 63  sequence: -1  headerKeys: [] key: 1  payload: {"orderAmount" :1000 ,"orderId" :1 ,"productId" :101 ,"productNum" :1 }1081  CreateTime: 1615020877677  keysize: 1  valuesize: 63  sequence: -1  headerKeys: [] key: 5  payload: {"orderAmount" :1000 ,"orderId" :5 ,"productId" :105 ,"productNum" :5 }1082  CreateTime: 1615020877677  keysize: 1  valuesize: 63  sequence: -1  headerKeys: [] key: 7  payload: {"orderAmount" :1000 ,"orderId" :7 ,"productId" :107 ,"productNum" :7 }1083  CreateTime: 1615020877677  keysize: 1  valuesize: 63  sequence: -1  headerKeys: [] key: 8  payload: {"orderAmount" :1000 ,"orderId" :8 ,"productId" :108 ,"productNum" :8 }1084  CreateTime: 1615020877677  keysize: 2  valuesize: 65  sequence: -1  headerKeys: [] key: 11  payload: {"orderAmount" :1000 ,"orderId" :11 ,"productId" :111 ,"productNum" :11 }1085  CreateTime: 1615020877677  keysize: 2  valuesize: 65  sequence: -1  headerKeys: [] key: 15  payload: {"orderAmount" :1000 ,"orderId" :15 ,"productId" :115 ,"productNum" :15 }1086  CreateTime: 1615020877678  keysize: 2  valuesize: 65  sequence: -1  headerKeys: [] key: 17  payload: {"orderAmount" :1000 ,"orderId" :17 ,"productId" :117 ,"productNum" :17 }1087  CreateTime: 1615020877678  keysize: 2  valuesize: 65  sequence: -1  headerKeys: [] key: 21  payload: {"orderAmount" :1000 ,"orderId" :21 ,"productId" :121 ,"productNum" :21 }
查看 Topic 中当前消息总数 Kafka 自带的命令没有直接提供这样的功能,要使用 Kafka 提供的工具类 GetOffsetShell 来计算给定 Topic 每个分区当前最早位移和最新位移,差值就是每个分区的当前的消息总数,将该 Topic 所有分区的消息总数累加就能得到该 Topic 总的消息数。
首先查询 Topic 中每个分区 offset 的最小值(起始位置),使用 --time -2参数。一个分区的起始位置并不是每时每刻都为 0 ,因为日志清理的动作会清理旧的数据,所以分区的起始位置会自然而然地增加。
kafka-run-class .sh  kafka .tools .GetOffsetShell  --broker-list  kafka1 :9092  -topic  test-topic   --time  -2 offset test-topic :0 :0 test-topic :1 :0 
然后使用--time -1参数查询 Topic 各个分区的 offset 的最大值。
kafka-run-class .sh  kafka .tools .GetOffsetShell  --broker-list  kafka1 :9092  --time  -1  --topic  test-topic test-topic :0 :5500000 test-topic :1 :5500000 
对于本例来说,test-topic 中当前总的消息数为 (5500000 - 0) + (5500000 - 0),等于 1100 万条。如果只是要获取 Topic 中总的消息数(包括已经从 Kafka 删除的消息),那么只需要将 Topic 中每个 Partition 的 Offset 累加即可。
Offset 重置消费者 Offset 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 #查看消费者组消费情况0  分区 CURRENT -OFFSET  是 4 ,2  分区 CURRENT -OFFSET  是 6 groups .sh group  'my-consumer-group'  has no  active members.GROUP              TOPIC                 PARTITION   CURRENT -OFFSET   LOG -END -OFFSET   LAG             CONSUMER-ID     HOST            CLIENT-IDgroup  transaction -topic-msg 2           6                6                0                -               -               -group  transaction -topic-msg 1           0                0                0                -               -               -group  transaction -topic-msg 0           4                4                0                -               -               -         -offset  为 3 ,重置是所有分区一起重置groups .sh 2021 -06 -25  10 :44 :51 ,848 ] WARN New  offset  (3 ) is  higher than latest offset  for  topic partition  transaction -topic-msg-1.  Value  will be set  to  0  (kafka.admin .ConsumerGroupCommand$)GROUP                           TOPIC                          PARTITION   NEW -OFFSET      group               transaction -topic-msg          0           3               group               transaction -topic-msg          1           0               group               transaction -topic-msg          2           3               0  分区和 2  分区的 CURRENT -OFFSET  都变为 3  了groups .sh group  'my-consumer-group'  has no  active members.GROUP              TOPIC                 PARTITION   CURRENT -OFFSET   LOG -END -OFFSET   LAG             CONSUMER-ID     HOST            CLIENT-IDgroup  transaction -topic-msg 2           3                6                3                -               -               -group  transaction -topic-msg 1           0                0                0                -               -               -group  transaction -topic-msg 0           3                4                1                -               -               --111111 -333333 
性能测试 --num-records 10000000 : 向指定主题发送了 1  千万条消息。1024 : 每条消息的大小为 1024 KB。-1 : 不限制吞吐量。-1 : 这要求 ISR 列表里跟 leader 保持同步的那些 follower 都要把消息同步过去,才能认为这条消息是写入成功。2000 : batch.size 和 linger.ms 是对 kafka producer 性能影响比较大的两个参数。batch.size 是 producer 批量发送的基本单位,默认是 16384 Bytes,即 16 kB;lingger.ms 是 sender 线程在检查 batch 是否 ready 时候,判断有没有过期的参数,默认大小是 0 ms。
[root@kafka1 ~]705600  records sent, 141063.6  records/sec (137.76  MB/sec), 54.8  ms  avg  latency, 557.0  ms  max  latency.1204178  records sent, 240739.3  records/sec (235.10  MB/sec), 44.1  ms  avg  latency, 402.0  ms  max  latency.1370938  records sent, 274187.6  records/sec (267.76  MB/sec), 27.9  ms  avg  latency, 311.0  ms  max  latency.1464605  records sent, 292628.4  records/sec (285.77  MB/sec), 19.2  ms  avg  latency, 139.0  ms  max  latency.1477239  records sent, 295447.8  records/sec (288.52  MB/sec), 31.8  ms  avg  latency, 290.0  ms  max  latency.1446682  records sent, 289336.4  records/sec (282.56  MB/sec), 26.4  ms  avg  latency, 281.0  ms  max  latency.1555098  records sent, 311019.6  records/sec (303.73  MB/sec), 37.6  ms  avg  latency, 344.0  ms  max  latency.10000000  records sent, 263894.020162  records/sec (257.71  MB/sec), 32.60  ms  avg  latency, 557.00  ms  max  latency, 12  ms  50th , 140  ms  95th , 262  ms  99th , 396  ms  99 .9 th.
我们应该关心延时的概率分布情况,仅仅知道一个平均值是没有意义的。这就是这里计算分位数的原因。通常我们关注到 99th 分位就可以了。比如在上面的输出中,99th 值是 262 ms,这表明测试生产者生产的消息中,有 99% 消息的延时都在 262 ms 以内。你完全可以把这个数据当作这个生产者对外承诺的 SLA。
消费者性能测试 [root@kafka1 ~] # kafka-consumer-perf-test .sh  --broker-list  kafka1 :9092  --messages  10000000 --topic  test_producer_perf start .time , end .time , data .consumed .in .MB , MB .sec , data .consumed .in .nMsg , nMsg .sec , rebalance .time .ms , fetch .time .ms , fetch .MB .sec , fetch .nMsg .sec -03-09  10:34 :18 :447 , 2021-03-09  10:34 :33 :948 , 9765.6250 , 629.9997 , 10000000, 645119.6697 , 1615257259068, -1615257243567 , -0 .0000 , -0 .0062 
虽然输出格式有所差别,但该脚本也会打印出消费者的吞吐量数据。比如本例中的 629.9997MB/s。有点令人遗憾的是,它没有计算不同分位数下的分布情况。因此,在实际使用过程中,这个脚本的使用率要比生产者性能测试脚本的使用率低。
修改动态参数 修改 Broker 动态参数 修改动态参数无需重启 Broker,动态  Broker  参数的使用场景非常广泛,通常包括但不限于以下几种:Broker  端各种线程池大小,实时应对突发流量。Broker  端连接信息或安全配置信息。Broker  端 Compact 操作性能。JMX  指标收集器 (JMX  Metrics Reporter)。Broker  Config  的参数有以下 3  种类型:Broker,才能令修改生效。 per-broker:被标记为  per-broker  的参数属于动态参数,修改它之后,只会在对应的 Broker  上生效。Broker  都生效。你也可以为具体的 Broker  修改 cluster-wide 参数。
释案
kafka-configs.sh --bootstrap-server  10.37.249.58:9092 \enable =true  default config  for  brokers in  the cluster.
如果要设置 cluster-wide 范围的动态参数,需要显式指定 entity-default。现在,我们使用下面的命令来查看一下刚才的配置是否成功。
kafka-configs.sh --bootstrap-server 10.37.249.58:9092 \for  brokers in  the cluster are:enable =true  sensitive =false  synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:unclean.leader.election.enable =true }
在 Zookeeper 上查看 /config/brokers/ 节点可以查看 cluster-wide 的动态参数设置。
[zk: (CONNECTED) ] > get /config/brokers/1 {"version":1,"config":{"unclean.leader.election.enable":"false"}} 
删除 cluster-wide 范围动态参数。
kafka-configs.sh --bootstrap-server 10.37.249.58:9092 \ default config  for  brokers in  the cluster.
删除 per-broker 范围参数。
kafka-configs.sh --bootstrap-server 10.37.249.58:9092 \ config  for  broker 1.
修改 Topic 动态参数retention.ms 为 10000。
kafka-configs.sh --bootstrap-server  10.37 .249 .58 :9092  \10000 
查看设置的 Topic 动态参数。
kafka-configs.sh --bootstrap-server  10.37 .249 .58 :9092  \for  topic test-topic are:10000  sensitive=false  synonyms={DYNAMIC_TOPIC_CONFIG:retention.ms=10000 }
在 Zookeeper 上可以查看 /config/topics/ 来查看 Topic 动态参数。
[zk: (CONNECTED) ] > get /config/topics/test-topic {"version":1,"config":{"retention.ms":"10000"}} 
删除 Topic 动态参数。
kafka-configs .sh  --bootstrap-server   10.37 .249 .58 :9092  \--entity-type  topics  --entity-name  test-topic  --alter  \--delete-config  retention .ms 
Kafka 集群一键启动/停止脚本 环境变量设置: export  KAFKA_HOME =/usr/local/kafkaexport  PATH =$PATH :$KAFKA_HOME/bin
一键启动/停止脚本,查看状态需要安装 jps 工具。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 mill =`date "+%N" `tdate =`date "+%Y-%m-%d %H:%M:%S,${mill:0:3} " `$tdate ] INFO  [Kafka Cluster] begins to  execute the $1  operation.for  i in  ${hosts[@]} do smill =`date "+%N" `stdate =`date "+%Y-%m-%d %H:%M:%S,${smill:0:3} " `$i  "source /etc/profile;echo [$stdate ] INFO [Kafka Broker $i ] begins to execute the startup operation.;kafka-server-start.sh $KAFKA_HOME /config/server.properties>/dev/null"  &for  i in  ${hosts[@]} do smill =`date "+%N" `stdate =`date "+%Y-%m-%d %H:%M:%S,${smill:0:3} " `$i  "source /etc/profile;echo [$stdate ] INFO [Kafka Broker $i ] begins to execute the shutdown operation.;kafka-server-stop.sh>/dev/null;"  &for  i in  ${hosts[@]} do smill =`date "+%N" `stdate =`date "+%Y-%m-%d %H:%M:%S,${smill:0:3} " `$i  "source /etc/profile;echo [$stdate ] INFO [Kafka Broker $i ] status message is :;jps | grep Kafka;"  &"$1 "  in "Usage: $0  {start|stop|status}" RETVAL =1