数据挖掘系列篇之大众点评的实时计算
作者:面包君 【数据分析联盟创始人,前支付宝资深数据人,VC投资人,《数据分析侠的成长故事》作者,7年大数据行业数据分析和产品从业经验】。
知乎专栏:https://zhuanlan.zhihu.com/dataman
精彩回顾:数据挖掘系列篇之今日头条的个性化推荐
实时计算是目前在推荐、搜索广告等场景中经常需要应用的地方,它不像离线计算那样可以有长时间来准备数据,做数据处理。在实际的应用场景,要考虑到用户的感受。比如我在城西银泰搜索附近的商家,这个就需要实时计算距离来支持。还有像双十一这样的推荐场景,营销活动时间也有一天,必须做到实时计算来查看到活动的效果,并及时来调整营销方案。阿里、百度在实时计算平台做了很多应用。今天看到一篇王新春介绍的大众点评的实时计算storm的应用,所以来分享下。
实时计算在点评的使用场景
类别一:Dashboard、实时DAU、新激活用户数、实时交易额等
♦ Dashboard类:北斗(报表平台)、微信(公众号)和云图(流量分析)等
♦ 实时DAU:包括主APP(Android/iPhone/iPad)、团APP、周边快查、PC、M站
♦ 新激活用户数:主APP
♦ 实时交易额:闪惠/团购交易额
以报表平台为例,下图是一张APP UV的实时曲线图,它以分钟级别粒度展现了 实时的DAU数据和曲线。
从图中可以看见一个尖点,这个尖点就是当天push过后带来的用户,这样可以看到实时的运营效率。
类别二:搜索、推荐、安全等
以搜索为例:用户在点评的每一步有价值的操作(包括:搜索、点击、浏览、购买、收藏等),都将实时、智能地影响搜索结果排序,从而显著提升用户搜索体验、搜索转化率。
某用户 搜索“ 火锅 ”,当他 在搜索结果页 点击了“ 重庆高老九火锅 ”后, 再次刷新搜索结果列表时,该商户的排序就会提升到顶部 。
再结合其他的一些实时反馈的个性化推荐策略,最终使团购的交易额有了明显的增加,转化率提升了2个多点。
插图2
实时计算在业界的使用场景
场景1:阿里JStorm
♦ 双11实时交易数据
场景2:360 Storm
♦ 抢票软件验证码自动识别:大家用360浏览器在12306上买票的时候,验证码自动识别是在Storm上计算完成的。
♦ 网盘图片缩略图生成:360网盘的缩略图也是实时生成出来的,这样可以节约大量的文件数量和存储空间。
♦ 实时入侵检测
♦ 搜索热词推荐
场景3:腾讯TDProcess
分布式K/V存储引擎TDEngine和支持数据流计算的TDProcess,TDProcess是基于Storm的计算引擎,提供了通用的计算模型,如Sum、Count、PV/UV计算和TopK统计等。
场景4:京东Samza
整个业务主要应用订单处理,实时分析统计出待定区域中订单各个状态的量:待定位、待派工、待拣货、待发货、待配送、待妥投等。
点评如何构建实时计算平台
点评的实时计算平台是一个端到端的方案,从下面的平台 架构图,可以看出整体架构是一个比较长的过程,包括了数据源、数据的传输通道、计算、存储和对外服务等。
插图3
实时计算平台首先解决的问题是,数据怎么获取,如何拿到那些数据。
我们现在做到了几乎所有点评线上产生的数据都可以毫秒级拿到,封装对应的数据输入源Spout。 通过Blackhole支持日志类实时获取,包括打点日志、业务Log、Nginx日志等。 整合Puma Client第一时间获取数据库数据变更。 整合Swallow获取应用消息。
Blackhole是我们团队开发的类Kafka系统,主要目标是批量从业务方拉取日志时做到数据的完整性和一致性,然后也提供了实时的消费能力。 Puma是以MySQL binlog为基础开发的,这样可以实时拿到数据库的update、delete、insert操作。 Swallow是点评的MQ系统 。
通过整合各种传输通道,并且封装相应的Spout,做业务开发的同学就完全不用关心数据怎样可靠获取,只需要写自己的业务逻辑就可以了。
解决了数据和传输问题后,计算过程则在Storm中完成。 如果在Storm计算过程中或计算出结果后,需要与外部存储系统交互,我们也提供了一个data-service服务 ,通过点评的RPC框架提供接口, 用户不用关心实际Redis/HBase这些系统的细节和部署情况, 以及这个数据到底是在Redis还是HBase中的,我们可以根据SLA来做自动切换; 同时计算的结果也是通过data-service服务,再反馈到线上系统。
就拿刚刚搜索结果的例子,搜索业务在用户再次搜索的时候会根据userId请求一次data-service,然后拿到这个用户的最近浏览记录,并重新排序结果,返回给用户。 这样的好处就是实时计算业务和线上其他业务完全解耦,实时计算这边出现问题,不会导致线上业务出现问题。
Storm基础知识简单介绍
Apache Storm( Apache Storm )是由Twitter开源的分布式实时计算系统。Storm可以非常容易、可靠地处理无限的数据流。对比Hadoop的批处理,Storm是个实时的、分布式以及具备高容错的计算系统。Storm可以使用何编程语言进行开发。
Storm的集群表面上看和Hadoop的集群非常像,但是在Hadoop上面运行的是MapReduce的Job,而在Storm上面运行的是Topology。
Storm和Hadoop一个非常关键的区别是Hadoop的MapReduce Job最终会结束, 而Storm的Topology会一直运行(除非显式地杀掉)。
Storm基本概念:
插图4
Nimbus和Supervisor之间的通讯是依靠ZooKeeper来完成,并且Nimbus进程和Supervisor都是快速失败(fail-fast)和无状态的。可以用kill -9来杀死Nimbus和Supervisor进程,然后再重启它们,它们可以继续工作。
在Storm中,Spout 是Topology中产生源数据流的组件。通常Spout获取从Kafka、MQ等的数据,然后调用nextTuple函数,发射数据出去供Bolt消费。
插图5
图中的Spout就发射出去了两条数据流。
而Bolt是在Topology中接受Spout的数据,然后执行处理的组件。Bolt在接收到消息后会调用execute函数,用户可以在其中执行自己想要的操作。
插图6
为什么用Storm呢,因为Storm有它的优点:
易用性
只要遵守Topology,Spout, Bolt的编程规范即可开发出一个扩展性极好的应用,像底层RPC,Worker之间冗余,数据分流之类的操作,开发者完全不用考虑。
扩展性
当某一级处理单元速度不够时,直接配置一下并发数,即可线性扩展性能。
健壮性
当Worker失效或机器出现故障时, 自动分配新的Worker替换失效Worker。
准确性
采用Acker机制,保证数据不丢失。采用事务机制,保证数据准确性。
刚刚介绍了一些Storm的基础概念和特性,再用一张比较完整的图来回顾一下整个Storm的体系架构:
插图7
Storm提交一个作业的时候,是通过Thrift的Client执行相应的命令来完成。
Nimbus针对该Topology建立本地的目录,Nimbus中的调度器根据Topology的配置计算Task,并把Task分配到不同的Worker上,调度的结果写入Zookeeper中。
Zookeeper上建立assignments节点,存储Task和Supervisor中Worker的对应关系。
在Zookeeper上创建workerbeats节点来监控Worker的心跳。
Supervisor去Zookeeper上获取分配的Tasks信息,启动一个或者多个Worker来执行。
每个Worker上运行多个Task,Task由Executor来具体执行。Worker根据Topology信息初始化建立Task之间的连接,相同Worker内的Task通过DisrupterQueue来通信,不同Worker间默认采用Netty来通信,然后整个Topology就运行起来了。
如何保证业务运行可靠性
首先Storm自身有很多容错机制,我们也加了很多监控信息,方便业务同学监控自己的业务状态。 在Storm上,遇到的一个很基本的问题就是,各个业务是运行的Worker会跑在同一台物理机上。曾经有位同学就在自己的Worker中起了200多个线程来处理json,结果就是这台机器的CPU都被他的Worker吃光了,其他的业务也跟着倒霉。
因此我们也使用CGroup做了每个Worker的资源隔离, 主要限制了CPU和Memory的使用。相对而言JStorm在很多方面要完善一些,JStorm自己就带资源隔离。 对应监控来说,基本的主机维度的监控在ganglia上可以看见,比如现在集群的运行状况。
下图是现在此时的集群的网络和负载:
插图8
这些信息并不能保证业务就OK,因此我们将Storm上的很多监控信息和点评的开源监控系统Cat集成在了一起,从Cat上可以看见更多的业务运行状态信息。
比如在Cat中我可以看见整个集群的TPS,现在已经从30多万降下来了。 然后我可以设置若干的报警规则, 如:连续N分钟降低了50%可以报警。 然后也监控了各个业务Topology的TPS、Spout输入、Storm的可用Slot等的变化。
这个图就是某个业务的TPS信息, 如果TPS同比或者环比出现问题,也可以报警给业务方。
Storm使用经验分享
1.使用组件的并行度代替线程池
Storm自身是一个分布式、多线程的框架,对每个Spout和Bolt,我们都可以设置其并发度;它也支持通过rebalance命令来动态调整并发度,把负载分摊到多个Worker上。
如果自己在组件内部采用线程池做一些计算密集型的任务,比如JSON解析,有可能使得某些组件的资源消耗特别高,其他组件又很低,导致Worker之间资源消耗不均衡,这种情况在组件并行度比较低的时候更明显。
比如某个Bolt设置了1个并行度,但在Bolt中又启动了线程池,这样导致的一种后果就是,集群中分配了这个Bolt的Worker进程可能会把机器的资源都给消耗光了,影响到其他Topology在这台机器上的任务的运行。如果真有计算密集型的任务,我们可以把组件的并发度设大,Worker的数量也相应提高,让计算分配到多个节点上。
为了避免某个Topology的某些组件把整个机器的资源都消耗光的情况,除了不在组件内部启动线程池来做计算以外,也可以通过CGroup控制每个Worker的资源使用量。
2.不要用DRPC批量处理大数据
RPC提供了应用程序和Storm Topology之间交互的接口,可供其他应用直接调用,使用Storm的并发性来处理数据,然后将结果返回给调用的客户端。这种方式在数据量不大的情况下,通常不会有问题,而当需要处理批量大数据的时候,问题就比较明显了。
(1)处理数据的Topology在超时之前可能无法返回计算的结果。
(2)批量处理数据,可能使得集群的负载短暂偏高,处理完毕后,又降低回来,负载均衡性差。
批量处理大数据不是Storm设计的初衷,Storm考虑的 是时效性和批量之间的均衡,更多地看中前者。需要准实时地处理大数据量,可以考虑Spark Stream等批量框架。
3.不要在Spout中处理耗时的操作
Spout中nextTuple方法会发射数据流,在启用Ack的情况下,fail方法和ack方法会被触发。
需要明确一点,在Storm中Spout是单线程(JStorm的Spout分了3个线程,分别执行nextTuple方法、fail方法和ack方法)。如果nextTuple方法非常耗时,某个消息被成功执行完毕后,Acker会给Spout发送消息,Spout若无法及时消费,可能造成ACK消息超时后被丢弃,然后Spout反而认为这个消息执行失败了,造成逻辑错误。反之若fail方法或者ack方法的操作耗时较多,则会影响Spout发射数据的量,造成Topology吞吐量降低。
4.注意fieldsGrouping的数据均衡性
fieldsGrouping是根据一个或者多个Field对数据进行分组,不同的目标Task收到不同的数据,而同一个Task收到的数据会相同。
假设某个Bolt根据用户ID对数据进行fieldsGrouping,如果某一些用户的数据特别多,而另外一些用户的数据又比较少,那么就可能使得下一级处理Bolt收到的数据不均衡,整个处理的性能就会受制于某些数据量大的节点。可以加入更多的分组条件或者更换分组策略,使得数据具有均衡性。
5.优先使用localOrShuffleGrouping
localOrShuffleGrouping是指如果目标Bolt中的一个或者多个Task和当前产生数据的Task在同一个Worker进程里面,那么就走内部的线程间通信,将Tuple直接发给在当前Worker进程的目的Task。否则,同shuffleGrouping。
localOrShuffleGrouping的数据传输性能优于shuffleGrouping,因为在Worker内部传输,只需要通过Disruptor队列就可以完成,没有网络开销和序列化开销。因此在数据处理的复杂度不高,而网络开销和序列化开销占主要地位的情况下,可以优先使用localOrShuffleGrouping来代替shuffleGrouping。
6.设置合理的MaxSpoutPending值
在启用Ack的情况下,Spout中有个RotatingMap用来保存Spout已经发送出去,但还没有等到Ack结果的消息。RotatingMap的最大个数是有限制的,为p*num-tasks。其中p是topology.max.spout.pending值,也就是MaxSpoutPending(也可以由TopologyBuilder在setSpout通过setMaxSpoutPending方法来设定),num-tasks是Spout的Task数。如果不设置MaxSpoutPending的大小或者设置得太大,可能消耗掉过多的内存导致内存溢出,设置太小则会影响Spout发射Tuple的速度。
7.设置合理的Worker数
Worker数越多,性能越好?先看一张Worker数量和吞吐量对比的曲线(来源于JStorm文档: jstorm/docs at master · alibaba/jstorm · GitHub 0.9.4.1jstorm性能测试.docx)。
从图可以看出,在12个Worker的情况下,吞吐量最大,整体性能最优。这是由于一方面,每新增加一个Worker进程,都会将一些原本线程间的内存通信变为进程间的网络通信,这些进程间的网络通信还需要进行序列化与反序列化操作,这些降低了吞吐率。
另一方面,每新增加一个Worker进程,都会额外地增加多个线程(Netty发送和接收线程、心跳线程、SystemBolt线程以及其他系统组件对应的线程等),这些线程切换消耗了不少CPU,sys 系统CPU消耗占比增加,在CPU总使用率受限的情况下,降低了业务线程的使用效率。
8.平衡吞吐量和时效性
Storm的数据传输默认使用Netty。在数据传输性能方面,有如下的参数可以调整:
storm.messaging.netty.server_worker_threads和storm.messaging.netty.client_worker_threads分别为接收消息线程和发送消息线程的数量。
netty.transfer.batch.size是指每次 Netty Client向 Netty Server发送的数据的大小,如果需要发送的Tuple消息大于netty.transfer.batch.size,则Tuple消息会按照netty.transfer.batch.size进行切分,然后多次发送。
storm.messaging.netty.buffer_size为每次批量发送的Tuple序列化之后的TaskMessage消息的大小。
storm.messaging.netty.flush.check.interval.ms表示当有TaskMessage需要发送的时候, Netty Client检查可以发送数据的频率。
降低storm.messaging.netty.flush.check.interval.ms的值,可以提高时效性。增加netty.transfer.batch.size和storm.messaging.netty.buffer_size的值,可以提升网络传输的吐吞量,使得网络的有效载荷提升(减少TCP包的数量,并且TCP包中的有效数据量增加),通常时效性就会降低一些。因此需要根据自身的业务情况,合理在吞吐量和时效性直接的平衡。
除了这些参数,我们怎么找到Storm中性能的瓶颈,可以通过如下的一些途径来进行:
在Storm的UI中,对每个Topology都提供了相应的统计信息,其中有3个参数对性能来说参考意义比较明显,包括Execute latency、Process latency和Capacity。
分别看一下这3个参数的含义和作用。
(1)Execute latency:消息的平均处理时间,单位为毫秒。
(2)Process latency:消息从收到到被ack掉所花的时间,单位为毫秒。如果没有启用Acker机制,那么Process latency的值为0。
(3)Capacity:计算公式为Capacity = Bolt或者Executor调用execute方法处理的消息数量 * 消息平均执行时间 / 时间区间。这个值越接近1,说明Bolt或者Executor基本一直在调用execute方法,因此并行度不够,需要扩展这个组件的Executor数量。
为了在Storm中达到高性能,我们在设计和开发Topology的时候,需要注意以下原则:
(1)模块和模块之间解耦,模块之间的层次清晰,每个模块可以独立扩展,并且符合流水线的原则。
(2)无状态设计,无锁设计,水平扩展支持。
(3)为了达到高的吞吐量,延迟会加大;为了低延迟,吞吐量可能降低,需要在二者之间平衡。
(4)性能的瓶颈永远在热点,解决热点问题。
(5)优化的前提是测量,而不是主观臆测。收集相关数据,再动手,事半功倍。
关于计算框架的后续想法
目前Hadoop/Hive专注于离线分析业务,每天点评有1.6万个离线分析任务。Storm专注于实时业务,实时每天会处理100亿+条的数据。
在这两个框架目前有很大的gap,一个是天级别,一个是秒级别,然后有大量的业务是准实时的,比如分钟级别。因此我们会使用Spark来做中间的补充。
Spark Streaming + Spark SQL也能够降低很大的开发难度。相对而言,目前Storm的学习和开发成本还是偏高。要做一个10万+ TPS的业务在Storm上稳定运行,需要对Storm了解比较深入才能做到,不然会发现有这样或者那样的问题。
后面,我们计划在的大数据开发者平台上,统一实时计算/准实时计算和离线计算任务的管理和监控。
【Q&A】
【问题1】 Blackhole和Swallow专注点区别是什么?
Blackhole主要专注于日志类型的业务,就像Kafka一样,日志类型的对可靠性和一致性要求不会那么高,但是需要支持非常大的QPS,比如几十万到几百万。
【问题2】日志格式是统一定义的吧?能分享一下日志格式吗?
日志格式是统一的,我们提供了一个基于log4j的日志框架,里面定义好了KV的分隔符。业务把日志输出到文件,然后通过Blackhole把日志文件读取,然后在Spout中完成解析,在Blot中就是具体的日志的KV对了,业务就自己去使用。至于格式,很简单,只要定义好每个KV对的分隔符,然后K和V的分隔符就可以了。
【问题3】S专注在业务上?考虑过事务么,会不会有重复处理造成数据异常的问题?
对于这个问题,首先我们在实际业务中还没有使用事务。在没有启用事务的情况下,需要考虑业务的幂等的问题。如果业务可以幂等,那么重复数据不会有任何问题。因为像Kafka等系统,保证的是at leaset once,数据源就会有重复数据出现。然后启用事务会对性能也有比较大的影响,这个就自己权衡了
【问题4】APP Client端的数据采集,是否有延迟的问题?
如果是打点数据有延迟,如果一直访问,延迟很小,1s以内;如果只浏览几次,那么的确可能延迟比较大。client端是以batch发上来,为了省流量。因此有些数据就通过从数据库那边拖来,比如用户收藏了商户,打点和数据库都可以拿到,那么就从数据库拿
【问题5】 系统中的MQ也是用kafka吗?点评的量级,Kafka的集群数大概是多少?
MQ不是Kafka,是点评基于ActiveMQ修改的,然后消息持久化是在mongodb中。我们用了7台broker支撑了每天2T+的流量
【问题6】根据用户行为排序,这个会不会影响搜索的性能,是如何解决的?
点评推荐系统就是根据用户id去redis获取实时信息,作为score的一个feature。 对搜索影响不大的。作为推荐第一个使用实时数据,效果提升很明显的。
【问题7】实时计算这里是多大级别的服务器集群呢?
目前,只用1台Nimbus + 9台Supervisor支撑了了20多个业务,峰值的时候大概可以跑到40万TPS。
【问题8】日志采用写文件方式,是不是对磁盘io负载高?并发能达到多少?blackhole拉取 这个不能实时吧?
写文件是写Page Cache的,因此不会高,可以参考Kafka的文档。blackhole拉取现在是监听了文件的变更,因此毫秒内可以知道。
【问题9】 请问点评Storm集群中,共享spout的多个业务的topology划分粒度是怎样的?
是这样的,比如流量类型的,后面很多业务会用到流量数据,IP维度的统计,GUID维度的统计,PV统计等,这类会在一个Topology中,因为后续业务只需要使用这个Topology的输出就可以了。而且流量数据很大,每个业务自己处理,那的确浪费很严重。因此这个是共享的,我们也保证他的可用性。其他业务目前我们没有共享的情况
【问题10】你们的数据抽取会对业务系统有性能影响,而且你们可以做到毫秒级,你们如何降低或消除这些性能影响的?
目前所有的抽取都是旁路的,不是业务的主流程上,因此不会有多大影响。比如业务输出日志,发送MQ消息等。
【问题11】在最开始的时候您说点评开发了自己的RPC框架。为什么点评要自己开发而不用现有的开源框架呢?
自己开发时候开源还很少。而且不成熟。
【问题12】对于某些数据的采集,是否有采样策略,如APP Client端的数据采集,还是全量采集?
目前打点数据是全量的,PV MV等都是全量过来的,通过长链,小批量+压缩过来。有一些特殊性的,量又不大的,会走实时发送的通路
【问题13】 除了上面讲到的业务点,点评目前还在哪些业务线用到storm计算实时数据?
安全,反作弊,推荐,广告等都用。
【问题14】各个业务的Spout数据接口是如何定义的。怎么与业务开发人员交互?
比如日志类型的Spout,业务需要知道订阅那个数据源就可以,其他不管。输出就是KV对,然后我们有个地方可以去查,这些日志格式是什么含义。
【问题15】 听过一次腾讯的分享,他们对于storm的使用做了sql接口,点评在做这样的尝试么,有没有可以分享的sql解析工具?
目前没有使用SQL接口,可以参考esper。
【问题16】Storm使用的那个版本,对JVM做了那些优化,有没有遇到当cpu90%以上时,出现worker宕掉,然而发生连锁反应work全挂?
线上版本是0.9.3,0.9.3有几个bug比较讨厌,然后考虑升级0.9.4,同时修改netty server的接收代码逻辑,在上游数据处理快,下游来不及处理的时候,并且不开ACK情况下目前会导致下游OOM。
cpu 90%没有遇到worker down的情况,比如今天某个高峰worker就跑到了500%。
【路人补充】这个是0.9.3netty client的一个bug,在0.9.4给修正了,当worker在不同supervisor上迁移时,就可能出现这个问题。
【问题17】在Storm中有没有应用Esper?
目前没有。
【问题18】介绍里说实时计算用Storm,分钟级别计算用Spark;是否一定要严格这么划分,有无其他评判标准?比如数据量等
目前没有严格规定,主要是看你对实时性和可靠性的要求。Spark目前在7*24小时的次序运行我们觉得稳定性还差一点。然后Storm的实时性会高一些,Spark略差一些,但是Spark开发成本低,因此业务自己来选。
【问题19】Storm业务配置变更是怎么实现动态更新的?
这个目前配置项都是放在点评基于zk的lion上来完成的,因此可以反推。
【问题20】storm的计算结果存储都采用的什么介质?
目前我们是用Redis为主,HBase和MySQL为辅,然后部分结果发到MQ。
好了,本次精彩的分享就到这里,后续会继续上大众点评的推荐算法和过滤机制。
内容:1、互联网金融的发展历程 2、大数据在互联网金融的应用 3、 征信体系介绍 4、风控反作弊欺诈模型运用 5、互联网金融公司贷款授信 6、保险定价策略分析 7、量化投资应用
参加方式:阅读原文或扫码参加