为何要将 listenfd 设置成非阻塞的?
阅读本文之前您可能需要知道 one thread one loop 思想,如果您还不熟悉,可以参考这篇《one thread one loop 思想》。
为了行文方便,以下将侦听 socket 称之为 listenfd,将由调用 accept 函数返回的 socket 称之为 clientfd。
我们知道如果需要使用 IO 复用函数统一管理各个 fd,需要将 clientfd 设置成非阻塞的,那么 listenfd 一定要设置成非阻塞的吗?答案是不一定的——只要不用 IO 复用函数去管理 listenfd 就可以了,listenfd 如果不设置成非阻塞的,那么 accept 函数在没有新连接时就会阻塞。
1. 结构一 listenfd 设置为阻塞模式,为了 listenfd 独立分配一个接受连接线程
有很多的服务器程序结构确实采用的就是阻塞的 listenfd,为了不让 accept 函数在没有连接时阻塞对程序其他逻辑执行流造成影响,我们通常将 accept 函数放在一个独立的线程中,这个线程的伪码如下:
1//接受连接线程
2void* accept_thread_func(void* param)
3{
4 //可以在这里做一些初始化工作...
5
6 while (退出标志)
7 {
8 struct sockaddr_in clientaddr;
9 socklen_t clientaddrlen = sizeof(clientaddr);
10 //没有连接时,线程会阻塞在accept函数处
11 int clientfd = accept(listenfd, (struct sockaddr *)&clientaddr, &clientaddrlen);
12 if (clientfd != -1)
13 {
14 //出错了,可以在此做一些清理资源动作,如关闭listenfd
15 break;
16 }
17
18 //将clientfd交给其他IO线程的IO复用函数
19 //由于跨线程操作,可以需要一些锁对公共操作的资源进行保护
20 }
21}
其他 IO 线程的结构还是利用 IO 复用函数处理 clientfd 的 one thread one loop 结构,这里以 epoll_wait 为例,即:
1//其他IO线程
2void* io_thread_func(void* param)
3{
4 //可以在这里做一些初始化工作
5
6 while (退出标志)
7 {
8 epoll_event epoll_events[1024];
9 //所有的clientfd都挂载到epollfd由epoll_wait统一检测读写事件
10 n = epoll_wait(epollfd, epoll_events, 1024, 1000);
11
12 //epoll_wait返回时处理对应clientfd上的读写事件
13
14 //其他一些操作
15 }
16}
当然,这里的 IO 线程可以存在多个,这种结构示意图如下:
将 clientfd 从 accept_thread_func 交给 io_thread_func方法也很多,这里以使用一个互斥锁来实现为例:
1//存储accept函数产生的clientfd的多线程共享变量
2std::vector<int> g_vecClientfds;
3//保护g_vecClientfds的互斥体
4std::mutex g_clientfdMutex;
5
6//接受连接线程
7void* accept_thread_func(void* param)
8{
9 //可以在这里做一些初始化工作...
10
11 while (退出标志)
12 {
13 struct sockaddr_in clientaddr;
14 socklen_t clientaddrlen = sizeof(clientaddr);
15 //没有连接时,线程会阻塞在accept函数处
16 int clientfd = accept(listenfd, (struct sockaddr *)&clientaddr, &clientaddrlen);
17 if (clientfd != -1)
18 {
19 //出错了,可以在此做一些清理资源动作,如关闭listenfd
20 break;
21 }
22
23 //将clientfd交给其他IO线程的IO复用函数
24 //由于跨线程操作,可以需要一些锁对公共操作的资源进行保护
25 std::lock_guard<std::mutex> scopedLock(g_clientfdMutex);
26 g_vecClientfds.push_back(clientfd);
27 }
28}
29
30//其他IO线程
31void* io_thread_func(void* param)
32{
33 //可以在这里做一些初始化工作
34
35 while (退出标志)
36 {
37 epoll_event epoll_events[1024];
38 //所有的clientfd都挂载到epollfd由epoll_wait统一检测读写事件
39 n = epoll_wait(epollfd, epoll_events, 1024, 1000);
40
41 //epoll_wait返回时处理对应clientfd上的读写事件
42
43 //其他一些操作
44
45 //从共享变量g_vecClientfds取出新的clientfd
46 retrieveNewClientfds(epollfd);
47 }
48}
49
50void retrieveNewClientfds(int epollfd)
51{
52 std::lock_guard<std::mutex> scopedLock(g_clientfdMutex);
53 if (!g_vecClientfds.empty())
54 {
55 //遍历g_vecClientfds取出各个fd,然后将fd设置挂载到所在线程的epollfd上
56
57 //全部取出后,清空g_vecClientfds
58 g_vecClientfds.clear();
59 }
60}
注意上述代码中,由于要求 clientfd 是非阻塞的,设置 clientfd 为非阻塞的这段逻辑你可以放在 accept_thread_func 或 io_thread_func 中均可。
上述代码有点效率问题,某个时刻 accept_thread_func 往 g_vecClientfds 添加了一个 clientfd,但此时如果 io_thread_func 函数正阻塞在 epoll_wait 处,所以此时我们要唤醒 epoll_wait,我们已经在《one thread one loop 思想》中介绍了如何设计这个唤醒逻辑,这里就不再赘述了。
2. 结构二 listenfd 为阻塞模式,使用同一个 one thread one loop 结构去处理 listenfd 的事件
单独为 listenfd 分配一个线程毕竟是对资源的一种浪费,有读者可能说,listenfd 虽然设置成了阻塞模式,但我可以将 listenfd 挂载在到某个 loop 的 epollfd 上,当 epoll_wait 返回且 listenfd 上有读事件时调用 accept 函数时,此时 accept 就不会阻塞了。伪码如下:
1void* io_thread_func(void* param)
2{
3 //可以在这里做一些初始化工作
4
5 while (退出标志)
6 {
7 epoll_event epoll_events[1024];
8 //listenfd和clientfd都挂载到epollfd由epoll_wait统一检测读写事件
9 n = epoll_wait(epollfd, epoll_events, 1024, 1000);
10
11 if (listenfd上有事件)
12 {
13 //此时调用accept函数不会阻塞
14 int clientfd = accept(listenfd, ...);
15
16 //对clientfd作进一步处理
17 }
18
19 //其他一些操作
20 }
21}
如上述代码所示,这种情况下确实可以将 listenfd 设置成阻塞模式,调用 accept 函数也不会造成流程阻塞。
但是,问题是这样的设计存在严重的效率问题:这种设计在每一轮循环中只能一次接受一个连接(每次循环仅调用了一次 accept),如果连接数较多,这种处理速度可能跟不上,所以要在一个循环里面处理 accept,但是实际情形是我们没法确定下一轮调用 accept 时 backlog 队列中是否还有新连接呀,如果没有,由于 listenfd 是阻塞模式的, accept 会阻塞。
3. 结构三 listenfd 为非阻塞模式,使用同一个 one thread one loop 结构去处理 listenfd 的事件
当将 listenfd 设置成非阻塞模式,我们就不会存在这种窘境了。伪码如下:
1void* io_thread_func(void* param)
2{
3 //可以在这里做一些初始化工作
4
5 while (退出标志)
6 {
7 epoll_event epoll_events[1024];
8 //listenfd和clientfd都挂载到epollfd由epoll_wait统一检测读写事件
9 n = epoll_wait(epollfd, epoll_events, 1024, 1000);
10
11 if (listenfd上有事件)
12 {
13 while (true)
14 {
15 //此时调用accept函数不会阻塞
16 int clientfd = accept(listenfd, ...);
17 if (clientfd == -1)
18 {
19 //错误码是EWOULDBLOCK说明此时已经没有新连接了
20 //可以退出内层的while循环了
21 if (errno == EWOULDBLOCK)
22 break;
23 //被信号中断重新调用一次accept即可
24 else if (errno == EINTR)
25 continue;
26 else
27 {
28 //其他情况认为出错
29 //做一次错误处理逻辑
30 }
31 } else {
32 //正常接受连接
33 //对clientfd作进一步处理
34 }//end inner-if
35 }//end inner-while-loop
36
37 }//end outer-if
38
39 //其他一些操作
40 }//end outer-while-loop
41}
将 listenfd 设置成非阻塞模式还有一个好处时,我们可以自己定义一次 listenfd 读事件时最大接受多少连接数,这个逻辑也很容易实现,只需要将上述代码的内层 while 循环的判断条件从 true 改成特定的次数就可以:
1void* io_thread_func(void* param)
2{
3 //可以在这里做一些初始化工作
4
5 //每次处理的最大连接数目
6 const int MAX_ACCEPTS_PER_CALL = 200;
7 //当前数量
8 int currentAccept;
9
10 while (退出标志)
11 {
12 epoll_event epoll_events[1024];
13 //listenfd和clientfd都挂载到epollfd由epoll_wait统一检测读写事件
14 n = epoll_wait(epollfd, epoll_events, 1024, 1000);
15
16 if (listenfd上有事件)
17 {
18 currentAccept = 0;
19 while (currentAccept <= MAX_ACCEPTS_PER_CALL)
20 {
21 //此时调用accept函数不会阻塞
22 int clientfd = accept(listenfd, ...);
23 if (clientfd == -1)
24 {
25 //错误码是EWOULDBLOCK说明此时已经没有新连接了
26 //可以退出内层的while循环了
27 if (errno == EWOULDBLOCK)
28 break;
29 //被信号中断重新调用一次accept即可
30 else if (errno == EINTR)
31 continue;
32 else
33 {
34 //其他情况认为出错
35 //做一次错误处理逻辑
36 }
37 } else {
38 //累加处理数量
39 ++currentAccept;
40 //正常接受连接
41 //对clientfd作进一步处理
42 }//end inner-if
43 }//end inner-while-loop
44
45 }//end outer-if
46
47 //其他一些操作
48 }//end outer-while-loop
49}
这是一段比较常用的逻辑,我们以 redis-server 的源码中的使用为例:
1//https://github.com/balloonwj/redis-6.0.3/blob/master/src/networking.c
2//networking.c 971行
3void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
4 //MAX_ACCEPTS_PER_CALL在redis中是1000
5 int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
6 char cip[NET_IP_STR_LEN];
7 UNUSED(el);
8 UNUSED(mask);
9 UNUSED(privdata);
10
11 //每次最大处理max个连接数目
12 while(max--) {
13 cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
14 if (cfd == ANET_ERR) {
15 //未达到每次处理新连接的最大数时已经无新连接待接收,直接while循环
16 if (errno != EWOULDBLOCK)
17 serverLog(LL_WARNING,
18 "Accepting client connection: %s", server.neterr);
19 return;
20 }
21 serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
22 acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
23 }
24}
推荐阅读:
原创不易,帮忙点赞、在看和转发呗~