每天处理千亿级日志量,Kafka是如何做到的?
之前为大家分享了不少 Kafka 原理解析类的干货,今天咱们一起来看看 360 基于 Kafka 千亿级数据量的深度实践!
消息队列选型
Kafka 在 360 商业化的现状
Kafka Client 框架
数据高可用
负载均衡
鉴权、授权与 ACL 方案
Quota 机制
跨 IDC 的数据同步
监控告警
线上问题及解决方案
消息队列选型
当时主要考虑以下几个维度:
社区活跃度
客户端支持
吞吐量
对比几个系统下来,觉得 Kafka 比较符合我们的要求。现在有一个新的开源系统 Pulsar,我觉得也可以尝试一下。
Kafka 设计上的亮点如下:
Kafka 通过 Replica 和 ISR 机制来保证数据的高可用。
Controller 主要是做集群的管理。
Coordinator 主要做业务级别的管理。
从这个角度来说 Kafka 有一个去中心化的设计思想在里面, 但 Controller 本身也是一个瓶颈,可以类比于 Hadoop 的 Namenode。
CAP 理论相信大家都有了解过,分布式系统实现要么是 CP,要么是 AP。
支持业务间独立重复消费,并且可以做回放。
生产端
Broker 端
消费端
日志有三个层次:
第一个层次 Topic
第二个层次 Partition(每个 Partition 是一个并行度)
第三个层次 Replica(Replica 表示 Partition 的副本数)
Kafka 在 360 商业化的现状
目前集群有千亿级数据量,100 多台万兆机器,单 Topic 的最大峰值 60 万 QPS,集群的峰值大概在 500 万 QPS。
我们目前的 Kafka 版本是 1.1.1,推荐大家部署 0.11 以上的版本会好一些,这个版本对协议做了很多优化,对于后续的 2.x 版本都是兼容的。
消费端分为实时,离线(ETL),监控三部分。实时有 Spark/Flink/Storm 等主流框架, 离线部分我们基于 Flink 自研了一个统一落地框架 Hamal,从 Kafka 消费一遍数据就可以落地到多个下游系统(HDFS、Hbase、Redis等),可以避免重复消费。
Kafka Client 框架
这样可以减少业务犯错的可能性,我们要确保极端的情况下比如网络或集群异常时的可用性,如果网络或集群不可用,数据会先落到本地,等恢复的时候再从本地磁盘恢复到 Kafka 中。
LogProducer,支持 at least once。
LogConsumer,支持 at least once 和 exactly once 两种语意,其中 exactly once 需要业务去实现 Rollback 接口。
我们还支持一种共享内存的策略来代替内存,使用共享内存是为了减少重启过程中日志的丢失数。
同时我们在 Worker 线程接口里面会提供接口让用户提交到 global offsetmap。
数据高可用
比如 Replica=3 的情况,确保三个副本在不同的物理 Rack 上,这样我们最多能容忍两个物理机架同时出问题而数据仍可用,我们 Rack Aware 方案是与负载均衡方案一起做掉的,具体后面会讲。
负载均衡
我们一开始想通过经典一致性 Hash 来解决,如下图:
然后我们发现经典一次性 Hash 不能满足我们的需求,比如要加一个节点 node5,只能分担节点 node2 的部分负载,不能做全局节点的负载均衡。
①新建 hash circle:通过 vnode_str(比如 hostname-v0)做一个 MD5 的 Hash,得到虚拟节点的 vnode_key,再用 ring 字典来保存虚拟节点到物理节点的映射,同时将 vnode_key 加入到 sorted_keys 的 list 中。
②在 Hash 环中分配 Replica:将(topic_name+partition_num+replica_num)作为 Key 用相同的 MD5 Hash 算法得到 replica_key。
接着二分查找该 replica_key 在 sorted_keys 中的 Position, 最后用 Ring 字典来映射到物理机 Node,至此 Replica 分配完成。
添加物理节点只需迁移很小一部分数据。
对不同配置的物理机做权重设置,可以支持异构集群的部署。
实现 Replica 的 Rack Aware,物理节点上面会有 Rack 信息,在为 Replica 分配物理节点的时候会记录已经分配的 Rack 信息。
如果有重复的情况,就会把 vnode_key 找到 Position 的位置 +1 找下一个物理节点,我们会确保三个 Replica 的物理 Rack 一定是不一样的(假如 Replica=3)。
鉴权、授权和 ACL 方案
下面介绍下我们实现的一个白名单机制来解决老集群的问题,首先将老业务加入到白名单中,让新业务通过工单流程来申请 Topics 和 Consumers 两种资源权限并加到白名单里,定期监测非法(没有走工单)Topics,Consumers 资源。
同时将这些资源都 Deny 掉,这样就收紧了 Topics 和 Consumer 读写权限的口子,同时原有业务不会有任何影响。
Quota 机制
一种是限制网络带宽。
一种是限制请求速率(限制 CPU)。
跨 IDC 的数据同步
这就造成跨 IDC 网络的极大浪费, 加上跨 IDC 的网络并不稳定,有时候会有一些异常,业务也不一定能很好处理。
这样做有两个好处:
一是屏蔽了异常对业务的影响。
二是节省了 IDC 之间的带宽(我们通过同步机制能保证这份数据只传输一份)。
我们还基于 Marathon/Mesos 对这个服务做了 Pass 化,提高了服务的 SLA。
监控告警
我们的监控警告平台如下:
基于 Jmx exporter+Promehteus+Grafana 来做图表展示,在每个 Broker 上面部署 Jmx exporter,Prometheus 会去 Pull 这些数据,最后通过 Grafana 来展示。
基于 Kafka Manager 做瞬态指标的监控。
基于 Burrow 做 Consumer lag 的监控。
基于 Wonder 来做告警,这个是 360 内部实现的一个组件,类似 Zabbix。
线上问题及解决方案
bootstrap.servers 性能瓶颈:该参数可以配置多台 Broker,这些 Broker 作为 Proxy 的角色为 Kafka Clients 提供 Lookup 服务。
Consumer 重启不消费:业务反馈消费停止,重启也不能够解决问题,后来定位发现是早于 0.11 之前版本的 Bug:
https://issues.apache.org/jira/browse/KAFKA-5413
一是升级到 Kafka 0.11+ 版本
二是将 Offset 迁移到新的 Consumer Group 来解决(规避掉有问题的 Coordinator)。
作者:严锁鹏
编辑:陶家龙、孙淑娟
出处:转载自微信公众号 DBAplus 社群(ID:dbaplus),本文根据严锁鹏老师在〖2019 DAMS 中国数据智能管理峰会〗现场演讲内容整理而成。
严锁鹏,奇虎 360 大数据架构运维专家,具有 10 年基础架构与大数据开发经验。2013 年加入 360 商业化团队,负责消息中间件开发与运维,同时涉及大数据架构、微服务架构、实时计算平台、机器学习平台、监控系统等基础设施建设,致力于为商业化团队提供稳定高效的基础服务。
精彩文章推荐:
面试问Kafka,这一篇全搞定我以为我对Kafka很了解,直到我看了这篇文章
Redis,Nginx,Netty为什么这么香?