达观数据应对大规模消息数据的处理经验
达观数据是为企业提供大数据处理、个性化推荐系统服务的知名公司,在应对海量数据处理时,积累了大量实战经验。其中达观数据在面对大量的数据交互和消息处理时,使用了称为DPIO的设计思路进行快速、稳定、可靠的消息数据传递机制,本文分享了达观数据在应对大规模消息数据处理时所开发的通讯中间件DPIO的设计思路和处理经验。
一、数据通讯进程模型
我们在设计达观数据的消息数据处理机制时,首先充分借鉴了ZeroMQ和ProxyIO的设计思想。ZeroMQ提供了一种底层的网络通讯框架,提供了基本的RoundRobin负载均衡算法,性能优越,而ProxyIO是雅虎的网络通讯中间件,承载了雅虎内部大量计算节点间的实时消息处理。但是ZeroMQ没有实现基于节点健康状态的最快响应算法,并且ZeroMQ和ProxyIO对节点的状态管理,连接管理,负载均衡调度等也需要各应用自己来实现。
达观科技在借鉴两种设计思路的基础上,从进程模型、服务架构、线程模型、通讯协议、负载均衡、雪崩处理、连接管理、消息流程、状态监控等各方面进行了开拓,开发了DPIO(达观ProxyIO的简写,下文统称DPIO),确保系统高性能处理相关数据。
在DPIO的整个通讯框架体系中,采用集中管理、统一监控策略管理节点提供服务,节点间直接进行交互,并不依赖统一的管理节点(桂洪冠)。几种节点间通过http或者tcp协议进行消息传递、配置更新、状态跟踪等通讯行为。集群将不同应用的服务抽象成组的概念,相同应用的服务启动时加入的相同的组。每个通讯组有两种端点client和server。应用启动时通过配置决定自己是client端点还是server端点,在一个组内,每个应用只能有一个身份;不同组没要求。
· 监控节点,顾名思义即提供系统监控服务的,用来给系统管理员查看集群中节点的服务状态及负载情况,系统对监控节点并无实时性及稳定性要求,在本模型中是单点系统。
· 在上图的架构中把管理节点设计成双master结构,参考zookeeper集群管理思路,多个master通过一定算法分别服务于集群中一部分节点,相对于另外的服务节点则为备份管理节点,他们通过内部通讯同步数据,每个管理节点都有一个web服务为监控节点提供服务节点的状态数据。
· 服务节点即是下文要谈的代理服务,根据服务对象不同分为应用端代理和服务端代理。集群中的服务节点根据提供服务的不同分为多个组,每个代理启动都需要注册到相应的组中,然后提供服务。
二、 DPIO消息传递逻辑架构
DPIO服务节点内/间的通讯及消息传递模型见下图:
· clientHost和serverHost间使用socketapi进行tcp通讯,相同主机内部的多个进程间使用共享内存传递消息内容,client和clientproxy、server和serverproxy之间通过domain socket进行事件通知;在socket连接的一方收到对端的事件通知后,从共享内存中获取消息内容。
· clientproxy/serverproxy启动时绑定到host的一个端口响应应用api的连接,在连接到来时将该api对应的共享内存初始化,将偏移地址告诉给应用。clientproxy和serverproxy中分别维护了一个到应用api的连接句柄队列,并通过io复用技术监听这些连接上的读写事件。
· serverproxy在启动时通过socket绑定到服务器的一个端口,并以server身份注册到一个group监听该端口的连接事件,当事件到达时回调注册的事件处理函数响应事件。
· 在serverproxy内部通过不同的thread分别管理从本地应用建立的连接和从clientproxy建立的连接。thread的个数在启动proxy时由用户指定,默认是分别1个。每个clientproxy启动时会以client身份注册到一个group,并建立到同组的所有serverproxy的连接,clientproxy内部包含了连接的自管理能力及failover的处理(将在下面连接管理部分描述)。 DPIO实现了负载均衡,路由选择和透明代理的功能。
三、线程模型
DPIO的线程模型:
App epoll thread检测从api来的请求信息,并将请求信息转发到待处理队列中。从已处理队列中获取应答包,并将处理结果转发给api。
Io epoll thread检测从远端的proxy来的可写事件,并将请求包转发到远端的proxy。检测从远端的proxy的可读事件,并将应答包放在已处理队列中。
Monitor thread检测DPIO的工作状态请求,将DPIO的工作状态返回。并将决定Io epoll thread和app epoll thread的负载均衡(桂洪冠)。
四、通信协议
1. Api与DPIO通信协议
Ø 共享内存存储消息格式
字段 | 含义 | 长度 |
protocol len | 协议包的总长度 | 4bytes |
protocol head len | 协议头的长度 | 1byte |
Version_protocol_id | 协议的版本号和协议号 | 1byte |
Flag | 消息标志,标志路由模式,是否记录来源地址,有二级路由,所以这个字段一定要 Eg,末位表示要记录src,倒数第二位表示按roundrobin路由,倒数第3位表示按消息头路由,xxx | 1byte |
Proxy | 来源/目的 proxy | 2bytes |
Api | 来源/目的 api | 2bytes |
ApiTtl | 协议包的发送时间 | 2Bytes |
ClientTtl | 消息存活的时间,后面添加,增加路由策略,选择app_server | 2Bytes |
ClientProcessTime | 客户端处理所用时间 | 2Bytes |
ServerTtl | 消息存活的时间,后面添加,增加路由策略,选择app_client | 2Bytes |
timeout | 协议包的超时时间 | 2 byte |
Sid | 消息序列号 | 4bytes |
protocol body len | Body长度 | 4bytes |
protocol body | 消息体 | Size |
Ø 请求协议包
字段 | 含义 | 长度 |
protocol head len | 协议头的长度 | 1byte |
Version_protocol_id | 协议的版本号和协议号 | 1byte |
Flag | 消息标志,标志路由模式,是否记录来源地址,有二级路由,所以这个字段一定要 Eg,末位表示要记录src,倒数第二位表示按roundrobin路由,倒数第3位表示按消息头路由,xxx | 1byte |
ApiTtl | 协议包的发送时间 | 2bytes |
Timeout | 协议包的超时时间 | 2bytes |
Api | 来源/目的 api | 2bytes |
Sid | 消息序列号 | 4byte |
Begin_offset | 协议包的起始偏移 | 4bytes |
len | 协议包长度 | 4bytes |
Ø 响应协议包
字段 | 含义 | 长度 |
protocol head len | 协议头的长度 | 1byte |
Version_protocol_id | 协议的版本号和协议号 | 1byte |
Flag | 消息标志,标志路由模式,是否记录来源地址,有二级路由,所以这个字段一定要 Eg,末位表示要记录src,倒数第二位表示按roundrobin路由,倒数第3位表示按消息头路由,xxx | 1byte |
Result | 处理结果 | 1byte |
sid | 消息序列号 | 4bytes |
begin_offset | 协议包的起始偏移 | 4bytes |
len | 协议包长度 | 4bytes |
2. Proxy与监控中心的监控信息
Ø 请求协议包
字段 | 含义 | 长度 |
protocol len | 协议包的总长度 | 4bytes |
protocol head len | 协议头的长度 | 4bytes |
Version | 协议的版本号 | 4bytes |
protocol id | 协议的协议号 | 4bytess |
status_version | 当前状态版本 | 4bytes |
Proxy_identify_len | 该proxy标识长度 | 4bytess |
Proxy_identify | 该proxy 标识 | 4bytes |
protocol body | 消息体 | Size |
Ø 应答包
字段 | 含义 | 长度 |
protocol len | 协议包的总长度 | 4bytes |
protocol head len | 协议头的长度 | 4bytes |
Version | 协议的版本号 | 4bytes |
protocol id | 协议的协议号 | 4bytess |
protocol body len | Body长度 | 4bytes |
protocol body | 消息体 | Size |
五、负载均衡
DPIO的负载均衡基于最快响应法。
DPIO将所有的统计信息更新到监控中心,监控中心通过处理所有的节点的状态信息,统一负责负载均衡。
DPIO从监控中心获取所有连接的负载均衡策略。
每个连接知道只需知道自己的处理能力。
以上图为例,有三个proxy server处理程序。处理能力分别为50、30、20。
一次epoll过程能够同时探测多个连接的可写事件。
假设:三个proxy server的属于同一epoll thread,且三个proxy server假设都处理能力无限大。
限制:如果刚开始时待处理队列的数据包个数为100个,多次发送轮回后proxy server A≥proxy server B≥proxy server C, 每个发送的最多发送协议包数为待处理队列协议包个数 * 该连接所占权重。
六、雪崩处理
大型在线服务,特别是对于时延敏感的服务,当系统外部请求超过系统服务能力,而没有适当的过载保护措施时,当系统累计的超时请求达到一定规模,将可能导致系统缓冲区队列溢出,后端服务资源耗尽,最终像雪崩一样形成恶性循环。这时系统处理的每个请求都因为超时而无效,系统对外呈现的服务能力为0,且这种情况下不能自动恢复。
我们的解决策略是对协议包进行生命周期管理,现在协议包进出待处理队列和已处理队列时进行超时检测和超时处理(超时则丢弃)。
proxy client:
当app epoll thread将协议包放入待处理队列时,会将该协议包的发送时间、该协议包的超时时间,当前时间戳来判断该协议包是否已经超时。
当app epoll thread将协议包从已处理队列中移除时,会将该协议包的发送时间、该协议包的超时时间,已经当前时间戳来判断该协议包是否已经超时。
当Io epoll thread将协议包从待处理队列中移除时,会将该协议包的发送时间、该协议包的超时时间,当前时间戳,该连接的协议包的平均处理时间移除。
当io epoll thread将协议包放入已处理队列时,会将将该协议包的发送时间、该协议包的超时时间,已经当前时间戳来判断该协议包是否已经超时。
proxy server:
当App epoll thread将协议包从待处理队列中移除时,会将该协议包在客户端的处理时间、该协议包的超时时间、该协议包的proxy server接收时间戳、当前时间戳来判断该协议包是否已超时。
当app epoll thread将协议包放入已处理队列时,会将该协议包的发送时间、该协议包的超时时间,已经当前时间戳来判断该协议包是否已经超时。
当io epoll thread将协议包从已处理队列中移除时,会将该协议包的发送时间、该协议包的超时时间,已经当前时间戳来判断该协议包是否已经超时。
当io epoll thread将协议包放入待处理队列时,会将该协议包的发送时间、该协议包的超时时间来判断该协议包是否已超时。
七、连接管理
红黑树:
红黑树:保存所有连接的最近的读/写时间戳。
当epoll_wait时,首先从红黑树中获取oldest的时间戳,并将当前时间戳与oldest时间戳的时间差作为epoll_wait的超时时间,当连接中有可读/写事件发送时,首先从红黑树中删除该节点,当可读/写事件处理完毕后,再将节点插入到红黑树中,当处理完所有连接的可读/写事件时,再从红黑树中依次从移除时间戳小于当前时间戳的连接,并触发该连接的timeout事件。
八、消息处理流程
1. apiclient通过调用api的接口,将消息传给api。
2. api接受消息体,从共享内存中申请内存,填写消息头size(协议总长度)、Offset (协议版本号和协议号)、Headsize (协议头的总长度)、flag(路由策略),ApiTtl (协议包的发送时间)、timeout (协议包的超时时间)、sid(序列号),size(消息体长度)字段,封装成协议包,将协议包写入共享内存。
3. api通过socket发送请求给proxy。
4. app epoll thread通过检测api的可读事件,接受请求。通过解析请求内容,获取请求协议包所在的共享内存的偏移、请求协议包的长度和api连接index加入到处理队列。
5. proxy client的io epoll thread通过检测对端DPIO连接的可写事件,从发送队列中获取请求包,将api的index加入到协议包的api index字段。
6. proxy client的io epoll thread从共享内存中读取协议包,释放由请求包中所标识的内存空间。
7. proxy server的io epoll thread通过检测对端DPIO的可读事件,接受请求。
8. proxy server的io epoll thread从共享内存中申请空间,将proxy的index加入到协议包的proxy index字段。将请求内存写入到申请的空间中。
9. proxy server的io epoll thread 将协议包在共享内存的偏移和协议包的长度加入的待处理队列中。
10. app epoll thread从待处理队列中获取请求包,将协议包转发给相应的api进行处理。
11. api通过检测DPIO的可读事件,解析请求内容。
12. api通过解析请求内容,获取请求协议包在共享内存中的偏移和请求协议包的长度。从共享内存中读取请求内容,并释放相应空间。
13. api将请求协议包返回给应用层进行处理。
14. 应用层将应答包传给api。
15. Api从共享内存中申请空间,将应答包写入到共享内存中。
16. Api将应答包在共享内存中的偏移和应答包的大小写入到共享内存中。
17. App epoll thread通过检测可读事件,将应答包写入到已处理队列中。
18. proxy server的Io epoll thread通过检测对端的DPIO的可写事件,将已处理队列中获取应答包。
19. proxy server的Io epoll thread从共享内存中读取应答包。
20. Proxy client的Io epoll thread检测可读事件,读取应答包。
21. Proxy client的Io epoll thread通过解析应答包,从共享内存中申请空间,将应答包写入到申请的内存中。
22. Proxy client的Io epoll thread将应答包移入到已处理队列。
23. App epoll thread通过检测api的可写事件,将已处理队列中获取应答包。
24. App epoll thread发送应答包。
25. Api通过检测可读事件,获取应答包,通过解析应到包,获取应答包在共享内存中的偏移和应到的大小,从共享内存中读取应到包。
26. Api将应答包返回给应用端。
九、 状态监控
连接池中存在:当前可用连接个数
连接池中再分别获取每个连接的状态
每个可用连接分别维护以下信息:
连接处理的数据包个数、连接send失败次数、连接协议包的平均处理时间。
连接的连接状态(当重连失败达到一定次数时,定义为连接失败)。
连接的重连次数、连接的超时次数。
当监控线程accept到client的连接时,解析请求内容,然后调用连接池对象的statistics方法,连接池对象首先写入自己的统计信息,然后分别调用每个连接的statistics方法,每个连接分别填写自己的统计信息。
十、 全文总结
达观数据在处理大规模数据方面有多年的技术积累,DPIO是达观在处理大数据通讯时的一些经验,和感兴趣的朋友们分享。未来达观数据将不断分享更多的技术经验,与大家交流与合作。