查看原文
其他

从epoll入门到redis中的epoll

陈星宇老师 好未来技术 2023-03-15

01

网络编程模板

常见的网络编程模式如下(以ipv4中tcp协议编程为例),

  • 首先创建一个socket套接字,即用于监听的文件描述符listen_fd;

  • 将它与具体的ip和端口号绑定;

  • 开启监听;

  • 使用一个循环来接受客户端的请求;

  • 创建子进程或者线程来处理已经连接的请求。

//创建监听的文件描述符listen_fd = socket()//绑定ip和端口bind(listen_fd, ip和端口)//监听listen(listen_fd)//循环处理链接和读写操作while(1) { //主进程用来接收连接 new_client_fd = accept() //创建子进程或线程处理,处理新的客户端的请求}

02

epoll编程模板

这种模式的问题在于创建子进程、线程都有系统调用,每来一个新的TCP连接都需要分配一个进程或者线程,如果达到C10K,意味着一台机器要维护1万个进程/线程,应对高并发的场景存在一定的性能问题。能不能让一个进程/线程来维护多个socket呢?当然,就是I/O多路复用技术。

一个进程虽然任一时刻只能处理一个请求,但是处理每个请求的事件时,耗时控制在 1 毫秒以内,这样 1 秒内就可以处理上千个请求,把时间拉长来看,多个请求复用了一个进程,这就是多路复用,这种思想很类似一个 CPU 并发多个进程,所以也叫做时分多路复用。
我们熟悉的 select/poll/epoll 内核提供给用户态的多路复用系统调用,进程可以通过一个系统调用函数从内核中获取多个事件。

对比select/poll/epoll 的文章很多,这里不再阐述。因为epoll在性能方面相比select、poll存在很大的优势,所以我们直接来看epoll编程。
epoll相关的函数只有3个:

//创建epoll的句柄int epoll_create(int __size)//将普通的网络文件描述符添加到epoll描述符中int epoll_ctl(int __epfd, int __op, int __fd, struct epoll_event *__event)//等待网络事件int epoll_wait(int __epfd, struct epoll_event *__events, int __maxevents, int __timeout)
  • epoll_create是创建一个epoll的描述符epoll_fd

  • epoll_ctl函数将epoll_fd ((int __epfd) 和 socket_fd (int __fd) ,添加 EPOLL_CTL_ADD (int __op) 或删除 EPOLL_CTL_DEL (int __op) 到epoll反应堆中,最后一个参数struct epoll_event *__event 是一个结构体,里边有2个参数需要设置:
    ①设置触发模式ev.events = EPOLLIN | EPOLLET; ,epoll的触发模式包括边缘触发和水平触发
    ②设置socket对应的fd:ev.data.fd = listen_fd;

  • epoll_wait是获取触发的事件,第1个参数为epoll_fd, 第2个参数用于接收触发了事件的数组,后续处理就是遍历这个数组,第3个参数为可以处理的事件的最大值,第4个参数为等待时间,-1表示阻塞等待,0表示立即返回不等待,大于0的值为等待的时间。

来看一下epoll的编程模型:

//创建监听的文件描述符listen_fd = socket()//绑定ip和端口bind(listen_fd, ip和端口)//监听listen(listen_fd)
//创建epoll句柄epoll_fd = epoll_create(MAXEPOLLSIZE);
/**** 将监听的listen_fd添加到epoll中 【begin】****///创建 ev 变量,在epoll_ctl函数中使用struct epoll_event ev;//设置触发模式ev.events = EPOLLIN | EPOLLET;//设置fd变量ev.data.fd = listen_fd;//将listen_fd添加到epoll集合中epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_fd, &ev)/**** 将监听的listen_fd添加到epoll中 【end】 ****/
//创建一个数组,用于接受所有触发的读写事件struct epoll_event fired_events[MAXEPOLLSIZE];
//循环处理链接和读写操作while(1) { //等待有事件发生,fired_events中存储已经触发的事件,-1表示没有超时时间,返回触发的事件数量 epoll_event_nums = epoll_wait(epoll_fd, fired_events, curfds, -1); for(j = 0; j < epoll_event_nums; j++) { if(fired_events[j].data.fd == listen_fd) { //如果触发事件的描述符是 listen_fd) //1.执行 accept()函数 new_client_fd = accept(listen_fd, xx, xx) //2.将新的客户端连接fd添加到epoll集合中 ev.events = EPOLLIN | EPOLLET; ev.data.fd = new_client_fd; epoll_ctl(epoll_fd, EPOLL_CTL_ADD, new_client_fd, &ev) } else { //如果是已连接的客户端触发的事件,则进行读写操作 //如果是读事件 recv(fired_events[j].data.fd, buf, xx, xx) //如果是写事件 send() } }}

完整示例:https://github.com/xychen/network_programming/tree/main/24.epoll%E5%AE%9E%E7%8E%B0%E6%9C%8D%E5%8A%A1%E7%AB%AF

03

redis-server中的epoll

了解了epoll的基础知识,我们再来看一下redis中是怎么基于epoll编程的。本文使用的redis源码是5.0.0版本。

1.封装

redis针对不同的系统,会选用不同的I/O多路复用底层库,只有在linux系统中使用epoll库。在ae.c文件中可以看到如下代码:

#ifdef HAVE_EVPORT#include "ae_evport.c"#else #ifdef HAVE_EPOLL #include "ae_epoll.c" #else #ifdef HAVE_KQUEUE #include "ae_kqueue.c" #else #include "ae_select.c" #endif #endif#endif

通过封装,形成了4个函数:

int aeApiCreate(aeEventLoop *eventLoop);int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask);void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask);int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp);

aeApiCreate对应的是epoll_create,aeApiAdd Event对应的是epoll_ctl(epoll_fd,EPOLL _CTL_ADD,……),aeApiDelEvent对应的是epoll_ctl(epoll_fd,EPOLL_CTL_DEL,……),aeAp iPoll对应的是epoll_wait 。

可以看到,每个函数的第一个参数都是aeEv entLoop类型的变量,它其实是一个全局变量,保存在 server.el 中(server变量是redis-server启动的时候创建的全局变量),来看一下aeEventLoop中对我们有用的内容:

typedef struct aeEventLoop { int setsize; /* max number of file descriptors tracked */ aeFileEvent *events; /* Registered events */ aeFiredEvent *fired; /* Fired events */ // ………………} aeEventLoop;

这里边有一个aeFiredEvent *fired变量,它里边保存的就是在epoll_wait中触发的所有事件,对应的就是我们第二节中说的 fired_events 数组,通过循环遍历处理所有的网络事件。那aeFileEvent *events 里边保存的是什么呢?我们会在下边的小结中讲到。

2.绑定事件

从第二节中可以看到,不管是接受新连接还是处理已有连接,无外乎读、写两个事件,因此设计一个结构体,为每种网络事件分别绑定不同的读写函数即可。redis中就是这么做的:

不论是哪种事件,在事件触发时,我们都可以拿到这个网络事件对应的fd,由此我们可以想到一个数据结构,设计一个map,key是fd,value是一个复合结构,其中包括读事件处理函数、写事件处理函数,redis中也确实有这样一个结构体:

/* File event structure */typedef struct aeFileEvent { int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */ aeFileProc *rfileProc; aeFileProc *wfileProc; void *clientData;} aeFileEvent;

rfileProc对应的就是读事件处理函数,wfileProc对应的是写事件处理函数,至于这次网络事件中调用哪个函数,通过mask来控制。

需要注意的是,redis中使用的不是真的map,而是直接使用的是一个数组。众所周知,linux系统中的fd是一个整型,而在系统中有个“最大文件句柄”的配置项,redis中创建的这个数组的大小就是“最大文件句柄”数,直接使用fd值的下标做为key。这个数据就是三-1节中说的aeEventLoop结构体中的events变量。

整个流程如下:当遍历aeEventLoop.fired数组时,通过aeEventLoop.fired.fd可以取到fd值,再通过aeEventLoop.events[fd]取到对应的aeFileEvent结构体,根据aeEventLoop.fired.mask值决定调用rfileProc函数或者wfileProc函数。

那么事件绑定的时机是什么时候呢?

首先看aeEventLoop中的events变量(即server.el)创建的时机:

// server.c文件 2040 行,initServer函数中server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);

aeCreateEventLoop函数:

aeEventLoop *aeCreateEventLoop(int setsize) { aeEventLoop *eventLoop; int i;
if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err; //events的数量就是 server.maxclients (最大连接数的数量 + 一个常量值) eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize); eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize); ……}

对于处理新连接的事件,redis在初始化函数 initServer(void)中就进行了绑定(server.c文件2129行),可以看到代码中,将AE_READABLE事件处理的函数绑定为acceptTcpHandler

for (j = 0; j < server.ipfd_count; j++) { if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL) == AE_ERR) { serverPanic( "Unrecoverable error creating server.ipfd file event."); }}

为什么外层是一个循环呢?因为一台机器可能有多个ip(多块网卡),redis为每一个ip创建了一个listen_fd

对于处理已连接的读事件,接受连接之后,创建新的客户端的时候绑定的:

client *createClient(int fd) { client *c = zmalloc(sizeof(client));
/* passing -1 as fd it is possible to create a non connected client. * This is useful since all the commands needs to be executed * in the context of a client. When commands are executed in other * contexts (for instance a Lua script) we need a non connected client. */ if (fd != -1) { anetNonBlock(NULL,fd); anetEnableTcpNoDelay(NULL,fd); if (server.tcpkeepalive) anetKeepAlive(NULL,fd,server.tcpkeepalive); if (aeCreateFileEvent(server.el,fd,AE_READABLE, readQueryFromClient, c) == AE_ERR) { close(fd); zfree(c); return NULL; } } // ………………}

3.代码调试

在一个窗口开启调试:

gdb redis-server(gdb) b acceptTcpHandler(gdb) b readQueryFromClient(gdb) r

在另外一个窗口进行连接:

redis-cli

触发了第一个断点:

Breakpoint 1, acceptTcpHandler (el=0x7ffff6c2b0a0, fd=11, privdata=0x0, mask=1) at networking.c:727727 int cport, cfd, max = MAX_ACCEPTS_PER_CALL;

看一下调用栈:

(gdb) bt#0 acceptTcpHandler (el=0x7ffff6c2b0a0, fd=11, privdata=0x0, mask=1) at networking.c:727#1 0x000000000042a4fe in aeProcessEvents (eventLoop=0x7ffff6c2b0a0, flags=11) at ae.c:443#2 0x000000000042a6e1 in aeMain (eventLoop=0x7ffff6c2b0a0) at ae.c:501#3 0x0000000000437239 in main (argc=1, argv=0x7fffffffe438) at server.c:4194

我们看到 el=0x7ffff6c2b0a0, fd=11 ,按照我们之前的说法,el.events[11]对应的应该是一个aeFileEvent结构体,其中rfileProc变量应该指向 acceptTcpHandler 函数,打印一下看看:

(gdb) p el.events[11]$2 = {mask = 1, rfileProc = 0x441b51 <acceptTcpHandler>, wfileProc = 0x0, clientData = 0x0}

和我们上边分析的一样。

再来分析一下readQueryFromClient,在gdb中按c 命令,到达下一个断点:

(gdb) cContinuing.
Breakpoint 2, readQueryFromClient (el=0x7ffff6c2b0a0, fd=12, privdata=0x7ffff6d0d740, mask=1) at networking.c:15011501 client *c = (client*) privdata;

看一下调用栈:

(gdb) bt#0 readQueryFromClient (el=0x7ffff6c2b0a0, fd=12, privdata=0x7ffff6d0d740, mask=1) at networking.c:1501#1 0x000000000042a4fe in aeProcessEvents (eventLoop=0x7ffff6c2b0a0, flags=11) at ae.c:443#2 0x000000000042a6e1 in aeMain (eventLoop=0x7ffff6c2b0a0) at ae.c:501#3 0x0000000000437239 in main (argc=1, argv=0x7fffffffe438) at server.c:4194

可以看到,这个新连接的fd是12,fd=12是一个新的连接,按照分析,el.events[12]对应的读处理函数应该是readQueryFromClient,打印一下:

(gdb) p el.events[12]$4 = {mask = 1, rfileProc = 0x443987 <readQueryFromClient>, wfileProc = 0x0, clientData = 0x7ffff6d0d740}

结果和我们预想的一样。

再回到调用栈的代码看一下,main函数中调用了aeMain,看一下aeMain中做了什么:

void aeMain(aeEventLoop *eventLoop) { eventLoop->stop = 0; while (!eventLoop->stop) { if (eventLoop->beforesleep != NULL) eventLoop->beforesleep(eventLoop); aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP); }}

可以看到,外层是一个while循环(对应到第二节中我们最外层的那个循环),循环体内是aeProcessEvents函数。我们再来看一下aeProcessEvents的内容:

int aeProcessEvents(aeEventLoop *eventLoop, int flags){ int processed = 0, numevents;
//处理时间事件,略过…… //时间上调用了epoll_wait,返回触发事件的数量 numevents = aeApiPoll(eventLoop, tvp); //循环处理已触发的事件 for (j = 0; j < numevents; j++) { aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd]; int mask = eventLoop->fired[j].mask; int fd = eventLoop->fired[j].fd; int fired = 0; /* Number of events fired for current fd. */ int invert = fe->mask & AE_BARRIER;
//如果是读事件,则调用 rfileProc 函数 if (!invert && fe->mask & mask & AE_READABLE) { fe->rfileProc(eventLoop,fd,fe->clientData,mask); fired++; }
//如果是写事件,调用 wfileProc 函数 if (fe->mask & mask & AE_WRITABLE) { if (!fired || fe->wfileProc != fe->rfileProc) { fe->wfileProc(eventLoop,fd,fe->clientData,mask); fired++; } } // 其他代码省略…… } //其他代码省略……}

可以看到,首先调用epoll_wait的包装函数aeApiPoll,然后for循环处理已经出发的事件(对应第三节中的for循环),通过mask值来判断是读事件还是写事件,如果是读事件,则调用 rfileProc 函数,如果是写事件,调用 wfileProc 函数。

以上分析已经把redis中的整个网络事件处理串起来了,还遗留一个写事件处理,sendReplyToClient 函数,读者可以自行调试一下,看看它在什么场景用到。

04

总结

本文通过一个基本框架说明了epoll编程的基本模式,并抽离出来redis-server中相关的网络模块源码进行说明。本文的目的是通过简要模式帮大家理解epoll编程,所以redis中处理的很多很多细节我们没有提到,了解基本的框架之后,我们再去专研细节。


参考文章

《这次答应我,一举拿下 I/O 多路复用!》

https://mp.weixin.qq.com/s/Qpa0qXxuIM8jrBqDaXmVNA

《原来 8 张图,就能学废 Reactor 和 Proactor》

https://mp.weixin.qq.com/s/GRkZ1IEfTalQSkErWe1SAg

《Redis5设计与源码分析》

https://book.douban.com/subject/34804798/



扫描下方二维码添加「好未来技术」微信官方账号
进入好未来技术官方交流群与作者实时互动~
(若扫码无效,可通过微信号TAL-111111直接添加)
- 也许你还想看 -
【未来云-业务监控】实时大屏技术解决方案
直播消息服务架构最佳实践分享
学而思一对一企业微信应用实战踩坑之路

我知道你“在看”哟~


您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存