通过指定分区号和offset编号查询kafka消息内容

查询从topic查询某个offset的kafka消息内容,可以使用kafka自带工具:kafka-console-consumer.sh
./kafka-console-consumer.sh \
    --topic op-offset-reload \
    --bootstrap-server  localhost:9092 \
	--property print.timestamp=true \
	--property print.key=true \
	-property key.separator="-" \
	--group GID_OP_NEWER \
    --partition 0 \
    --offset 241 \
	--max-messages 1
  • 参数介绍
    • –property 消息输出内容,打印消息推送的时间戳,打印key,key.separator=用于输出内容的分隔符
    • –group 指定console-consumer脚本使用的group,可以使用kafka-consumer-group.sh –group test –delete 删除group
    • –partition 指定查询分区号
    • –offset 指定消费的偏移号
    • –max-messages 最大获取消息条数,如果没有获取到对应条数的消息,会一直阻塞

通过指定时间区间查询消息内容

kafkacat -b localhost:9092 -t op-offset-test -C -o s@1640334600000 -o e@1640334633000
# 时间戳单位毫秒

重置消费点位

  • 前置条件kafka version大于等于0.11.0.0版本

  • 必须确保需要重置点位的group处于inactive,判断方法: ./kafka-consumer-groups.sh –bootstrap-server localhost:9092 –group GID_op_test –describe Note: This will not show information about old Zookeeper-based consumers. Consumer group ‘GID_op_test’ has no active members.

  • kafka-consumer-groups.sh脚本用于重置消费点位作用域:

    • 确定topic作用域:

      –all-topics(为consumer group下所有topic的所有分区调整位移),

      –topic t1 –topic t2(为指定的若干个topic的所有分区调整位移 –topic t1:0,1,2(为指定的topic分区调整位移)

    • 确定位移重设策略:

      –to-earliest:把位移调整到分区当前最小位移

      –to-latest:把位移调整到分区当前最新位移

      –to-current:把位移调整到分区当前位移

      –to-offset : 把位移调整到指定位移处

      –shift-by N: 把位移调整到当前位移 + N处,注意N可以是负数,表示向前移动

      –to-datetime :把位移调整到大于给定时间的最早位移处,datetime格式是yyyy-MM-ddTHH:mm:ss.xxx,比如2017-08-04T00:00:00.000

      –by-duration :把位移调整到距离当前时间指定间隔的位移处,duration格式是PnDTnHnMnS,比如PT0H5M0S

    –from-file :从CSV文件中读取调整策略

    • 确定执行方案

      –execute:执行真正的位移调整 –export:把位移调整方案按照CSV格式打印,方便用户成csv文件,供后续直接使用

  • 测试重置消费点位

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group GID_op_test --reset-offsets --topic op-offset-reload --to-earliest --execute
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --topic op-offset-reload --to-datetime 2017-08-04T14:30:00.000
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group group --reset-offsets --all-topics --to-datetime 2018-10-23T18:50:00.000+08:00 --execute