查看原文
其他

IO多路复用与Go网络库的实现

Go开发大全 2021-07-20

(给Go开发大全加星标)

来源:nino

https://ninokop.github.io/2018/02/18/go-net/

导读】go原生net包深度解析


去年在读UNP等几本书时都涉及到IO及相关知识,不过由于点很零散,当时没有总结。趁这次看golang net包的机会,结合之前看的比较基础的概念写了这篇博客,记录下现在的理解,方便以后回顾。

IO多路复用

Unix网络编程里总结了5种IO模型,其中只有异步IO模型是异步IO,因为只有异步IO的recvfrom是不阻塞进程的。首先每个IO读操作都包括以下两个过程,书里说的异步就是指发起读操作时,数据从内核拷贝到用户空间是否要阻塞进程这个问题。

  • 等待网络数据到达网卡并读取到内核缓冲区,数据准备好。
  • 从内核缓冲区复制数据到进程空间。

另外阻塞IO模型和阻塞IO调用是不同的。比如非阻塞IO模型当中其实包含了多次非阻塞IO调用和一次阻塞IO调用,非阻塞IO调用是指在内核无数据准备好时,recvfrom不阻塞进程直接返回,在内核有数据时发起的recvfrom其实还是阻塞的。

IO复用是指:进程阻塞于select,等待多个IO中的任一个变为可读,select调用返回,通知相应IO可以读。它可以支持单线程响应多个请求这种模式。

它本质上是同步I/O,因为他们都需要在读写事件就绪后自己负责进行读写,也就是说这个读写过程是阻塞的,而异步I/O则无需自己负责进行读写,只需要把buffer提交给内核,内核会负责把数据从内核拷贝到用户空间,然后告诉你已可读。

select poll

select poll epoll这三个是常用的IO复用的系统调用。select和poll本质相同,都对同时监听的fd有数量限制,因为他们涉及大量文件描述符的数组被整体复制于用户态和内核的地址空间之间,而不论这些文件描述符是否就绪,因此它的开销随着文件描述符数量的增加而线性增大。

int poll(struct pollfd *fds, nfds_t nfds, int timeout);
  1. 每次调用select,都需要把fd集合从用户态拷贝到内核态,这个开销在fd很多时会很大
  2. 同时每次调用select都需要在内核遍历传递进来的所有fd,把当前进程加入fd对应的设备等待队列中
  3. select支持的文件描述符数量太小了,默认是1024

epoll

epoll涉及三个系统调用,epoll用来创建epollfd文件描述符(之后要close),epoll_ctl用来注册每个描述符及其等待的事件,epoll_wait监听epollfd上注册的事件,内核负责把数据复制到这个 events 数组中。

#include <sys/epoll.h>  
int epoll_create(int size);  
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);  
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
  1. 将fd从用户态到内核态拷贝的过程统一到epoll_ctl,而不是在epoll_wait中每次循环调用都拷贝一次。
  2. 在epoll_ctl时把当前进程挂到fd对应的设备等待队列中,并为每个fd指定一个回调函数。当设备就绪,唤醒等待队列上的等待者时,就会调用这个回调函数,而这个回调函数会把就绪的fd加入一个就绪链表。epoll_wait的工作实际上就是在这个就绪链表中查看有没有就绪的fd,并唤醒在epoll_wait中进入睡眠的进程。
  3. epoll没有最多fd的这个限制,它所支持的fd上限是最大可以打开文件的数目。

non-blocking IO

brpc-io.md中有一段对两种blocking的对比,觉得说的很好,转载一下。

https://github.com/eesly/brpc/blob/master/docs/cn/io.md#the-full-picture

linux一般使用non-blocking IO提高IO并发度。当IO并发度很低时,non-blocking IO不一定比blocking IO更高效,因为后者完全由内核负责,而read/write这类系统调用已高度优化,效率显然高于一般的多个线程协作的non-blocking IO。

但当IO并发度愈发提高时,blocking IO阻塞一个线程的弊端便显露出来:内核得不停地在线程间切换才能完成有效的工作,一个cpu core上可能只做了一点点事情,就马上又换成了另一个线程,cpu cache没得到充分利用,另外大量的线程会使得依赖thread-local加速的代码性能明显下降,如tcmalloc,一旦malloc变慢,程序整体性能往往也会随之下降。

而non-blocking IO一般由少量event dispatching线程和一些运行用户逻辑的worker线程组成,这些线程往往会被复用(换句话说调度工作转移到了用户态),event dispatching和worker可以同时在不同的核运行(流水线化),内核不用频繁的切换就能完成有效的工作。线程总量也不用很多,所以对thread-local的使用也比较充分。这时候non-blocking IO就往往比blocking IO快了。

不过non-blocking IO也有自己的问题,它需要调用更多系统调用,比如epoll_ctl,由于epoll实现为一棵红黑树,epoll_ctl并不是一个很快的操作,特别在多核环境下,依赖epoll_ctl的实现往往会面临棘手的扩展性问题。non-blocking需要更大的缓冲,否则就会触发更多的事件而影响效率。non-blocking还得解决不少多线程问题,代码比blocking复杂很多。

服务器常用编程模型

以下是深入理解计算机系统里经典的echo server。它的问题是这样同步阻塞的服务端无法处理来自客户端的并发请求。解决的方法有两个:使用多线程,或者IO多路复用。多线程的问题是创建进程或线程需要时间和空间,连接多了之后,切换开销很大,占用内存多,多线程修改共享数据产生的竞争条件需要加锁,容易造成死锁。IO复用的问题是不能充分利用多核CPU,且它通常要求事件的回调函数必须是非阻塞的。

  • 多线程:accept了之后new thread or process来处理这个connfd上的请求。
  • IO多路复用:non-blocking IO+IO multiplexing这种Reactor模式。基本结构是一个event loop,以事件驱动和事件回调的方式实现业务逻辑。

server:socket + bind + listen + accept + ——- + close
client: socket + connect + ——— + close

//bind&listen
while(1) {
  connfd = accept(listenfd, (struct) sockaddr *) &clientaddr, &clientlen);
  // read(connfd, buf, BUFSIZE);
  // write(connfd, buf, strlen(buf));
  close(connfd);
}

reactor

实际上目前的高性能服务器很多都用的是reactor模式,即non-blocking IO+IO multiplexing的方式。通常主线程只做event-loop,通过epoll_wait等方式监听事件,而处理客户请求是在其他工作线程中完成。小伙伴有个测试集群效应的例子thundering_herd_problem就是用epoll来处理连接和请求,可参考。下图来自Golang网络层实现总结的传统网络实现。

  • server端在bind&listen后,将listenfd注册到epollfd中,最后进入epoll_wait循环。循环过程中若有在listenfd上的event则调用socket.accept,并将connfd加入到epollfd的IO复用队列。
  • 当connfd上有数据到来或写缓冲有数据可以触发epoll_wait的返回,这里读写IO都是非阻塞IO,这样才不会阻塞epoll的下一个循环。然而,这样容易割裂业务逻辑,不易理解和维护。
  • read后数据进行解码并放入队列中,等待工作线程处理。

accept连接以及conn上读写若是在主线程完成,则要求是非阻塞IO,因为IO操作不能阻塞epoll_wait循环。实际上event loop可能也可以是多线程的,只是单个线程里只有一个epoll_wait

io

goroutine

Go因为有goroutine,所以可以采用多协程来解决并发问题。accept连接后,将连接丢给goroutine处理后续的读写操作。在开发者看到的这个goroutine中业务逻辑是同步的,也不用考虑IO是否阻塞。

func main() {
    ln, err := net.Listen("tcp"":8080")
    for {
        conn, _ := ln.Accept()
        go echoFunc(conn)
    }
}

可以肯定的是,在linux上Go语言写的网络服务器也是采用的epoll作为最底层的数据收发驱动,Go语言网络的底层实现中同样存在“上下文切换”的工作,只是这个切换工作由runtime的调度器来做了,减少了程序员的负担。

go net core

golang的net如何实现对epoll的封装,在使用上看上去同步编程的呢,这是本节的问题。总结来说,所有的网络操作都以网络描述符netFD为中心实现。netFD与底层PollDesc结构绑定,当在一个netFD上读写遇到EAGAIN错误时,就将当前goroutine存储到这个netFD对应的PollDesc中,同时将goroutine给park住,直到这个netFD上再次发生读写事件,才将此goroutine给ready激活重新运行。显然,在底层通知goroutine再次发生读写等事件的方式就是epoll等事件驱动机制。

netFD

服务端通过listen建立起的Listener是个实现了Accept Close等方法的接口。通过listener的Accept方法返回的Conn是一个实现了Read Write等方法的接口。Listener和Conn的实现都包含一个网络文件描述符netFD,它其中包含一个重要的数据结构pollDesc,它是底层事件驱动的封装。

  1. 服务端的netFD在listen时会创建epoll的实例,并将listenFD加入epoll的事件队列
  2. netFD在accept时将返回的connFD也加入epoll的事件队列
  3. netFD在读写时出现syscall.EAGAIN错误,通过pollDesc将当前的goroutine park住,直到ready,从pollDesc的waitRead中返回
type TCPListener struct {
    fd *netFD
}
type netFD struct {
    // 省略其他成员
    pd pollDesc
}

net.Listen过程中,新建了描述listenFD的数据结构netFD,并在netFD的listenStream方法中实现了建立socket的bind&listen和netFD的初始化。polldesc的init初始化了底层的epoll实例,并将fd添加到了epoll的事件队列中。ln.Accept() 实际上通过netFD的accept,用系统调用accept返回的connFD新建一个新的netFD并初始化,即把它也加入到epoll的事件队列中。

func (fd *netFD) init() error {
    if err := fd.pd.init(fd); err != nil {
        return err
    }
    return nil
}

netFD的Read操作在系统调用Read后,当有syscall.EAGAIN错误发生时,WaitRead将当前读这个connFD的goroutine给park住,直到这个connFD上的读事件再次发生为止,waitRead调用返回,继续for循环的执行。netFD的Write方法和Read的实现原理是一样的,都是在碰到EAGAIN错误的时候将当前goroutine给park住直到socket再次可写为止。

这样的实现,就让调用netFD的Read的地方变成了同步阻塞方式。

func (fd *netFD) Read(p []byte) (n int, err error) {
    for {
        n, err = syscall.Read(fd.sysfd, p)
        if err != nil {
            n = 0
            if err == syscall.EAGAIN {
                if err = fd.pd.waitRead(); err == nil {
                    continue
                }
            }
        }
        err = fd.eofError(n, err)
        break
    }
    return
}

polldesc

polldesc的初始化通过runtime层封装的pollServerInit 实现了epoll实例的sync.Once的初始化。初始化时通过netpollinit实现的,在linux上更底层就是epoll_create创建了epollFD。初始化的第二步是要加入epoll的事件队列,通过runtime_pollOpen实现。其中netpollopen在linux上就是epoll_ctl。

func (pd *pollDesc) init(fd *netFD) error {
    serverInit.Do(runtime_pollServerInit)
    ctx, errno := runtime_pollOpen(uintptr(fd.sysfd))
    pd.runtimeCtx = ctx
    return nil
}
func net_runtime_pollServerInit() {
    netpollinit()
    atomic.Store(&netpollInited, 1)
}
func net_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
    pd := pollcache.alloc()
    ...
    errno = netpollopen(fd, pd)
    return pd, int(errno)
}

最后在netFD实现阻塞的pd.waitRead是通过netpollblock实现的,即gopark当前goroutine,直到IO ready才从netpollblock中返回。

func (pd *pollDesc) wait(mode int) error {
    res := runtime_pollWait(pd.runtimeCtx, mode)
    return convertErr(res)
}
func net_runtime_pollWait(pd *pollDesc, mode int) int {
    for !netpollblock(pd, int32(mode), false) {
        err = netpollcheckerr(pd, int32(mode))
    }
    return 0
}

如何实现IO ready的通知,让陷入IO wait的goroutine重新被调度呢,这个跟proc.go里的调度有关。在goroutine调度器findrunnable可运行的goroutine时,将不阻塞的执行netpoll,即执行epollwait,监听等待在epollFD上的事件队列里的FD是否就绪,有则wakeUp这个goutine开始运行,也就使这个netpollblock可以返回,使netFD解除阻塞。

netpoll_epoll

上面提到的net_runtime_pollServerInit和net_runtime_pollOpen都是对底层事件驱动机制的封装,封装的意义在于屏蔽不同操作系统的实现细节。在linux上是通过runtime包中的netpoll_epoll.go实现的。它封装了epoll的三个系统调用,即epoll_create epoll_ctl和epoll_wait。

它封装成了四个runtime函数。netpollinit 使用epoll_create创建epollfd,netpollopen 添加一个fd到epoll中,这里的数据结构称为pollDesc,它一开始都关注了读写事件,并且采用的是边缘触发netpollclose函数就是从epoll删除一个fd。

func netpollopen(fd uintptr, pd *pollDesc) int32 {
    var ev epollevent
    ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
    *(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
    return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}

EPOLLRDHUP,这个事件是在较新的内核版本添加的,目的是解决对端socket关闭,epoll本身并不能直接感知到这个关闭动作的问题。

netpoll 就是从epoll wait得到所有发生事件的fd,并将每个fd对应的goroutine通过链表返回这个操作是在goroutine调度器中使用的,用来将因为IO wait而阻塞的goroutine重新调度。

func netpoll(block bool) *g {
    if epfd == -1 {
        return nil
    }
    waitms := int32(-1)
    if !block {
        waitms = 0
    }
    var events [128]epollevent
retry:
    n := epollwait(epfd, &events[0], int32(len(events)), waitms)
    if n < 0 {
        goto retry
    }
    var gp guintptr
    for i := int32(0); i < n; i++ {
        ev := &events[i]
        var mode int32
        if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
            mode += 'r'
        }
        if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
            mode += 'w'
        }
        if mode != 0 {
            pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
            netpollready(&gp, pd, mode)
        }
    }
    if block && gp == 0 {
        goto retry
    }
    return gp.ptr()



 - EOF -

推荐阅读(点击标题可打开)

1、Go语言错误处理的优雅实现

2、使用Go构建Kubernetes应用

3、Go 命令行工具项目结构最佳实践

Go 开发大全

参与维护一个非常全面的Go开源技术资源库。日常分享 Go, 云原生、k8s、Docker和微服务方面的技术文章和行业动态。

关注后获取

回复 Go 获取6万star的Go资源库



分享、点赞和在看

支持我们分享更多好文章,谢谢!

    您可能也对以下帖子感兴趣

    文章有问题?点此查看未经处理的缓存