查看原文
其他

案例 | Kafka 为什么会丢消息?

点击关注 👉 顶级架构师 2023-09-18
推荐关注
顶级架构师后台回复 1024 有特别礼包


作者:格格步入
来源:juejin.cn/post/7135101805179961352

上一篇:聊聊 8种 架构模式


大家好,我是顶级架构师。


引入 MQ 消息中间件最直接的目的:系统解耦以及流量控制(削峰填谷)
  • 系统解耦: 上下游系统之间的通信相互依赖,利用 MQ 消息队列可以隔离上下游环境变化带来的不稳定因素。
  • 流量控制: 超高并发场景中,引入 MQ 可以实现流量 “削峰填谷” 的作用以及服务异步处理,不至于打崩服务。
引入 MQ 同样带来其他问题:数据一致性。

在分布式系统中,如果两个节点之间存在数据同步,就会带来数据一致性的问题。消息生产端发送消息到 MQ 再到消息消费端需要保证消息不丢失。

所以在使用 MQ 消息队列时,需要考虑这 3 个问题:

  • 如何知道有消息丢失?
  • 哪些环节可能丢消息?
  • 如何确保消息不丢失?



1、如何知道有消息丢失?

如何感知消息是否丢失了?可总结如下:

  1. 他人反馈: 运营、PM 反馈消息丢失。
  2. 监控报警: 监控指定指标,即时报警人工调整。Kafka 集群异常、Broker 宕机、Broker 磁盘挂载问题、消费者异常导致消息积压等都会给用户直接感觉是消息丢失了。
案例:舆情分析中数据采集同步

  • PM 可自己下发采集调度指令,去采集特定数据。
  • PM 可通过 ES 近实时查询对应数据,若没相应数据可再次下发指令。

当感知消息丢失了,那就需要一种机制来检查消息是否丢失。

检索消息

运维工具有:

  1. 查看 Kafka 消费位置:

# 查看某个topic的message数量$ ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test_topic
# 查看consumer Group列表$ ./kafka-consumer-groups.sh --list --bootstrap-server 192.168.88.108:9092
# 查看 offset 消费情况$ ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group console-consumer-1152 --describeGROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-IDconsole-consumer-1152 test_topic 0 - 4 - consumer-console-consumer-1152-1-2703ea2b-b62d-4cfd-8950-34e8c321b942 /127.0.0.1 consumer-console-consumer-1152-1
  1. 利用工具:Kafka Tools

  1. 其他可见化界面工具



2、哪些环节可能丢消息?

一条消息从生产到消费完成经历 3 个环节:消息生产者、消息中间件、消息消费者。
哪个环节都有可能出现消息丢失问题。

1)生产端

首先要认识到 Kafka 生产端发送消息流程:

调用 send() 方法时,不会立刻把消息发送出去,而是缓存起来,选择恰当时机把缓存里的消息划分成一批数据,通过 Sender 线程按批次发送给服务端 Broker

此环节丢失消息的场景有: 即导致 Producer 消息没有发送成功

  1. 网络波动: 生产者与服务端之间的链路不可达,发送超时。现象是:各端状态正常,但消费端就是没有消费消息,就像丢失消息一样。
  • 解决措施:重试 props.put("retries", "10");
  • 不恰当配置: 发送消息无 ack 确认; 发送消息失败无回调,无日志。
    producer.send(new ProducerRecord<>(topic, messageKey, messageStr), new CallBack(){...});
    • 解决措施:设置 acks=1 或者 acks=all。发送消息设置回调。

    回顾下重要的参数:acks

    • acks=0:不需要等待服务器的确认. 这是 retries 设置无效. 响应里来自服务端的 offset 总是 -1producer只管发不管发送成功与否。延迟低,容易丢失数据。

    • acks=1:表示 leader 写入成功(但是并没有刷新到磁盘)后即向 producer 响应。延迟中等,一旦 leader 副本挂了,就会丢失数据。

    • acks=all:等待数据完成副本的复制, 等同于 -1. 假如需要保证消息不丢失, 需要使用该设置. 同时需要设置 unclean.leader.election.enable 为 true, 保证当 ISR 列表为空时, 选择其他存活的副本作为新的 leader。

    牛逼啊!接私活必备的 N 个开源项目!赶快收藏


    2)服务端

    先来了解下 Kafka Broker 写入数据的过程:

    1. Broker 接收到一批数据,会先写入内存 PageCacheOS Cache)中。

    2. 操作系统会隔段时间把 OS Cache 中数据进行刷盘,这个过程会是 「异步批量刷盘」

    这里就有个隐患,如果数据写入 PageCache 后 Kafka Broker宕机会怎样?机子宕机/掉电?

    • Kafka Broker 宕机: 消息不会丢失。因为数据已经写入 PageCache,只等待操作系统刷盘即可。
    • 机子宕机/掉电: 消息会丢失。因为数据仍在内存里,内存RAM 掉电后就会丢失数据。
      • 解决方案:使用带蓄电池后备电源的缓存 cache,防止系统断电异常。

      1. 对比学习 MySQL 的 “双1” 策略,基本不使用这个策略,因为 “双1” 会导致频繁的 I/O操作,也是最慢的一种。

      2. 对比学习 Redis 的 AOF 策略,默认且推荐的策略:Everysec(AOF_FSYNC_EVERYSEC) 每一秒钟保存一次(默认):。每个写命令执行完, 只是先把日志写到 AOF 文件的内存缓冲区, 每隔一秒把缓冲区中的内容写入磁盘。


    拓展:Kafka 日志刷盘机制

    # 推荐采用默认值,即不配置该配置,交由操作系统自行决定何时落盘,以提升性能。# 针对 broker 配置:log.flush.interval.messages=10000 # 日志落盘消息条数间隔,即每接收到一定条数消息,即进行log落盘。log.flush.interval.ms=1000 # 日志落盘时间间隔,单位ms,即每隔一定时间,即进行log落盘。
    # 针对 topic 配置:flush.messages.flush.ms=1000 # topic下每1s刷盘flush.messages=1 # topic下每个消息都落盘

    # 查看 Linux 后台线程执行配置$ sysctl -a | grep dirtyvm.dirty_background_bytes = 0vm.dirty_background_ratio = 10 # 表示当脏页占总内存的的百分比超过这个值时,后台线程开始刷新脏页。vm.dirty_bytes = 0vm.dirty_expire_centisecs = 3000 # 表示脏数据多久会被刷新到磁盘上(30秒)。vm.dirty_ratio = 20vm.dirty_writeback_centisecs = 500 # 表示多久唤醒一次刷新脏页的后台线程(5秒)。vm.dirtytime_expire_seconds = 43200
    Broker 的可靠性需要依赖其多副本机制: 一般副本数 3 个(配置参数:replication.factor=3)扩展:接私活儿
    • Leader Partition 副本:提供对外读写机制。

    • Follower Partition 副本:同步 Leader 数据。

    副本之间的数据同步也可能出现问题:数据丢失问题和数据不一致问题。

    解决方案:ISR 和 Epoch 机制 (具体可看之前的文章)
    • ISR(In-Sync Replicas) : 当 Leader 宕机,可以从 ISR 中选择一个 Follower 作为 Leader

    • Epoch 机制: 解决 Leader 副本高水位更新和 Follower 副本高水位更新在时间上是存在错配问题。

      TipsKafka 0.11.x 版本才引入 leader epoch 机制解决高水位机制弊端。

    对应需要的配置参数如下:
    1. acks=-1 或者 acks=all 必须所有副本均同步到消息,才能表明消息发送成功。

    2. replication.factor >= 3 副本数至少有 3 个。

    3. min.insync.replicas > 1 代表消息至少写入 2个副本才算发送成功。前提需要 acks=-1

      举个栗子:Leader 宕机了,至少要保证 ISR 中有一个 Follower,这样这个Follwer被选举为Leader 且不会丢失数据。

      公式:replication.factor = min.insync.replicas + 1

    4. unclean.leader.election.enable=false 防止不在 ISR 中的 Follower被选举为 Leader

      **Kafka 0.11.0.0版本开始默认 unclean.leader.election.enable=false **


    3)消费端

    消费端消息丢失场景有:

    1. 消息堆积: 几个分区的消息都没消费,就跟丢消息一样。
    • 解决措施: 一般问题都出在消费端,尽量提高客户端的消费速度,消费逻辑另起线程进行处理。
  • 自动提交: 消费端拉下一批数据,正在处理中自动提交了 offset,这时候消费端宕机了; 重启后,拉到新一批数据,而上一批数据却没处理完。
    • 解决措施: 取消自动提交 auto.commit = false,改为手动 ack
  • 心跳超时,引发 Rebalance 客户端心跳超时,触发 Rebalance被踢出消费组。如果只有这一个客户端,那消息就不会被消费了。
    同时避免两次 poll 的间隔时间超过阈值:
    • max.poll.records:降低该参数值,建议远远小于 <单个线程每秒消费的条数> * <消费线程的个数> * <max.poll.interval.ms> 的积。
    • max.poll.interval.ms: 该值要大于 <max.poll.records> / (<单个线程每秒消费的条数> * <消费线程的个数>) 的值。
    • 解决措施: 客户端版本升级至 0.10.2 以上版本。
    • 另外,搜索公众号后端架构师后台回复“架构整洁”,获取一份惊喜礼包。


    案例:凡凡曾遇到数据同步时,消息中的文本需经过 NLP 的 NER 分析,再同步到 ES

    这个过程的主要流程是:

    1. 数据同步程序从 Kafka 中拉取消息。
    2. 数据同步程序将消息内的文本发送的 NER 进行分析,得到特征数组。
    3. 数据同步程序将消息同步给 ES
    现象:线上数据同步程序运行一段时间后,消息就不消费了。
    • 排查日志: 发现有 Rebalance 日志,怀疑是客户端消费太慢被踢出了消费组。
    • 本地测试: 发现运行一段时间也会出现 Rebalance,且 NLPNER 服务访问 HTTP 500 报错。
    • 得出结论: 因NER服务异常,导致数据同步程序消费超时。且当时客户端版本为 v0.10.1Consumer 没有独立线程维持心跳,而是把心跳维持与 poll 接口耦合在一起,从而也会造成心跳超时。
    当时解决措施是:
    1. session.timeout.ms 设置为 25s,当时没有升级客户端版本,怕带来其他问题。
    2. 熔断机制: 增加 Hystrix,超过 3 次服务调用异常就熔断,保护客户端正常消费数据。



    3、如何确保消息不丢失?

    掌握这些技能:

    1. 熟悉消息从发送到消费的每个阶段
    2. 监控报警 Kafka 集群
    3. 熟悉方案 “MQ 可靠消息投递”

    怎么确保消息 100% 不丢失?

    到这,总结下:

    1. 生产端:

    • 设置重试:props.put("retries", "10");

    • 设置 acks=all

    • 设置回调:producer.send(msg, new CallBack(){...});

    1. Broker:

    • 内存:使用带蓄电池后备电源的缓存 cache

    • Kafka 版本 0.11.x 以上:支持 Epoch 机制。

    • replication.factor >= 3 副本数至少有 3 个。

    • min.insync.replicas > 1 代表消息至少写入 2个副本才算发送成功。前提需要 acks=-1

    • unclean.leader.election.enable=false 防止不在 ISR 中的 Follower被选举为 Leader

    1. 消费端

    • 客户端版本升级至 0.10.2 以上版本。

    • 取消自动提交 auto.commit = false,改为手动 ack

    • 尽量提高客户端的消费速度,消费逻辑另起线程进行处理。


    欢迎大家进行观点的探讨和碰撞,各抒己见。如果你有疑问,也可以找我沟通和交流。


    最后给读者整理了一份BAT大厂面试真题,需要的可扫码回复“面试题”即可获取。


    公众号后台回复 架构 或者 架构整洁 有惊喜礼包!顶级架构师交流群

     「顶级架构师」建立了读者架构师交流群,大家可以添加小编微信进行加群。欢迎有想法、乐于分享的朋友们一起交流学习。

    扫描添加好友邀你进架构师群,加我时注明姓名+公司+职位】


    版权申明:内容来源网络,版权归原作者所有。如有侵权烦请告知,我们会立即删除并表示歉意。谢谢。

    猜你还想看

    推荐一套开源通用后台管理系统(附源码)

    看看人家那 IM 即时通讯系统,那叫一个优雅(附源码)

    面试官:生成订单30分钟未支付,则自动取消,该怎么实现?

    阿里技术专家:一文教你高效画出技术架构图

    牛逼!接私活必备的 N 个系统项目!赶快收藏吧(附源码合集第 3 期)!

    Spring 最常用的 7 大类注解,史上最强整理!

    使用 Docker 高效搭建本地开发环境(详细教程)

    如何做到0.2秒复制100G文件?

    为什么基于etcd的分布式锁要比Redis锁安全

    公司架构师常常提起的DNS负载均衡是个什么鬼?

    7个点 5 种 API 网关技术整理

    面试官:有一个 List 对象集合,如何优雅地返回给前端?我懵了。。

    全网最全的 SpringBoot+Logback 日志配置教程!


    您可能也对以下帖子感兴趣

    文章有问题?点此查看未经处理的缓存