其他
kafka实战宝典:手动修改消费偏移量的两种方式
kafka实战宝典:手动修改消费偏移量的两种方式
工作中遇到过消费端报错的问题:包括数据Invalid Message和Failed_to_UNcompress等报错信息,导致消费端的iterator损坏,直接造成消费进程挂掉,如果不能及时发现问题,需要手动跳过某些数据;
Kafka的偏移量的保存方式根据版本号的异同有3种方式:保存在zookeeper中、保存在kafka的topic(_consumer_offset)中、保存在自定义的存储系统中,下面介绍前2种修改方式。
1、修改保存在zookeeper中的偏移量:
使用./zkCli.sh -server xxxx:2181 进入zk命令行模式,get对应的消费组的对应分区的偏移量,使用set方法指定偏移量;2、修改保存在kafka的topic内的偏移量:
使用Kafka自带的kafka-consumer-groups.sh脚本设置消费者组(consumer group)的位移, 这是0.11.0.0版本提供的新功能且只适用于新版本consumer, 在新版本之前,如果要为已有的consumer group调整位移必须要手动编写Java程序调用KafkaConsumer#seek方法。0.11.0.0版本丰富了kafka-consumer-groups脚本的功能,用户可以直接使用该脚本很方便地为已有的consumer group重新设置位移,但前提必须是consumer group必须是inactive的,即不能是处于正在工作中的状态。--all-topics:为consumer group下所有topic的所有分区调整位移
--topic t1 --topic t2:为指定的若干个topic的所有分区调整位移
--topic t1:0,1,2:为指定的topic分区调整位移
② 确定位移重设策略(当前支持8种设置规则):
--to-earliest:把位移调整到分区当前最小位移
--to-latest:把位移调整到分区当前最新位移
--to-current:把位移调整到分区当前位移
--to-offset <offset>:把位移调整到指定位移处
--shift-by N:把位移调整到当前位移 + N处,注意N可以是负数,表示向前移动
--to-datetime <datetime>:把位移调整到大于给定时间的最早位移处,datetime格式是yyyy-MM-ddTHH:mm:ss.xxx,比如2017-08-04T00:00:00.000
--by-duration <duration>:把位移调整到距离当前时间指定间隔的位移处,duration格式是PnDTnHnMnS,比如PT0H5M0S
--from-file <file>:从CSV文件中读取调整策略
③ 确定执行策略(当前支持3种):
无参:只是打印出位移调整方案,不具体执行
--execute:执行真正的位移调整
--export:把位移调整方案按照CSV格式打印,方便用户成csv文件,供后续直接使用
上述使用方式中,一般通过shift-by N直接调整的使用场景最多,使用如下: