从epoll入门到redis中的epoll
01
网络编程模板
常见的网络编程模式如下(以ipv4中tcp协议编程为例),
首先创建一个socket套接字,即用于监听的文件描述符listen_fd;
将它与具体的ip和端口号绑定;
开启监听;
使用一个循环来接受客户端的请求;
创建子进程或者线程来处理已经连接的请求。
//创建监听的文件描述符
listen_fd = socket()
//绑定ip和端口
bind(listen_fd, ip和端口)
//监听
listen(listen_fd)
//循环处理链接和读写操作
while(1) {
//主进程用来接收连接
new_client_fd = accept()
//创建子进程或线程处理,处理新的客户端的请求
}
02
epoll编程模板
这种模式的问题在于创建子进程、线程都有系统调用,每来一个新的TCP连接都需要分配一个进程或者线程,如果达到C10K,意味着一台机器要维护1万个进程/线程,应对高并发的场景存在一定的性能问题。能不能让一个进程/线程来维护多个socket呢?当然,就是I/O多路复用技术。
一个进程虽然任一时刻只能处理一个请求,但是处理每个请求的事件时,耗时控制在 1 毫秒以内,这样 1 秒内就可以处理上千个请求,把时间拉长来看,多个请求复用了一个进程,这就是多路复用,这种思想很类似一个 CPU 并发多个进程,所以也叫做时分多路复用。
我们熟悉的 select/poll/epoll 内核提供给用户态的多路复用系统调用,进程可以通过一个系统调用函数从内核中获取多个事件。
对比select/poll/epoll 的文章很多,这里不再阐述。因为epoll在性能方面相比select、poll存在很大的优势,所以我们直接来看epoll编程。
epoll相关的函数只有3个:
//创建epoll的句柄
int epoll_create(int __size)
//将普通的网络文件描述符添加到epoll描述符中
int epoll_ctl(int __epfd, int __op, int __fd, struct epoll_event *__event)
//等待网络事件
int epoll_wait(int __epfd, struct epoll_event *__events, int __maxevents, int __timeout)
epoll_create是创建一个epoll的描述符epoll_fd
epoll_ctl函数将epoll_fd ((int __epfd) 和 socket_fd (int __fd) ,添加 EPOLL_CTL_ADD (int __op) 或删除 EPOLL_CTL_DEL (int __op) 到epoll反应堆中,最后一个参数struct epoll_event *__event 是一个结构体,里边有2个参数需要设置:
①设置触发模式ev.events = EPOLLIN | EPOLLET; ,epoll的触发模式包括边缘触发和水平触发
②设置socket对应的fd:ev.data.fd = listen_fd;epoll_wait是获取触发的事件,第1个参数为epoll_fd, 第2个参数用于接收触发了事件的数组,后续处理就是遍历这个数组,第3个参数为可以处理的事件的最大值,第4个参数为等待时间,-1表示阻塞等待,0表示立即返回不等待,大于0的值为等待的时间。
来看一下epoll的编程模型:
//创建监听的文件描述符
listen_fd = socket()
//绑定ip和端口
bind(listen_fd, ip和端口)
//监听
listen(listen_fd)
//创建epoll句柄
epoll_fd = epoll_create(MAXEPOLLSIZE);
/**** 将监听的listen_fd添加到epoll中 【begin】****/
//创建 ev 变量,在epoll_ctl函数中使用
struct epoll_event ev;
//设置触发模式
ev.events = EPOLLIN | EPOLLET;
//设置fd变量
ev.data.fd = listen_fd;
//将listen_fd添加到epoll集合中
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_fd, &ev)
/**** 将监听的listen_fd添加到epoll中 【end】 ****/
//创建一个数组,用于接受所有触发的读写事件
struct epoll_event fired_events[MAXEPOLLSIZE];
//循环处理链接和读写操作
while(1) {
//等待有事件发生,fired_events中存储已经触发的事件,-1表示没有超时时间,返回触发的事件数量
epoll_event_nums = epoll_wait(epoll_fd, fired_events, curfds, -1);
for(j = 0; j < epoll_event_nums; j++) {
if(fired_events[j].data.fd == listen_fd) { //如果触发事件的描述符是 listen_fd)
//1.执行 accept()函数
new_client_fd = accept(listen_fd, xx, xx)
//2.将新的客户端连接fd添加到epoll集合中
ev.events = EPOLLIN | EPOLLET;
ev.data.fd = new_client_fd;
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, new_client_fd, &ev)
} else { //如果是已连接的客户端触发的事件,则进行读写操作
//如果是读事件
recv(fired_events[j].data.fd, buf, xx, xx)
//如果是写事件
send()
}
}
}
完整示例:https://github.com/xychen/network_programming/tree/main/24.epoll%E5%AE%9E%E7%8E%B0%E6%9C%8D%E5%8A%A1%E7%AB%AF
03
redis-server中的epoll
了解了epoll的基础知识,我们再来看一下redis中是怎么基于epoll编程的。本文使用的redis源码是5.0.0版本。
1.封装
redis针对不同的系统,会选用不同的I/O多路复用底层库,只有在linux系统中使用epoll库。在ae.c文件中可以看到如下代码:
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
#ifdef HAVE_EPOLL
#include "ae_epoll.c"
#else
#ifdef HAVE_KQUEUE
#include "ae_kqueue.c"
#else
#include "ae_select.c"
#endif
#endif
#endif
通过封装,形成了4个函数:
int aeApiCreate(aeEventLoop *eventLoop);
int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask);
void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask);
int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp);
aeApiCreate对应的是epoll_create,aeApiAdd Event对应的是epoll_ctl(epoll_fd,EPOLL _CTL_ADD,……),aeApiDelEvent对应的是epoll_ctl(epoll_fd,EPOLL_CTL_DEL,……),aeAp iPoll对应的是epoll_wait 。
可以看到,每个函数的第一个参数都是aeEv entLoop类型的变量,它其实是一个全局变量,保存在 server.el 中(server变量是redis-server启动的时候创建的全局变量),来看一下aeEventLoop中对我们有用的内容:
typedef struct aeEventLoop {
int setsize; /* max number of file descriptors tracked */
aeFileEvent *events; /* Registered events */
aeFiredEvent *fired; /* Fired events */
// ………………
} aeEventLoop;
这里边有一个aeFiredEvent *fired变量,它里边保存的就是在epoll_wait中触发的所有事件,对应的就是我们第二节中说的 fired_events 数组,通过循环遍历处理所有的网络事件。那aeFileEvent *events 里边保存的是什么呢?我们会在下边的小结中讲到。
2.绑定事件
从第二节中可以看到,不管是接受新连接还是处理已有连接,无外乎读、写两个事件,因此设计一个结构体,为每种网络事件分别绑定不同的读写函数即可。redis中就是这么做的:
不论是哪种事件,在事件触发时,我们都可以拿到这个网络事件对应的fd,由此我们可以想到一个数据结构,设计一个map,key是fd,value是一个复合结构,其中包括读事件处理函数、写事件处理函数,redis中也确实有这样一个结构体:
/* File event structure */
typedef struct aeFileEvent {
int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
aeFileProc *rfileProc;
aeFileProc *wfileProc;
void *clientData;
} aeFileEvent;
rfileProc对应的就是读事件处理函数,wfileProc对应的是写事件处理函数,至于这次网络事件中调用哪个函数,通过mask来控制。
需要注意的是,redis中使用的不是真的map,而是直接使用的是一个数组。众所周知,linux系统中的fd是一个整型,而在系统中有个“最大文件句柄”的配置项,redis中创建的这个数组的大小就是“最大文件句柄”数,直接使用fd值的下标做为key。这个数据就是三-1节中说的aeEventLoop结构体中的events变量。
整个流程如下:当遍历aeEventLoop.fired数组时,通过aeEventLoop.fired.fd可以取到fd值,再通过aeEventLoop.events[fd]取到对应的aeFileEvent结构体,根据aeEventLoop.fired.mask值决定调用rfileProc函数或者wfileProc函数。
那么事件绑定的时机是什么时候呢?
首先看aeEventLoop中的events变量(即server.el)创建的时机:
// server.c文件 2040 行,initServer函数中
server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
aeCreateEventLoop函数:
aeEventLoop *aeCreateEventLoop(int setsize) {
aeEventLoop *eventLoop;
int i;
if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
//events的数量就是 server.maxclients (最大连接数的数量 + 一个常量值)
eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
……
}
对于处理新连接的事件,redis在初始化函数 initServer(void)中就进行了绑定(server.c文件2129行),可以看到代码中,将AE_READABLE事件处理的函数绑定为acceptTcpHandler:
for (j = 0; j < server.ipfd_count; j++) {
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR)
{
serverPanic(
"Unrecoverable error creating server.ipfd file event.");
}
}
为什么外层是一个循环呢?因为一台机器可能有多个ip(多块网卡),redis为每一个ip创建了一个listen_fd
对于处理已连接的读事件,接受连接之后,创建新的客户端的时候绑定的:
client *createClient(int fd) {
client *c = zmalloc(sizeof(client));
/* passing -1 as fd it is possible to create a non connected client.
* This is useful since all the commands needs to be executed
* in the context of a client. When commands are executed in other
* contexts (for instance a Lua script) we need a non connected client. */
if (fd != -1) {
anetNonBlock(NULL,fd);
anetEnableTcpNoDelay(NULL,fd);
if (server.tcpkeepalive)
anetKeepAlive(NULL,fd,server.tcpkeepalive);
if (aeCreateFileEvent(server.el,fd,AE_READABLE,
readQueryFromClient, c) == AE_ERR)
{
close(fd);
zfree(c);
return NULL;
}
}
// ………………
}
3.代码调试
在一个窗口开启调试:
gdb redis-server
(gdb) b acceptTcpHandler
(gdb) b readQueryFromClient
(gdb) r
在另外一个窗口进行连接:
redis-cli
触发了第一个断点:
Breakpoint 1, acceptTcpHandler (el=0x7ffff6c2b0a0, fd=11, privdata=0x0, mask=1) at networking.c:727
727 int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
看一下调用栈:
(gdb) bt
#0 acceptTcpHandler (el=0x7ffff6c2b0a0, fd=11, privdata=0x0, mask=1) at networking.c:727
#1 0x000000000042a4fe in aeProcessEvents (eventLoop=0x7ffff6c2b0a0, flags=11) at ae.c:443
#2 0x000000000042a6e1 in aeMain (eventLoop=0x7ffff6c2b0a0) at ae.c:501
#3 0x0000000000437239 in main (argc=1, argv=0x7fffffffe438) at server.c:4194
我们看到 el=0x7ffff6c2b0a0, fd=11 ,按照我们之前的说法,el.events[11]对应的应该是一个aeFileEvent结构体,其中rfileProc变量应该指向 acceptTcpHandler 函数,打印一下看看:
(gdb) p el.events[11]
$2 = {mask = 1, rfileProc = 0x441b51 <acceptTcpHandler>, wfileProc = 0x0, clientData = 0x0}
和我们上边分析的一样。
再来分析一下readQueryFromClient,在gdb中按c 命令,到达下一个断点:
(gdb) c
Continuing.
Breakpoint 2, readQueryFromClient (el=0x7ffff6c2b0a0, fd=12, privdata=0x7ffff6d0d740, mask=1) at networking.c:1501
1501 client *c = (client*) privdata;
看一下调用栈:
(gdb) bt
#0 readQueryFromClient (el=0x7ffff6c2b0a0, fd=12, privdata=0x7ffff6d0d740, mask=1) at networking.c:1501
#1 0x000000000042a4fe in aeProcessEvents (eventLoop=0x7ffff6c2b0a0, flags=11) at ae.c:443
#2 0x000000000042a6e1 in aeMain (eventLoop=0x7ffff6c2b0a0) at ae.c:501
#3 0x0000000000437239 in main (argc=1, argv=0x7fffffffe438) at server.c:4194
可以看到,这个新连接的fd是12,fd=12是一个新的连接,按照分析,el.events[12]对应的读处理函数应该是readQueryFromClient,打印一下:
(gdb) p el.events[12]
$4 = {mask = 1, rfileProc = 0x443987 <readQueryFromClient>, wfileProc = 0x0, clientData = 0x7ffff6d0d740}
结果和我们预想的一样。
再回到调用栈的代码看一下,main函数中调用了aeMain,看一下aeMain中做了什么:
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
}
}
可以看到,外层是一个while循环(对应到第二节中我们最外层的那个循环),循环体内是aeProcessEvents函数。我们再来看一下aeProcessEvents的内容:
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
int processed = 0, numevents;
//处理时间事件,略过……
//时间上调用了epoll_wait,返回触发事件的数量
numevents = aeApiPoll(eventLoop, tvp);
//循环处理已触发的事件
for (j = 0; j < numevents; j++) {
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int fired = 0; /* Number of events fired for current fd. */
int invert = fe->mask & AE_BARRIER;
//如果是读事件,则调用 rfileProc 函数
if (!invert && fe->mask & mask & AE_READABLE) {
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
//如果是写事件,调用 wfileProc 函数
if (fe->mask & mask & AE_WRITABLE) {
if (!fired || fe->wfileProc != fe->rfileProc) {
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
// 其他代码省略……
}
//其他代码省略……
}
可以看到,首先调用epoll_wait的包装函数aeApiPoll,然后for循环处理已经出发的事件(对应第三节中的for循环),通过mask值来判断是读事件还是写事件,如果是读事件,则调用 rfileProc 函数,如果是写事件,调用 wfileProc 函数。
以上分析已经把redis中的整个网络事件处理串起来了,还遗留一个写事件处理,sendReplyToClient 函数,读者可以自行调试一下,看看它在什么场景用到。
04
总结
本文通过一个基本框架说明了epoll编程的基本模式,并抽离出来redis-server中相关的网络模块源码进行说明。本文的目的是通过简要模式帮大家理解epoll编程,所以redis中处理的很多很多细节我们没有提到,了解基本的框架之后,我们再去专研细节。
参考文章
《这次答应我,一举拿下 I/O 多路复用!》
https://mp.weixin.qq.com/s/Qpa0qXxuIM8jrBqDaXmVNA
《原来 8 张图,就能学废 Reactor 和 Proactor》
https://mp.weixin.qq.com/s/GRkZ1IEfTalQSkErWe1SAg
《Redis5设计与源码分析》
https://book.douban.com/subject/34804798/
我知道你“在看”哟~