图解:深入理解高性能网络开发路上的绊脚石,同步阻塞网络 IO
The following article is from 开发内功修炼 Author 张彦飞allen
在网络开发模型中,有一种非常易于开发同学使用的方式,那就是同步阻塞的网络 IO(在 Java 中习惯叫 BIO)。
例如我们想请求服务器上的一段数据,那么 C 语言的一段代码 demo 大概是下面这样:
int main()
{
int sk = socket(AF_INET, SOCK_STREAM, 0);
connect(sk, ...)
recv(sk, ...)
}
但是在高并发的服务器开发中,这种网络 IO 的性能奇差。因为
1.进程在 recv 的时候大概率会被阻塞掉,导致一次进程切换
2.当连接上数据就绪的时候进程又会被唤醒,又是一次进程切换
3.一个进程同时只能等待一条连接,如果有很多并发,则需要很多进程
如果用一句话来概括,那就是:同步阻塞网络 IO 是高性能网络开发路上的绊脚石! 俗话说得好,知己知彼方能百战百胜。所以我们今天先不讲优化,只深入分析同步阻塞网络 IO 的内部实现。
在上面的 demo 中虽然只是简单的两三行代码,但实际上用户进程和内核配合做了非常多的工作。先是用户进程发起创建 socket 的指令,然后切换到内核态完成了内核对象的初始化。接下来 Linux 在数据包的接收上,是硬中断和 ksoftirqd 进程在进行处理。当 ksoftirqd 进程处理完以后,再通知到相关的用户进程。
我们今天用图解加源码分析的方式来详细拆解一下上面的每一个步骤,来看一下在内核里是它们是怎么实现的。阅读完本文,你将深刻地理解在同步阻塞的网络 IO 性能低下的原因!
一、创建一个 socket
//file:net/socket.c
SYSCALL_DEFINE3(socket, int, family, int, type, int, protocol)
{
......
retval = sock_create(family, type, protocol, &sock);
}
//file:net/socket.c
int __sock_create(struct net *net, int family, int type, int protocol,
struct socket **res, int kern)
{
struct socket *sock;
const struct net_proto_family *pf;
......
//分配 socket 对象
sock = sock_alloc();
//获得每个协议族的操作表
pf = rcu_dereference(net_families[family]);
//调用每个协议族的创建函数, 对于 AF_INET 对应的是
err = pf->create(net, sock, protocol, kern);
}
//file:net/ipv4/af_inet.c
tatic int inet_create(struct net *net, struct socket *sock, int protocol,
int kern)
{
struct sock *sk;
//查找对应的协议,对于TCP SOCK_STREAM 就是获取到了
//static struct inet_protosw inetsw_array[] =
//{
// {
// .type = SOCK_STREAM,
// .protocol = IPPROTO_TCP,
// .prot = &tcp_prot,
// .ops = &inet_stream_ops,
// .no_check = 0,
// .flags = INET_PROTOSW_PERMANENT |
// INET_PROTOSW_ICSK,
// },
//}
list_for_each_entry_rcu(answer, &inetsw[sock->type], list) {
//将 inet_stream_ops 赋到 socket->ops 上
sock->ops = answer->ops;
//获得 tcp_prot
answer_prot = answer->prot;
//分配 sock 对象, 并把 tcp_prot 赋到 sock->sk_prot 上
sk = sk_alloc(net, PF_INET, GFP_KERNEL, answer_prot);
//对 sock 对象进行初始化
sock_init_data(sock, sk);
}
我们再往下看到了 sock_init_data。在这个方法中将 sock 中的 sk_data_ready 函数指针进行了初始化,设置为默认 sock_def_readable()。
//file: net/core/sock.c
void sock_init_data(struct socket *sock, struct sock *sk)
{
sk->sk_data_ready = sock_def_readable;
sk->sk_write_space = sock_def_write_space;
sk->sk_error_report = sock_def_error_report;
}
当软中断上收到数据包时会通过调用 sk_data_ready 函数指针(实际被设置成了 sock_def_readable()) 来唤醒在 sock 上等待的进程。这个咱们后面介绍软中断的时候再说,这里记住这个就行了。
至此,一个 tcp对象,确切地说是 AF_INET 协议族下 SOCK_STREAM对象就算是创建完成了。这里花费了一次 socket 系统调用的开销。
二、等待接收消息
接着我们来看 recv 函数依赖的底层实现。首先通过 strace 命令跟踪,可以看到 clib 库函数 recv 会执行到 recvfrom 系统调用。
进入系统调用后,用户进程就进入到了内核态,通过执行一系列的内核协议层函数,然后到 socket 对象的接收队列中查看是否有数据,没有的话就把自己添加到 socket 对应的等待队列里。最后让出CPU,操作系统会选择下一个就绪状态的进程来执行。整个流程图如下:
//file: net/socket.c
SYSCALL_DEFINE6(recvfrom, int, fd, void __user *, ubuf, size_t, size,
unsigned int, flags, struct sockaddr __user *, addr,
int __user *, addr_len)
{
struct socket *sock;
//根据用户传入的 fd 找到 socket 对象
sock = sockfd_lookup_light(fd, &err, &fput_needed);
......
err = sock_recvmsg(sock, &msg, size, flags);
......
}
static inline int __sock_recvmsg_nosec(struct kiocb *iocb, struct socket *sock,
struct msghdr *msg, size_t size, int flags)
{
......
return sock->ops->recvmsg(iocb, sock, msg, size, flags);
}
调用 socket 对象 ops 里的 recvmsg, 回忆我们上面的 socket 对象图,从图中可以看到 recvmsg 指向的是 inet_recvmsg 方法。
//file: net/ipv4/af_inet.c
int inet_recvmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg,
size_t size, int flags)
{
...
err = sk->sk_prot->recvmsg(iocb, sk, msg, size, flags & MSG_DONTWAIT,
flags & ~MSG_DONTWAIT, &addr_len);
这里又遇到一个函数指针,这次调用的是 socket 对象里的 sk_prot 下面的 recvmsg方法。同上,得出这个 recvmsg 方法对应的是 tcp_recvmsg 方法。
//file: net/ipv4/tcp.c
int tcp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
size_t len, int nonblock, int flags, int *addr_len)
{
int copied = 0;
...
do {
//遍历接收队列接收数据
skb_queue_walk(&sk->sk_receive_queue, skb) {
...
}
...
}
if (copied >= target) {
release_sock(sk);
lock_sock(sk);
} else //没有收到足够数据,启用 sk_wait_data 阻塞当前进程
sk_wait_data(sk, &timeo);
}
终于看到了我们想要看的东西,skb_queue_walk 是在访问 sock 对象下面的接收队列了。
//file: net/core/sock.c
int sk_wait_data(struct sock *sk, long *timeo)
{
//当前进程(current)关联到所定义的等待队列项上
DEFINE_WAIT(wait);
// 调用 sk_sleep 获取 sock 对象下的 wait
// 并准备挂起,将进程状态设置为可打断 INTERRUPTIBLE
prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
set_bit(SOCK_ASYNC_WAITDATA, &sk->sk_socket->flags);
// 通过调用schedule_timeout让出CPU,然后进行睡眠
rc = sk_wait_event(sk, timeo, !skb_queue_empty(&sk->sk_receive_queue));
...
//file: include/linux/wait.h
#define DEFINE_WAIT(name) DEFINE_WAIT_FUNC(name, autoremove_wake_function)
#define DEFINE_WAIT_FUNC(name, function) \
wait_queue_t name = { \
.private = current, \
.func = function, \
.task_list = LIST_HEAD_INIT((name).task_list), \
}
//file: include/net/sock.h
static inline wait_queue_head_t *sk_sleep(struct sock *sk)
{
BUILD_BUG_ON(offsetof(struct socket_wq, wait) != 0);
return &rcu_dereference_raw(sk->sk_wq)->wait;
}
//file: kernel/wait.c
void
prepare_to_wait(wait_queue_head_t *q, wait_queue_t *wait, int state)
{
unsigned long flags;
wait->flags &= ~WQ_FLAG_EXCLUSIVE;
spin_lock_irqsave(&q->lock, flags);
if (list_empty(&wait->task_list))
__add_wait_queue(q, wait);
set_current_state(state);
spin_unlock_irqrestore(&q->lock, flags);
}
最后再调用 sk_wait_event 让出 CPU,进程将进入睡眠状态,这会导致一次进程上下文的开销。
接下来的小节里我们将能看到进程是如何被唤醒的了。
三、软中断模块
我们看更详细一点的代码:
// file: net/ipv4/tcp_ipv4.c
int tcp_v4_rcv(struct sk_buff *skb)
{
......
th = tcp_hdr(skb); //获取tcp header
iph = ip_hdr(skb); //获取ip header
//根据数据包 header 中的 ip、端口信息查找到对应的socket
sk = __inet_lookup_skb(&tcp_hashinfo, skb, th->source, th->dest);
......
//socket 未被用户锁定
if (!sock_owned_by_user(sk)) {
{
if (!tcp_prequeue(sk, skb))
ret = tcp_v4_do_rcv(sk, skb);
}
}
}
//file: net/ipv4/tcp_ipv4.c
int tcp_v4_do_rcv(struct sock *sk, struct sk_buff *skb)
{
if (sk->sk_state == TCP_ESTABLISHED) {
//执行连接状态下的数据处理
if (tcp_rcv_established(sk, skb, tcp_hdr(skb), skb->len)) {
rsk = sk;
goto reset;
}
return 0;
}
//其它非 ESTABLISH 状态的数据包处理
......
}
我们假设处理的是 ESTABLISH 状态下的包,这样就又进入 tcp_rcv_established 函数中进行处理。
//file: net/ipv4/tcp_input.c
int tcp_rcv_established(struct sock *sk, struct sk_buff *skb,
const struct tcphdr *th, unsigned int len)
{
......
//接收数据到队列中
eaten = tcp_queue_rcv(sk, skb, tcp_header_len,
&fragstolen);
//数据 ready,唤醒 socket 上阻塞掉的进程
sk->sk_data_ready(sk, 0);
在 tcp_rcv_established 中通过调用 tcp_queue_rcv 函数中完成了将接收数据放到 socket 的接收队列上。
//file: net/ipv4/tcp_input.c
static int __must_check tcp_queue_rcv(struct sock *sk, struct sk_buff *skb, int hdrlen,
bool *fragstolen)
{
//把接收到的数据放到 socket 的接收队列的尾部
if (!eaten) {
__skb_queue_tail(&sk->sk_receive_queue, skb);
skb_set_owner_r(skb, sk);
}
return eaten;
}
调用 tcp_queue_rcv 接收完成之后,接着再调用 sk_data_ready 来唤醒在socket上等待的用户进程。 这又是一个函数指针。回想上面我们在 创建 socket 流程里执行到的 sock_init_data 函数,在这个函数里已经把 sk_data_ready 设置成 sock_def_readable 函数了(可以ctrl + f 搜索前文)。它是默认的数据就绪处理函数。
//file: net/core/sock.c
static void sock_def_readable(struct sock *sk, int len)
{
struct socket_wq *wq;
rcu_read_lock();
wq = rcu_dereference(sk->sk_wq);
//有进程在此 socket 的等待队列
if (wq_has_sleeper(wq))
//唤醒等待队列上的进程
wake_up_interruptible_sync_poll(&wq->wait, POLLIN | POLLPRI |
POLLRDNORM | POLLRDBAND);
sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_IN);
rcu_read_unlock();
}
在 sock_def_readable 中再一次访问到了 sock->sk_wq 下的wait。回忆下我们前面调用 recvfrom 执行的最后,通过 DEFINE_WAIT(wait) 将当前进程关联的等待队列添加到 sock->sk_wq 下的 wait 里了。
那接下来就是调用 wake_up_interruptible_sync_poll 来唤醒在 socket 上因为等待数据而被阻塞掉的进程了。
//file: include/linux/wait.h
#define wake_up_interruptible_sync_poll(x, m) \
__wake_up_sync_key((x), TASK_INTERRUPTIBLE, 1, (void *) (m))
//file: kernel/sched/core.c
void __wake_up_sync_key(wait_queue_head_t *q, unsigned int mode,
int nr_exclusive, void *key)
{
unsigned long flags;
int wake_flags = WF_SYNC;
if (unlikely(!q))
return;
if (unlikely(!nr_exclusive))
wake_flags = 0;
spin_lock_irqsave(&q->lock, flags);
__wake_up_common(q, mode, nr_exclusive, wake_flags, key);
spin_unlock_irqrestore(&q->lock, flags);
}
__wake_up_common 实现唤醒。这里注意下, 该函数调用是参数 nr_exclusive 传入的是 1,这里指的是即使是有多个进程都阻塞在同一个 socket 上,也只唤醒 1 个进程。其作用是为了避免惊群。
//file: kernel/sched/core.c
static void __wake_up_common(wait_queue_head_t *q, unsigned int mode,
int nr_exclusive, int wake_flags, void *key)
{
wait_queue_t *curr, *next;
list_for_each_entry_safe(curr, next, &q->task_list, task_list) {
unsigned flags = curr->flags;
if (curr->func(curr, mode, wake_flags, key) &&
(flags & WQ_FLAG_EXCLUSIVE) && !--nr_exclusive)
break;
}
}
在 __wake_up_common 中找出一个等待队列项 curr,然后调用其 curr->func。回忆我们前面在 recv 函数执行的时候,使用 DEFINE_WAIT() 定义等待队列项的细节,内核把 curr->func 设置成了 autoremove_wake_function。
//file: include/linux/wait.h
#define DEFINE_WAIT(name) DEFINE_WAIT_FUNC(name, autoremove_wake_function)
#define DEFINE_WAIT_FUNC(name, function) \
wait_queue_t name = { \
.private = current, \
.func = function, \
.task_list = LIST_HEAD_INIT((name).task_list), \
}
在 autoremove_wake_function 中,调用了 default_wake_function。
//file: kernel/sched/core.c
int default_wake_function(wait_queue_t *curr, unsigned mode, int wake_flags,
void *key)
{
return try_to_wake_up(curr->private, mode, wake_flags);
}
调用 try_to_wake_up 时传入的 task_struct 是 curr->private。这个就是当时因为等待而被阻塞的进程项。当这个函数执行完的时候,在 socket 上等待而被阻塞的进程就被推入到可运行队列里了,这又将是一次进程上下文切换的开销。
小结
好了,我们把上面的流程总结一下。内核在通知网络包的运行环境分两部分:
第一部分是我们自己代码所在的进程,我们调用的 socket() 函数会进入内核态创建必要内核对象。recv() 函数在进入内核态以后负责查看接收队列,以及在没有数据可处理的时候把当前进程阻塞掉,让出 CPU。
第二部分是硬中断、软中断上下文(系统进程 ksoftirqd)。在这些组件中,将包处理完后会放到 socket 的接收队列中。然后再根据 socket 内核对象找到其等待队列中正在因为等待而被阻塞掉的进程,然后把它唤醒。
在服务端角色上,这种模式完全没办法使用。因为这种简单模型里的 socket 和进程是一对一的。我们现在要在单台机器上承载成千上万,甚至十几、上百万的用户连接请求。如果用上面的方式,那就得为每个用户请求都创建一个进程。相信你在无论多原始的服务器网络编程里,都没见过有人这么干吧。
如果让我给它起一个名字的话,它就叫单路不复用(飞哥自创名词)。那么有没有更高效的网络 IO 模型呢?当然有,那就是你所熟知的 select、poll 和 epoll了。下次飞哥再开始拆解 epoll 的实现源码,敬请期待!
这种模式在客户端角色上,现在还存在使用的情形。因为你的进程可能确实得等 Mysql 的数据返回成功之后,才能渲染页面返回给用户,否则啥也干不了。
注意一下,我说的是角色,不是具体的机器。例如对于你的 php/java/golang 接口机,你接收用户请求的时候,你是服务端角色。但当你再请求 redis 的时候,就变为客户端角色了。
不过现在有一些封装的很好的网络框架例如 Sogou Workflow,Golang 的 net 包等在网络客户端角色上也早已摒弃了这种低效的模式!!
- EOF -
看完本文有收获?请分享给更多人
推荐关注「PHP开发者」,提升PHP技能
点赞和在看就是最大的支持❤️