kafka-消息查询
通过指定分区号和offset编号查询kafka消息内容
查询从topic查询某个offset的kafka消息内容,可以使用kafka自带工具:kafka-console-consumer.sh
./kafka-console-consumer.sh \
--topic op-offset-reload \
--bootstrap-server localhost:9092 \
--property print.timestamp=true \
--property print.key=true \
-property key.separator="-" \
--group GID_OP_NEWER \
--partition 0 \
--offset 241 \
--max-messages 1
- 参数介绍
- –property 消息输出内容,打印消息推送的时间戳,打印key,key.separator=用于输出内容的分隔符
- –group 指定console-consumer脚本使用的group,可以使用kafka-consumer-group.sh –group test –delete 删除group
- –partition 指定查询分区号
- –offset 指定消费的偏移号
- –max-messages 最大获取消息条数,如果没有获取到对应条数的消息,会一直阻塞
通过指定时间区间查询消息内容
kafkacat -b localhost:9092 -t op-offset-test -C -o s@1640334600000 -o e@1640334633000
# 时间戳单位毫秒
重置消费点位
-
前置条件kafka version大于等于0.11.0.0版本
-
必须确保需要重置点位的group处于inactive,判断方法: ./kafka-consumer-groups.sh –bootstrap-server localhost:9092 –group GID_op_test –describe Note: This will not show information about old Zookeeper-based consumers. Consumer group ‘GID_op_test’ has no active members.
-
kafka-consumer-groups.sh脚本用于重置消费点位作用域:
-
确定topic作用域:
–all-topics(为consumer group下所有topic的所有分区调整位移),
–topic t1 –topic t2(为指定的若干个topic的所有分区调整位移 –topic t1:0,1,2(为指定的topic分区调整位移)
-
确定位移重设策略:
–to-earliest:把位移调整到分区当前最小位移
–to-latest:把位移调整到分区当前最新位移
–to-current:把位移调整到分区当前位移
–to-offset : 把位移调整到指定位移处
–shift-by N: 把位移调整到当前位移 + N处,注意N可以是负数,表示向前移动
–to-datetime :把位移调整到大于给定时间的最早位移处,datetime格式是yyyy-MM-ddTHH:mm:ss.xxx,比如2017-08-04T00:00:00.000
–by-duration :把位移调整到距离当前时间指定间隔的位移处,duration格式是PnDTnHnMnS,比如PT0H5M0S
–from-file :从CSV文件中读取调整策略
-
确定执行方案
–execute:执行真正的位移调整 –export:把位移调整方案按照CSV格式打印,方便用户成csv文件,供后续直接使用
-
-
测试重置消费点位
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group GID_op_test --reset-offsets --topic op-offset-reload --to-earliest --execute
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --topic op-offset-reload --to-datetime 2017-08-04T14:30:00.000
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group group --reset-offsets --all-topics --to-datetime 2018-10-23T18:50:00.000+08:00 --execute