查看原文
其他

Apache Kafka 实践:重置 offset

ningg SpringForAll社区 2021-05-26

点击上方☝SpringForAll社区 轻松关注!

及时获取有趣有料的技术文章

本文来源:http://r6d.cn/bd3az


1.概要

  • 背景:线上环境在消费 Kafka 中数据,因为突发情况,需要重置 consumer group 对应的 offset。
  • 目标:重置对应 consumer group 的 offset。

2.重置 Offset

几个方面:

  1. 情况收集:
    1. 哪个 Consumer Group
    2. 消费哪些 Topic
    3. 当前 Topic 和 Consumer Group 对应的 offset 都是多少
  2. Offset 处理:
    1. 存储位置:consumer group 对应 offset 的存储位置
    2. 设置 Offset:如何设置 Offset?

直接查看当前的 offset:

./kafka-consumer-groups.sh --bootstrap-server kafka1.hdp.ningg.cn:9092 --describe --group pull-for-abacus-prodTOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-IDbinlog.test_pay.test_refund      6          1472114         1472114         0          consumer-1-78074b08-5232-42d7-9a7a-68349f856095   /10.1.187.47                   consumer-1binlog.test_pay.test_refund      7          1474380         1474380         0          consumer-1-78074b08-5232-42d7-9a7a-68349f856095   /10.1.187.47                   consumer-1binlog.test_pay.test_refund      0          1470473         1470476         3          consumer-1-1b0855c5-404f-4eec-8c40-dfebc435faba   /10.30.173.144                 consumer-1binlog.test_pay.test_refund      1          1474526         1474529         3          consumer-1-1b0855c5-404f-4eec-8c40-dfebc435faba   /10.30.173.144                 consumer-1binlog.test_pay.test_refund      4          1472482         1472482         0          consumer-1-5d8f9bdc-af6f-4f6a-9f54-2a371c4cbc8b   /10.30.125.136                 consumer-1binlog.test_pay.test_refund      5          1472702         1472707         5          consumer-1-5d8f9bdc-af6f-4f6a-9f54-2a371c4cbc8b   /10.30.125.136                 consumer-1binlog.test_pay.test_refund      2          1473468         1473468         0          consumer-1-3875d283-6a7c-4691-8611-ca97b382c66a   /10.1.186.91                   consumer-1binlog.test_pay.test_refund      3          1469992         1469995         3          consumer-1-3875d283-6a7c-4691-8611-ca97b382c66a   /10.1.186.91                   consumer-1binlog.test_pay.test_refund      9          1473234         1473238         4          consumer-1-d5a49669-0c88-4b7b-b8a3-b68e379f10b6   /10.30.156.140                 consumer-1binlog.test_pay.test_refund      8          1475842         1475842         0          consumer-1-92f73b05-b5c8-4ef3-b0c0-68243bdd4469   /10.30.149.137                 consumer-1

从上述查询可知,offset 没有托管在 ZK 上,而是托管在了 __consumer_offsets 中,因此,需要修改 __consumer_offsets 中的 offset

Note:

  • consumer group offset 在 ZK 上托管时,可以参考:Manually resetting offset for a Kafka topic

在 Kafka 0.11+ 之后的版本中,提供了重置 offset 的工具:(确认:此工具也可以操作 Kafka 0.10 集群)

// 查看 consumer group 的 offset 状态./kafka-consumer-groups.sh --bootstrap-server kafka1.hdp.ningg.cn:9092 --describe --group pull-for-abacus-prod // 尝试重置  consumer group./kafka-consumer-groups.sh --bootstrap-server kafka1.hdp.ningg.cn:9092 --group pull-for-abacus-prod --reset-offsets --to-earliest --all-topics // 确认重置  consumer group./kafka-consumer-groups.sh --bootstrap-server kafka1.hdp.ningg.cn:9092 --group pull-for-abacus-prod --reset-offsets --to-earliest --all-topics --execute

Note:

  • 工具详情:https://cwiki.apache.org/confluence/display/KAFKA/KIP-122%3A+Add+Reset+Consumer+Group+Offsets+tooling

完整的 kafka-consumer-groups 工具的操作选项:

$ ./kafka-consumer-groups.shList all consumer groups, describe a consumer group, delete consumer group info, or reset consumer group offsets.Option                                  Description------                                  -------------all-topics                            Consider all topics assigned to a                                          group in the `reset-offsets` process.--bootstrap-server <String: server to   REQUIRED (for consumer groups based on  connect to>                             the new consumer): The server to                                          connect to.--by-duration <String: duration>        Reset offsets to offset by duration                                          from current timestamp. Format:                                          'PnDTnHnMnS'--command-config <String: command       Property file containing configs to be  config property file>                   passed to Admin Client and Consumer.--delete                                Pass in groups to delete topic                                          partition offsets and ownership                                          information over the entire consumer                                          group. For instance --group g1 --                                          group g2                                        Pass in groups with a single topic to                                          just delete the given topic's                                          partition offsets and ownership                                          information for the given consumer                                          groups. For instance --group g1 --                                          group g2 --topic t1                                        Pass in just a topic to delete the                                          given topic's partition offsets and                                          ownership information for every                                          consumer group. For instance --topic                                          t1                                        WARNING: Group deletion only works for                                          old ZK-based consumer groups, and                                          one has to use it carefully to only                                          delete groups that are not active.--describe                              Describe consumer group and list                                          offset lag (number of messages not                                          yet processed) related to given                                          group.--execute                               Execute operation. Supported                                          operations: reset-offsets.--export                                Export operation execution to a CSV                                          file. Supported operations: reset-                                          offsets.--from-file <String: path to CSV file>  Reset offsets to values defined in CSV                                          file.--group <String: consumer group>        The consumer group we wish to act on.--list                                  List all consumer groups.--new-consumer                          Use the new consumer implementation.                                          This is the default, so this option                                          is deprecated and will be removed in                                          a future release.--reset-offsets                         Reset offsets of consumer group.                                          Supports one consumer group at the                                          time, and instances should be                                          inactive                                        Has 3 execution options: (default) to                                          plan which offsets to reset, --                                          execute to execute the reset-offsets                                          process, and --export to export the                                          results to a CSV format.                                        Has the following scenarios to choose:                                          --to-datetime, --by-period, --to-                                          earliest, --to-latest, --shift-by, --                                          from-file, --to-current. One                                          scenario must be choose                                        To define the scope use: --all-topics                                          or --topic. . One scope must be                                          choose, unless you use '--from-file'                                          scenario--shift-by <Long: number-of-offsets>    Reset offsets shifting current offset                                          by 'n'where 'n' can be positive or                                          negative--timeout <Long: timeout (ms)>          The timeout that can be set for some                                          use cases. For example, it can be                                          used when describing the group to                                          specify the maximum amount of time                                          in milliseconds to wait before the                                          group stabilizes (when the group is                                          just created, or is going through                                          some changes). (default: 5000)--to-current                            Reset offsets to current offset.--to-datetime <String: datetime>        Reset offsets to offset from datetime.                                          Format: 'YYYY-MM-DDTHH:mm:SS.sss'--to-earliest                           Reset offsets to earliest offset.--to-latest                             Reset offsets to latest offset.--to-offset <Long: offset>              Reset offsets to a specific offset.--topic <String: topic>                 The topic whose consumer group                                          information should be deleted or                                          topic whose should be included in                                          the reset offset process. In `reset-                                          offsets` case, partitions can be                                          specified using this format: `topic1:                                          0,1,2`, where 0,1,2 are the                                          partition to be included in the                                          process. Reset-offsets also supports                                          multiple topic inputs.--zookeeper <String: urls>              REQUIRED (for consumer groups based on                                          the old consumer): The connection                                          string for the zookeeper connection                                          in the form host:port. Multiple URLS                                          can be given to allow fail-over.

使用 Kafka 0.11+ 的工具,也可以操作 Kafka 0.10:

  • https://stackoverflow.com/questions/45670937/kafka-0-11-how-to-reset-offsets
  • https://stackoverflow.com/a/45517782

3.参考资料

  • https://www.ctheu.com/2017/08/07/looking-at-kafka-s-consumers-offsets/#ingest-the-json-into-druid
  • 基于 Spring-Kafka 在消费 Kafka 中数据
  • http://kafka.apache.org/documentation/#operations
  • Manually resetting offset for a Kafka topic:(consumer group offset 在 ZK 上托管)


2021Java深入资料领取方式回复“20210112”

墙裂推荐

【深度】互联网技术人的社群,点击了解!





 读写锁:使用场景和实现方案(ZooKeeper 实现)

 开发实践:代码重构的思考

 状态机的通用设计和实现

 后台开发最佳实践

 Mybatis只写Mapper接口就可以操作数据库的背后解读


关注公众号,回复“spring”有惊喜!!!

如果资源对你有帮助的话

❤️给个在看,是最大的支持❤️

    您可能也对以下帖子感兴趣

    文章有问题?点此查看未经处理的缓存