高并发

高并发

Go netpoll I/O 多路复用构建原生网络模型之源码深度解析

文章分享panjf2000 发表了文章 • 0 个评论 • 1273 次浏览 • 2019-11-10 22:48 • 来自相关话题

原文Go netpoll I/O 多路复用构建原生网络模型之源码深度解析 ...查看全部

原文

Go netpoll I/O 多路复用构建原生网络模型之源码深度解析

导言

Go 基于 I/O multiplexing 和 goroutine 构建了一个简洁而高性能的原生网络模型(基于 Go 的I/O 多路复用 netpoll),提供了 goroutine-per-connection 这样简单的网络编程模式。在这种模式下,开发者使用的是同步的模式去编写异步的逻辑,极大地降低了开发者编写网络应用时的心智负担,且借助于 Go runtime scheduler 对 goroutines 的高效调度,这个原生网络模型不论从适用性还是性能上都足以满足绝大部分的应用场景。

然而,在工程性上能做到如此高的普适性和兼容性,最终暴露给开发者提供接口/模式如此简洁,其底层必然是基于非常复杂的封装,做了很多取舍,也有可能放弃了一些『极致』的设计和理念。事实上netpoll底层就是基于 epoll/kqueue/iocp 这些系统调用来做封装的,最终暴露出 goroutine-per-connection 这样的极简的开发模式给使用者。

Go netpoll 在不同的操作系统,其底层使用的 I/O 多路复用技术也不一样,可以从 Go 源码目录结构和对应代码文件了解 Go 在不同平台下的网络 I/O 模式的实现。比如,在 Linux 系统下基于 epoll,freeBSD 系统下基于 kqueue,以及 Windows 系统下基于 iocp。

本文将基于 linux 平台来解析 Go netpoll 之 I/O 多路复用的底层是如何基于 epoll 封装实现的,从源码层层推进,全面而深度地解析 Go netpoll 的设计理念和实现原理,以及 Go 是如何利用netpoll来构建它的原生网络模型的。主要涉及到的一些概念:I/O 模式、用户/内核空间、epoll、linux 源码、goroutine scheduler 等等,我会尽量简单地讲解,如果有对相关概念不熟悉的同学,还是希望能提前熟悉一下。

用户空间与内核空间

现在操作系统都是采用虚拟存储器,那么对 32 位操作系统而言,它的寻址空间(虚拟存储空间)为 4G(2 的 32 次方)。操作系统的核心是内核,独立于普通的应用程序,可以访问受保护的内存空间,也有访问底层硬件设备的所有权限。为了保证用户进程不能直接操作内核(kernel),保证内核的安全,操心系统将虚拟空间划分为两部分,一部分为内核空间,一部分为用户空间。针对 linux 操作系统而言,将最高的 1G 字节(从虚拟地址 0xC0000000 到 0xFFFFFFFF),供内核使用,称为内核空间,而将较低的 3G 字节(从虚拟地址 0x00000000 到0xBFFFFFFF),供各个进程使用,称为用户空间。

I/O 多路复用

在神作《UNIX 网络编程》里,总结归纳了 5 种 I/O 模型,包括同步和异步 I/O:

  • 阻塞 I/O (Blocking I/O)
  • 非阻塞 I/O (Nonblocking I/O)
  • I/O 多路复用 (I/O multiplexing)
  • 信号驱动 I/O (Signal driven I/O)
  • 异步 I/O (Asynchronous I/O)

操作系统上的 I/O 是用户空间和内核空间的数据交互,因此 I/O 操作通常包含以下两个步骤:

  1. 等待网络数据到达网卡(读就绪)/等待网卡可写(写就绪) –> 读取/写入到内核缓冲区
  2. 从内核缓冲区复制数据 –> 用户空间(读)/从用户空间复制数据 -> 内核缓冲区(写)

而判定一个 I/O 模型是同步还是异步,主要看第二步:数据在用户和内核空间之间复制的时候是不是会阻塞当前进程,如果会,则是同步 I/O,否则,就是异步 I/O。基于这个原则,这 5 种 I/O 模型中只有一种异步 I/O 模型:Asynchronous I/O,其余都是同步 I/O 模型。

这 5 种 I/O 模型的对比如下:

所谓 I/O 多路复用指的就是 select/poll/epoll 这一系列的多路选择器:支持单一线程同时监听多个文件描述符(I/O事件),阻塞等待,并在其中某个文件描述符可读写时收到通知。 I/O 复用其实复用的不是 I/O 连接,而是复用线程,让一个 thread of control 能够处理多个连接(I/O 事件)。

select & poll

#include <sys/select.h>

/* According to earlier standards */
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>

int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);

// 和 select 紧密结合的四个宏:
void FD_CLR(int fd, fd_set *set);
int FD_ISSET(int fd, fd_set *set);
void FD_SET(int fd, fd_set *set);
void FD_ZERO(fd_set *set);

select 是 epoll 之前 linux 使用的 I/O 事件驱动技术。

理解 select 的关键在于理解 fd_set,为说明方便,取 fd_set 长度为 1 字节,fd_set 中的每一 bit 可以对应一个文件描述符 fd,则 1 字节长的 fd_set 最大可以对应 8 个 fd。select 的调用过程如下:

  1. 执行 FD_ZERO(&set), 则 set 用位表示是 0000,0000
  2. 若 fd=5, 执行 FD_SET(fd, &set); 后 set 变为 0001,0000(第5位置为1)
  3. 再加入 fd=2, fd=1,则 set 变为 0001,0011
  4. 执行 select(6, &set, 0, 0, 0) 阻塞等待
  5. 若 fd=1, fd=2 上都发生可读事件,则 select 返回,此时 set 变为 0000,0011 (注意:没有事件发生的 fd=5 被清空)

基于上面的调用过程,可以得出 select 的特点:

  • 可监控的文件描述符个数取决于 sizeof(fd_set) 的值。假设服务器上 sizeof(fd_set)=512,每 bit 表示一个文件描述符,则服务器上支持的最大文件描述符是 512*8=4096。fd_set的大小调整可参考 【原创】技术系列之 网络模型(二) 中的模型 2,可以有效突破 select 可监控的文件描述符上限
  • 将 fd 加入 select 监控集的同时,还要再使用一个数据结构 array 保存放到 select 监控集中的 fd,一是用于在 select 返回后,array 作为源数据和 fd_set 进行 FD_ISSET 判断。二是 select 返回后会把以前加入的但并无事件发生的 fd 清空,则每次开始 select 前都要重新从 array 取得 fd 逐一加入(FD_ZERO最先),扫描 array 的同时取得 fd 最大值 maxfd,用于 select 的第一个参数
  • 可见 select 模型必须在 select 前循环 array(加 fd,取 maxfd),select 返回后循环 array(FD_ISSET判断是否有事件发生)

所以,select 有如下的缺点:

  1. 最大并发数限制:使用 32 个整数的 32 位,即 32*32=1024 来标识 fd,虽然可修改,但是有以下第 2, 3 点的瓶颈
  2. 每次调用 select,都需要把 fd 集合从用户态拷贝到内核态,这个开销在 fd 很多时会很大
  3. 性能衰减严重:每次 kernel 都需要线性扫描整个 fd_set,所以随着监控的描述符 fd 数量增长,其 I/O 性能会线性下降

poll 的实现和 select 非常相似,只是描述 fd 集合的方式不同,poll 使用 pollfd 结构而不是 select 的 fd_set 结构,poll 解决了最大文件描述符数量限制的问题,但是同样需要从用户态拷贝所有的 fd 到内核态,也需要线性遍历所有的 fd 集合,所以它和 select 只是实现细节上的区分,并没有本质上的区别。

epoll

epoll 是 linux kernel 2.6 之后引入的新 I/O 事件驱动技术,I/O 多路复用的核心设计是 1 个线程处理所有连接的等待消息准备好I/O 事件,这一点上 epoll 和 select&poll 是大同小异的。但 select&poll 预估错误了一件事,当数十万并发连接存在时,可能每一毫秒只有数百个活跃的连接,同时其余数十万连接在这一毫秒是非活跃的。select&poll 的使用方法是这样的:返回的活跃连接 == select(全部待监控的连接)

什么时候会调用 select&poll 呢?在你认为需要找出有报文到达的活跃连接时,就应该调用。所以,select&poll 在高并发时是会被频繁调用的。这样,这个频繁调用的方法就很有必要看看它是否有效率,因为,它的轻微效率损失都会被高频二字所放大。它有效率损失吗?显而易见,全部待监控连接是数以十万计的,返回的只是数百个活跃连接,这本身就是无效率的表现。被放大后就会发现,处理并发上万个连接时,select&poll 就完全力不从心了。这个时候就该 epoll 上场了,epoll 通过一些新的设计和优化,基本上解决了 select&poll 的问题。

epoll 的 API 非常简洁,涉及到的只有 3 个系统调用:

#include <sys/epoll.h>  
int epoll_create(int size); // int epoll_create1(int flags);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

其中,epoll_create 创建一个 epoll 实例并返回 epollfd;epoll_ctl 注册 file descriptor 等待的 I/O 事件(比如 EPOLLIN、EPOLLOUT 等) 到 epoll 实例上;epoll_wait 则是阻塞监听 epoll 实例上所有的 file descriptor 的 I/O 事件,它接收一个用户空间上的一块内存地址 (events 数组),kernel 会在有 I/O 事件发生的时候把文件描述符列表复制到这块内存地址上,然后 epoll_wait 解除阻塞并返回,最后用户空间上的程序就可以对相应的 fd 进行读写了:

#include <unistd.h>
ssize_t read(int fd, void *buf, size_t count);
ssize_t write(int fd, const void *buf, size_t count);

epoll 的工作原理如下:

与 select&poll 相比,epoll 分清了高频调用和低频调用。例如,epoll_ctl 相对来说就是不太频繁被调用的,而 epoll_wait 则是非常频繁被调用的。所以 epoll 利用 epoll_ctl 来插入或者删除一个 fd,实现用户态到内核态的数据拷贝,这确保了每一个 fd 在其生命周期只需要被拷贝一次,而不是每次调用 epoll_wait 的时候都拷贝一次。 epoll_wait 则被设计成几乎没有入参的调用,相比 select&poll 需要把全部监听的 fd 集合从用户态拷贝至内核态的做法,epoll 的效率就高出了一大截。

在实现上 epoll 采用红黑树来存储所有监听的 fd,而红黑树本身插入和删除性能比较稳定,时间复杂度 O(logN)。通过 epoll_ctl 函数添加进来的 fd 都会被放在红黑树的某个节点内,所以,重复添加是没有用的。当把 fd 添加进来的时候时候会完成关键的一步:该 fd 都会与相应的设备(网卡)驱动程序建立回调关系,也就是在内核中断处理程序为它注册一个回调函数,在 fd 相应的事件触发(中断)之后(设备就绪了),内核就会调用这个回调函数,该回调函数在内核中被称为:ep_poll_callback这个回调函数其实就是把这个 fd 添加到 rdllist 这个双向链表(就绪链表)中。epoll_wait 实际上就是去检查 rdlist 双向链表中是否有就绪的 fd,当 rdlist 为空(无就绪fd)时挂起当前进程,直到 rdlist 非空时进程才被唤醒并返回。

相比于 select&poll 调用时会将全部监听的 fd 从用户态空间拷贝至内核态空间并线性扫描一遍找出就绪的 fd 再返回到用户态,epoll_wait 则是直接返回已就绪 fd,因此 epoll 的 I/O 性能不会像 select&poll 那样随着监听的 fd 数量增加而出现线性衰减,是一个非常高效的 I/O 事件驱动技术。

由于使用 epoll 的 I/O 多路复用需要用户进程自己负责 I/O 读写,从用户进程的角度看,读写过程是阻塞的,所以 select&poll&epoll 本质上都是同步 I/O 模型,而像 Windows 的 IOCP 这一类的异步 I/O,只需要在调用 WSARecv 或 WSASend 方法读写数据的时候把用户空间的内存 buffer 提交给 kernel,kernel 负责数据在用户空间和内核空间拷贝,完成之后就会通知用户进程,整个过程不需要用户进程参与,所以是真正的异步 I/O。

延伸

另外,我看到有些文章说 epoll 之所以性能高是因为利用了 linux 的 mmap 内存映射让内核和用户进程共享了一片物理内存,用来存放就绪 fd 列表和它们的数据 buffer,所以用户进程在 epoll_wait返回之后用户进程就可以直接从共享内存那里读取/写入数据了,这让我很疑惑,因为首先看epoll_wait的函数声明:

int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

第二个参数:就绪事件列表,是需要在用户空间分配内存然后再传给epoll_wait的,如果内核会用 mmap 设置共享内存,直接传递一个指针进去就行了,根本不需要在用户态分配内存,多此一举。其次,内核和用户进程通过 mmap 共享内存是一件极度危险的事情,内核无法确定这块共享内存什么时候会被回收,而且这样也会赋予用户进程直接操作内核数据的权限和入口,非常容易出现大的系统漏洞,因此一般极少会这么做。所以我很怀疑 epoll 是不是真的在 linux kernel 里用了 mmap,我就去看了下最新版本(5.3.9)的 linux kernel 源码:

/*
* Implement the event wait interface for the eventpoll file. It is the kernel
* part of the user space epoll_wait(2).
*/

static int do_epoll_wait(int epfd, struct epoll_event __user *events,
int maxevents, int timeout)
{
// ...

/* Time to fish for events ... */
error = ep_poll(ep, events, maxevents, timeout);
}

// 如果 epoll_wait 入参时设定 timeout == 0, 那么直接通过 ep_events_available 判断当前是否有用户感兴趣的事件发生,如果有则通过 ep_send_events 进行处理
// 如果设置 timeout > 0,并且当前没有用户关注的事件发生,则进行休眠,并添加到 ep->wq 等待队列的头部;对等待事件描述符设置 WQ_FLAG_EXCLUSIVE 标志
// ep_poll 被事件唤醒后会重新检查是否有关注事件,如果对应的事件已经被抢走,那么 ep_poll 会继续休眠等待
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events, int maxevents, long timeout)
{
// ...

send_events:
/*
* Try to transfer events to user space. In case we get 0 events and
* there's still timeout left over, we go trying again in search of
* more luck.
*/


// 如果一切正常, 有 event 发生, 就开始准备数据 copy 给用户空间了
// 如果有就绪的事件发生,那么就调用 ep_send_events 将就绪的事件 copy 到用户态内存中,
// 然后返回到用户态,否则判断是否超时,如果没有超时就继续等待就绪事件发生,如果超时就返回用户态。
// 从 ep_poll 函数的实现可以看到,如果有就绪事件发生,则调用 ep_send_events 函数做进一步处理
if (!res && eavail &&
!(res = ep_send_events(ep, events, maxevents)) && !timed_out)
goto fetch_events;

// ...
}

// ep_send_events 函数是用来向用户空间拷贝就绪 fd 列表的,它将用户传入的就绪 fd 列表内存简单封装到
// ep_send_events_data 结构中,然后调用 ep_scan_ready_list 将就绪队列中的事件写入用户空间的内存;
// 用户进程就可以访问到这些数据进行处理
static int ep_send_events(struct eventpoll *ep,
struct epoll_event __user *events, int maxevents)
{
struct ep_send_events_data esed;

esed.maxevents = maxevents;
esed.events = events;
// 调用 ep_scan_ready_list 函数检查 epoll 实例 eventpoll 中的 rdllist 就绪链表,
// 并注册一个回调函数 ep_send_events_proc,如果有就绪 fd,则调用 ep_send_events_proc 进行处理
ep_scan_ready_list(ep, ep_send_events_proc, &esed, 0, false);
return esed.res;
}

// 调用 ep_scan_ready_list 的时候会传递指向 ep_send_events_proc 函数的函数指针作为回调函数,
// 一旦有就绪 fd,就会调用 ep_send_events_proc 函数
static __poll_t ep_send_events_proc(struct eventpoll *ep, struct list_head *head, void *priv)
{
// ...

/*
* If the event mask intersect the caller-requested one,
* deliver the event to userspace. Again, ep_scan_ready_list()
* is holding ep->mtx, so no operations coming from userspace
* can change the item.
*/

revents = ep_item_poll(epi, &pt, 1);
// 如果 revents 为 0,说明没有就绪的事件,跳过,否则就将就绪事件拷贝到用户态内存中
if (!revents)
continue;
// 将当前就绪的事件和用户进程传入的数据都通过 __put_user 拷贝回用户空间,
// 也就是调用 epoll_wait 之时用户进程传入的 fd 列表的内存
if (__put_user(revents, &uevent->events) || __put_user(epi->event.data, &uevent->data)) {
list_add(&epi->rdllink, head);
ep_pm_stay_awake(epi);
if (!esed->res)
esed->res = -EFAULT;
return 0;
}

// ...
}

do_epoll_wait开始层层跳转,我们可以很清楚地看到最后内核是通过__put_user函数把就绪 fd 列表和事件返回到用户空间,而__put_user正是内核用来拷贝数据到用户空间的标准函数。此外,我并没有在 linux kernel 的源码中和 epoll 相关的代码里找到 mmap 系统调用做内存映射的逻辑,所以基本可以得出结论:epoll 在 linux kernel 里并没有使用 mmap 来做用户空间和内核空间的内存共享,所以那些说 epoll 使用了 mmap 的文章都是误解。

Non-blocking I/O

什么叫非阻塞 I/O,顾名思义就是:所有 I/O 操作都是立刻返回而不会阻塞当前用户进程。I/O 多路复用通常情况下需要和非阻塞 I/O 搭配使用,否则可能会产生意想不到的问题。比如,epoll 的 ET(边缘触发) 模式下,如果不使用非阻塞 I/O,有极大的概率会导致阻塞 event-loop 线程,从而降低吞吐量,甚至导致 bug。

Linux 下,我们可以通过 fcntl 系统调用来设置 O_NONBLOCK标志位,从而把 socket 设置成 non-blocking。当对一个 non-blocking socket 执行读操作时,流程是这个样子:

当用户进程发出 read 操作时,如果 kernel 中的数据还没有准备好,那么它并不会 block 用户进程,而是立刻返回一个 EAGAIN error。从用户进程角度讲 ,它发起一个 read 操作后,并不需要等待,而是马上就得到了一个结果。用户进程判断结果是一个 error 时,它就知道数据还没有准备好,于是它可以再次发送 read 操作。一旦 kernel 中的数据准备好了,并且又再次收到了用户进程的 system call,那么它马上就将数据拷贝到了用户内存,然后返回。

所以,non-blocking I/O 的特点是用户进程需要不断的主动询问 kernel 数据好了没有。

Go netpoll

一个典型的 Go TCP server:

package main

import (
"fmt"
"net"
)

func main() {
listen, err := net.Listen("tcp", ":8888")
if err != nil {
fmt.Println("listen error: ", err)
return
}

for {
conn, err := listen.Accept()
if err != nil {
fmt.Println("accept error: ", err)
break
}

// start a new goroutine to handle the new connection
go HandleConn(conn)
}
}
func HandleConn(conn net.Conn) {
defer conn.Close()
packet := make([]byte, 1024)
for {
// 如果没有可读数据,也就是读 buffer 为空,则阻塞
_, _ = conn.Read(packet)
// 同理,不可写则阻塞
_, _ = conn.Write(packet)
}
}

上面是一个基于 Go 原生网络模型(基于 netpoll)编写的一个 TCP server,模式是 goroutine-per-connection,在这种模式下,开发者使用的是同步的模式去编写异步的逻辑而且对于开发者来说 I/O 是否阻塞是无感知的,也就是说开发者无需考虑 goroutines 甚至更底层的线程、进程的调度和上下文切换。而 Go netpoll 最底层的事件驱动技术肯定是基于 epoll/kqueue/iocp 这一类的 I/O 事件驱动技术,只不过是把这些调度和上下文切换的工作转移到了 runtime 的 Go scheduler,让它来负责调度 goroutines,从而极大地降低了程序员的心智负担!

Go netpoll 核心

Go netpoll 通过在底层对 epoll/kqueue/iocp 的封装,从而实现了使用同步编程模式达到异步执行的效果。总结来说,所有的网络操作都以网络描述符 netFD 为中心实现。netFD 与底层 PollDesc 结构绑定,当在一个 netFD 上读写遇到 EAGAIN 错误时,就将当前 goroutine 存储到这个 netFD 对应的 PollDesc 中,同时调用 gopark 把当前 goroutine 给 park 住,直到这个 netFD 上再次发生读写事件,才将此 goroutine 给 ready 激活重新运行。显然,在底层通知 goroutine 再次发生读写等事件的方式就是 epoll/kqueue/iocp 等事件驱动机制。

接下来我们通过分析最新的 Go 源码(v1.13.4),解读一下整个 netpoll 的运行流程。

上面的示例代码中相关的在源码里的几个数据结构和方法:

// TCPListener is a TCP network listener. Clients should typically
// use variables of type Listener instead of assuming TCP.
type TCPListener struct {
fd *netFD
lc ListenConfig
}

// Accept implements the Accept method in the Listener interface; it
// waits for the next call and returns a generic Conn.
func (l *TCPListener) Accept() (Conn, error) {
if !l.ok() {
return nil, syscall.EINVAL
}
c, err := l.accept()
if err != nil {
return nil, &OpError{Op: "accept", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err}
}
return c, nil
}

func (ln *TCPListener) accept() (*TCPConn, error) {
fd, err := ln.fd.accept()
if err != nil {
return nil, err
}
tc := newTCPConn(fd)
if ln.lc.KeepAlive >= 0 {
setKeepAlive(fd, true)
ka := ln.lc.KeepAlive
if ln.lc.KeepAlive == 0 {
ka = defaultTCPKeepAlive
}
setKeepAlivePeriod(fd, ka)
}
return tc, nil
}

// TCPConn is an implementation of the Conn interface for TCP network
// connections.
type TCPConn struct {
conn
}

// Conn
type conn struct {
fd *netFD
}

type conn struct {
fd *netFD
}

func (c *conn) ok() bool { return c != nil && c.fd != nil }

// Implementation of the Conn interface.

// Read implements the Conn Read method.
func (c *conn) Read(b []byte) (int, error) {
if !c.ok() {
return 0, syscall.EINVAL
}
n, err := c.fd.Read(b)
if err != nil && err != io.EOF {
err = &OpError{Op: "read", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err}
}
return n, err
}

// Write implements the Conn Write method.
func (c *conn) Write(b []byte) (int, error) {
if !c.ok() {
return 0, syscall.EINVAL
}
n, err := c.fd.Write(b)
if err != nil {
err = &OpError{Op: "write", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err}
}
return n, err
}

netFD

net.Listen("tcp", ":8888") 方法返回了一个 TCPListener,它是一个实现了 net.Listener 接口的 struct,而通过 listen.Accept() 接收的新连接 TCPConn 则是一个实现了 net.Conn 接口的 struct,它内嵌了 net.conn struct。仔细阅读上面的源码可以发现,不管是 Listener 的 Accept 还是 Conn 的 Read/Write 方法,都是基于一个 netFD 的数据结构的操作,netFD 是一个网络描述符,类似于 Linux 的文件描述符的概念,netFD 中包含一个 poll.FD 数据结构,而 poll.FD 中包含两个重要的数据结构 Sysfd 和 pollDesc,前者是真正的系统文件描述符,后者对是底层事件驱动的封装,所有的读写超时等操作都是通过调用后者的对应方法实现的。

netFDpoll.FD的源码:

// Network file descriptor.
type netFD struct {
pfd poll.FD

// immutable until Close
family int
sotype int
isConnected bool // handshake completed or use of association with peer
net string
laddr Addr
raddr Addr
}

// FD is a file descriptor. The net and os packages use this type as a
// field of a larger type representing a network connection or OS file.
type FD struct {
// Lock sysfd and serialize access to Read and Write methods.
fdmu fdMutex

// System file descriptor. Immutable until Close.
Sysfd int

// I/O poller.
pd pollDesc

// Writev cache.
iovecs *[]syscall.Iovec

// Semaphore signaled when file is closed.
csema uint32

// Non-zero if this file has been set to blocking mode.
isBlocking uint32

// Whether this is a streaming descriptor, as opposed to a
// packet-based descriptor like a UDP socket. Immutable.
IsStream bool

// Whether a zero byte read indicates EOF. This is false for a
// message based socket connection.
ZeroReadIsEOF bool

// Whether this is a file rather than a network socket.
isFile bool
}

pollDesc

前面提到了 pollDesc 是底层事件驱动的封装,netFD 通过它来完成各种 I/O 相关的操作,它的定义如下:

type pollDesc struct {
runtimeCtx uintptr
}

这里的 struct 只包含了一个指针,而通过 pollDesc 的 init 方法,我们可以找到它具体的定义是在runtime.pollDesc这里:

func (pd *pollDesc) init(fd *FD) error {
serverInit.Do(runtime_pollServerInit)
ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
if errno != 0 {
if ctx != 0 {
runtime_pollUnblock(ctx)
runtime_pollClose(ctx)
}
return syscall.Errno(errno)
}
pd.runtimeCtx = ctx
return nil
}

// Network poller descriptor.
//
// No heap pointers.
//
//go:notinheap
type pollDesc struct {
link *pollDesc // in pollcache, protected by pollcache.lock

// The lock protects pollOpen, pollSetDeadline, pollUnblock and deadlineimpl operations.
// This fully covers seq, rt and wt variables. fd is constant throughout the PollDesc lifetime.
// pollReset, pollWait, pollWaitCanceled and runtime·netpollready (IO readiness notification)
// proceed w/o taking the lock. So closing, everr, rg, rd, wg and wd are manipulated
// in a lock-free way by all operations.
// NOTE(dvyukov): the following code uses uintptr to store *g (rg/wg),
// that will blow up when GC starts moving objects.
lock mutex // protects the following fields
fd uintptr
closing bool
everr bool // marks event scanning error happened
user uint32 // user settable cookie
rseq uintptr // protects from stale read timers
rg uintptr // pdReady, pdWait, G waiting for read or nil
rt timer // read deadline timer (set if rt.f != nil)
rd int64 // read deadline
wseq uintptr // protects from stale write timers
wg uintptr // pdReady, pdWait, G waiting for write or nil
wt timer // write deadline timer
wd int64 // write deadline
}

runtime.pollDesc包含自身类型的一个指针,用来保存下一个runtime.pollDesc的地址,以此来实现链表,可以减少数据结构的大小,所有的runtime.pollDesc保存在runtime.pollCache结构中,定义如下:

type pollCache struct {
lock mutex
first *pollDesc
// PollDesc objects must be type-stable,
// because we can get ready notification from epoll/kqueue
// after the descriptor is closed/reused.
// Stale notifications are detected using seq variable,
// seq is incremented when deadlines are changed or descriptor is reused.
}

net.Listen

调用 net.Listen之后,底层会通过 Linux 的系统调用socket 方法创建一个 fd 分配给 listener,并用以来初始化 listener 的 netFD,接着调用 netFD 的listenStream方法完成对 socket 的 bind&listen 操作以及对 netFD 的初始化(主要是对 netFD 里的 pollDesc 的初始化),相关源码如下:

// 调用 linux 系统调用 socket 创建 listener fd 并设置为为阻塞 I/O    
s, err := socketFunc(family, sotype|syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC, proto)
// On Linux the SOCK_NONBLOCK and SOCK_CLOEXEC flags were
// introduced in 2.6.27 kernel and on FreeBSD both flags were
// introduced in 10 kernel. If we get an EINVAL error on Linux
// or EPROTONOSUPPORT error on FreeBSD, fall back to using
// socket without them.

socketFunc func(int, int, int) (int, error) = syscall.Socket

// 用上面创建的 listener fd 初始化 listener netFD
if fd, err = newFD(s, family, sotype, net); err != nil {
poll.CloseFunc(s)
return nil, err
}

// 对 listener fd 进行 bind&listen 操作,并且调用 init 方法完成初始化
func (fd *netFD) listenStream(laddr sockaddr, backlog int, ctrlFn func(string, string, syscall.RawConn) error) error {
// ...

// 完成绑定操作
if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil {
return os.NewSyscallError("bind", err)
}

// 完成监听操作
if err = listenFunc(fd.pfd.Sysfd, backlog); err != nil {
return os.NewSyscallError("listen", err)
}

// 调用 init,内部会调用 poll.FD.Init,最后调用 pollDesc.init
if err = fd.init(); err != nil {
return err
}
lsa, _ = syscall.Getsockname(fd.pfd.Sysfd)
fd.setAddr(fd.addrFunc()(lsa), nil)
return nil
}

// 使用 sync.Once 来确保一个 listener 只持有一个 epoll 实例
var serverInit sync.Once

// netFD.init 会调用 poll.FD.Init 并最终调用到 pollDesc.init,
// 它会创建 epoll 实例并把 listener fd 加入监听队列
func (pd *pollDesc) init(fd *FD) error {
// runtime_pollServerInit 内部调用了 netpollinit 来创建 epoll 实例
serverInit.Do(runtime_pollServerInit)

// runtime_pollOpen 内部调用了 netpollopen 来将 listener fd 注册到
// epoll 实例中,另外,它会初始化一个 pollDesc 并返回
ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
if errno != 0 {
if ctx != 0 {
runtime_pollUnblock(ctx)
runtime_pollClose(ctx)
}
return syscall.Errno(errno)
}
// 把真正初始化完成的 pollDesc 实例赋值给当前的 pollDesc 代表自身的指针,
// 后续使用直接通过该指针操作
pd.runtimeCtx = ctx
return nil
}

// netpollopen 会被 runtime_pollOpen,注册 fd 到 epoll 实例,
// 同时会利用万能指针把 pollDesc 保存到 epollevent 的一个 8 位的字节数组 data 里
func netpollopen(fd uintptr, pd *pollDesc) int32 {
var ev epollevent
ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
*(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}

我们前面提到的 epoll 的三个基本调用,Go 在源码里实现了对那三个调用的封装:

#include <sys/epoll.h>  
int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

// Go 对上面三个调用的封装
func netpollinit()
func netpollopen(fd uintptr, pd *pollDesc) int32
func netpoll(block bool) gList

netFD 就是通过这三个封装来对 epoll 进行创建实例、注册 fd 和等待事件操作的。

Listener.Accept()

netpoll accept socket 的工作流程如下:

  1. 服务端的 netFD 在listen时会创建 epoll 的实例,并将 listenerFD 加入 epoll 的事件队列
  2. netFD 在accept时将返回的 connFD 也加入 epoll 的事件队列
  3. netFD 在读写时出现syscall.EAGAIN错误,通过 pollDesc 的 waitRead 方法将当前的 goroutine park 住,直到 ready,从 pollDesc 的waitRead中返回

Listener.Accept()接收来自客户端的新连接,具体还是调用netFD.accept方法来完成这个功能:

// Accept implements the Accept method in the Listener interface; it
// waits for the next call and returns a generic Conn.
func (l *TCPListener) Accept() (Conn, error) {
if !l.ok() {
return nil, syscall.EINVAL
}
c, err := l.accept()
if err != nil {
return nil, &OpError{Op: "accept", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err}
}
return c, nil
}

func (ln *TCPListener) accept() (*TCPConn, error) {
fd, err := ln.fd.accept()
if err != nil {
return nil, err
}
tc := newTCPConn(fd)
if ln.lc.KeepAlive >= 0 {
setKeepAlive(fd, true)
ka := ln.lc.KeepAlive
if ln.lc.KeepAlive == 0 {
ka = defaultTCPKeepAlive
}
setKeepAlivePeriod(fd, ka)
}
return tc, nil
}

netFD.accept方法里再调用poll.FD.Accept,最后会使用 linux 的系统调用accept来完成新连接的接收,并且会把 accept 的 socket 设置成非阻塞 I/O 模式:

// Accept wraps the accept network call.
func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
if err := fd.readLock(); err != nil {
return -1, nil, "", err
}
defer fd.readUnlock()

if err := fd.pd.prepareRead(fd.isFile); err != nil {
return -1, nil, "", err
}
for {
// 使用 linux 系统调用 accept 接收新连接,创建对应的 socket
s, rsa, errcall, err := accept(fd.Sysfd)
// 因为 listener fd 在创建的时候已经设置成非阻塞的了,
// 所以 accept 方法会直接返回,不管有没有新连接到来;如果 err == nil 则表示正常建立新连接,直接返回
if err == nil {
return s, rsa, "", err
}
// 如果 err != nil,则判断 err == syscall.EAGAIN,符合条件则进入 pollDesc.waitRead 方法
switch err {
case syscall.EAGAIN:
if fd.pd.pollable() {
// 如果当前没有发生期待的 I/O 事件,那么 waitRead 会通过 park goroutine 让逻辑 block 在这里
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
case syscall.ECONNABORTED:
// This means that a socket on the listen
// queue was closed before we Accept()ed it;
// it's a silly error, so try again.
continue
}
return -1, nil, errcall, err
}
}

// 使用 linux 的 accept 系统调用接收新连接并把这个 socket fd 设置成非阻塞 I/O
ns, sa, err := Accept4Func(s, syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC)
// On Linux the accept4 system call was introduced in 2.6.28
// kernel and on FreeBSD it was introduced in 10 kernel. If we
// get an ENOSYS error on both Linux and FreeBSD, or EINVAL
// error on Linux, fall back to using accept.

// Accept4Func is used to hook the accept4 call.
var Accept4Func func(int, int) (int, syscall.Sockaddr, error) = syscall.Accept4

pollDesc.waitRead方法主要负责检测当前这个 pollDesc 的上层 netFD 对应的 fd 是否有『期待的』I/O 事件发生,如果有就直接返回,否则就 park 住当前的 goroutine 并持续等待直至对应的 fd 上发生可读/可写或者其他『期待的』I/O 事件为止,然后它就会返回到外层的 for 循环,让 goroutine 继续执行逻辑。

Conn.Read/Conn.Write

我们先来看看Conn.Read方法是如何实现的,原理其实和 Listener.Accept 是一样的,具体调用链还是首先调用 conn 的netFD.Read,然后内部再调用 poll.FD.Read,最后使用 linux 的系统调用 read: syscall.Read完成数据读取:

// Implementation of the Conn interface.

// Read implements the Conn Read method.
func (c *conn) Read(b []byte) (int, error) {
if !c.ok() {
return 0, syscall.EINVAL
}
n, err := c.fd.Read(b)
if err != nil && err != io.EOF {
err = &OpError{Op: "read", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err}
}
return n, err
}

func (fd *netFD) Read(p []byte) (n int, err error) {
n, err = fd.pfd.Read(p)
runtime.KeepAlive(fd)
return n, wrapSyscallError("read", err)
}

// Read implements io.Reader.
func (fd *FD) Read(p []byte) (int, error) {
if err := fd.readLock(); err != nil {
return 0, err
}
defer fd.readUnlock()
if len(p) == 0 {
// If the caller wanted a zero byte read, return immediately
// without trying (but after acquiring the readLock).
// Otherwise syscall.Read returns 0, nil which looks like
// io.EOF.
// TODO(bradfitz): make it wait for readability? (Issue 15735)
return 0, nil
}
if err := fd.pd.prepareRead(fd.isFile); err != nil {
return 0, err
}
if fd.IsStream && len(p) > maxRW {
p = p[:maxRW]
}
for {
// 尝试从该 socket 读取数据,因为 socket 在被 listener accept 的时候设置成
// 了非阻塞 I/O,所以这里同样也是直接返回,不管有没有可读的数据
n, err := syscall.Read(fd.Sysfd, p)
if err != nil {
n = 0
// err == syscall.EAGAIN 表示当前没有期待的 I/O 事件发生,也就是 socket 不可读
if err == syscall.EAGAIN && fd.pd.pollable() {
// 如果当前没有发生期待的 I/O 事件,那么 waitRead
// 会通过 park goroutine 让逻辑 block 在这里
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}

// On MacOS we can see EINTR here if the user
// pressed ^Z. See issue #22838.
if runtime.GOOS == "darwin" && err == syscall.EINTR {
continue
}
}
err = fd.eofError(n, err)
return n, err
}
}

conn.Writeconn.Read的原理是一致的,它也是通过类似 pollDesc.waitReadpollDesc.waitWrite来 park 住 goroutine 直至期待的 I/O 事件发生才返回,而 pollDesc.waitWrite的内部实现原理和pollDesc.waitRead是一样的,都是基于runtime_pollWait,这里就不再赘述。

pollDesc.waitRead

pollDesc.waitRead内部调用了 runtime_pollWait来达成无 I/O 事件时 park 住 goroutine 的目的:

//go:linkname poll_runtime_pollWait internal/poll.runtime_pollWait
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
err := netpollcheckerr(pd, int32(mode))
if err != 0 {
return err
}
// As for now only Solaris, illumos, and AIX use level-triggered IO.
if GOOS == "solaris" || GOOS == "illumos" || GOOS == "aix" {
netpollarm(pd, mode)
}
// 进入 netpollblock 并且判断是否有期待的 I/O 事件发生,
// 这里的 for 循环是为了一直等到 io ready
for !netpollblock(pd, int32(mode), false) {
err = netpollcheckerr(pd, int32(mode))
if err != 0 {
return err
}
// Can happen if timeout has fired and unblocked us,
// but before we had a chance to run, timeout has been reset.
// Pretend it has not happened and retry.
}
return 0
}

// returns true if IO is ready, or false if timedout or closed
// waitio - wait only for completed IO, ignore errors
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
// gpp 保存的是 goroutine 的数据结构 g,这里会根据 mode 的值决定是 rg 还是 wg
// 后面调用 gopark 之后,会把当前的 goroutine 的抽象数据结构 g 存入 gpp 这个指针
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}

// set the gpp semaphore to WAIT
// 这个 for 循环是为了等待 io ready 或者 io wait
for {
old := *gpp
// gpp == pdReady 表示此时已有期待的 I/O 事件发生,
// 可以直接返回 unblock 当前 goroutine 并执行响应的 I/O 操作
if old == pdReady {
*gpp = 0
return true
}
if old != 0 {
throw("runtime: double wait")
}
// 如果没有期待的 I/O 事件发生,则通过原子操作把 gpp 的值置为 pdWait 并退出 for 循环
if atomic.Casuintptr(gpp, 0, pdWait) {
break
}
}

// need to recheck error states after setting gpp to WAIT
// this is necessary because runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl
// do the opposite: store to closing/rd/wd, membarrier, load of rg/wg

// waitio 此时是 false,netpollcheckerr 方法会检查当前 pollDesc 对应的 fd 是否是正常的,
// 通常来说 netpollcheckerr(pd, mode) == 0 是成立的,所以这里会执行 gopark
// 把当前 goroutine 给 park 住,直至对应的 fd 上发生可读/可写或者其他『期待的』I/O 事件为止,
// 然后 unpark 返回,在 gopark 内部会把当前 goroutine 的抽象数据结构 g 存入
// gpp(pollDesc.rg/pollDesc.wg) 指针里,以便在后面的 netpoll 函数取出 pollDesc 之后,
// 把 g 添加到链表里返回,然后重新调度运行该 goroutine
if waitio || netpollcheckerr(pd, mode) == 0 {
// 注册 netpollblockcommit 回调给 gopark,在 gopark 内部会执行它,保存当前 goroutine 到 gpp
gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
}
// be careful to not lose concurrent READY notification
old := atomic.Xchguintptr(gpp, 0)
if old > pdWait {
throw("runtime: corrupted polldesc")
}
return old == pdReady
}

// gopark 会停住当前的 goroutine 并且调用传递进来的回调函数 unlockf,从上面的源码我们可以知道这个函数是
// netpollblockcommit
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
if reason != waitReasonSleep {
checkTimeouts() // timeouts may expire while two goroutines keep the scheduler busy
}
mp := acquirem()
gp := mp.curg
status := readgstatus(gp)
if status != _Grunning && status != _Gscanrunning {
throw("gopark: bad g status")
}
mp.waitlock = lock
mp.waitunlockf = unlockf
gp.waitreason = reason
mp.waittraceev = traceEv
mp.waittraceskip = traceskip
releasem(mp)
// can't do anything that might move the G between Ms here.
// gopark 最终会调用 park_m,在这个函数内部会调用 unlockf,也就是 netpollblockcommit,
// 然后会把当前的 goroutine,也就是 g 数据结构保存到 pollDesc 的 rg 或者 wg 指针里
mcall(park_m)
}

// park continuation on g0.
func park_m(gp *g) {
_g_ := getg()

if trace.enabled {
traceGoPark(_g_.m.waittraceev, _g_.m.waittraceskip)
}

casgstatus(gp, _Grunning, _Gwaiting)
dropg()

if fn := _g_.m.waitunlockf; fn != nil {
// 调用 netpollblockcommit,把当前的 goroutine,
// 也就是 g 数据结构保存到 pollDesc 的 rg 或者 wg 指针里
ok := fn(gp, _g_.m.waitlock)
_g_.m.waitunlockf = nil
_g_.m.waitlock = nil
if !ok {
if trace.enabled {
traceGoUnpark(gp, 2)
}
casgstatus(gp, _Gwaiting, _Grunnable)
execute(gp, true) // Schedule it back, never returns.
}
}
schedule()
}

// netpollblockcommit 在 gopark 函数里被调用
func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {
// 通过原子操作把当前 goroutine 抽象的数据结构 g,也就是这里的参数 gp 存入 gpp 指针,
// 此时 gpp 的值是 pollDesc 的 rg 或者 wg 指针
r := atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))
if r {
// Bump the count of goroutines waiting for the poller.
// The scheduler uses this to decide whether to block
// waiting for the poller if there is nothing else to do.
atomic.Xadd(&netpollWaiters, 1)
}
return r
}

netpoll

前面已经从源码的角度分析完了 netpoll 是如何通过 park goroutine 从而达到阻塞 Accept/Read/Write 的效果,而通过调用 gopark,goroutine 会被放置在某个等待队列中(如 channel 的 waitq ,此时 G 的状态由_Grunning_Gwaitting),因此G必须被手动唤醒(通过 goready ),否则会丢失任务,应用层阻塞通常使用这种方式。

所以,最后还有一个非常关键的问题是:当 I/O 事件发生之后,netpoll 是通过什么方式唤醒那些在 I/O wait 的 goroutine 的?答案是通过 epoll_wait,在 Go 源码中的 src/runtime/netpoll_epoll.go文件中有一个 func netpoll(block bool) gList 方法,它会内部调用epoll_wait获取就绪的 fd 列表,并将每个 fd 对应的 goroutine 添加到链表返回

// polls for ready network connections
// returns list of goroutines that become runnable
func netpoll(block bool) gList {
if epfd == -1 {
return gList{}
}
waitms := int32(-1)
// 是否以阻塞模式调用 epoll_wait
if !block {
waitms = 0
}
var events [128]epollevent
retry:
// 获取就绪的 fd 列表
n := epollwait(epfd, &events[0], int32(len(events)), waitms)
if n < 0 {
if n != -_EINTR {
println("runtime: epollwait on fd", epfd, "failed with", -n)
throw("runtime: netpoll failed")
}
goto retry
}
// toRun 是一个 g 的链表,存储要恢复的 goroutines,最后返回给调用方
var toRun gList
for i := int32(0); i < n; i++ {
ev := &events[i]
if ev.events == 0 {
continue
}
var mode int32
// 判断发生的事件类型,读类型或者写类型
if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'r'
}
if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'w'
}
if mode != 0 {
// 取出保存在 epollevent 里的 pollDesc
pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
pd.everr = false
if ev.events == _EPOLLERR {
pd.everr = true
}
// 调用 netpollready,传入就绪 fd 的 pollDesc,把 fd 对应的 goroutine 添加到链表 toRun 中
netpollready(&toRun, pd, mode)
}
}
if block && toRun.empty() {
goto retry
}
return toRun
}

// netpollready 调用 netpollunblock 返回就绪 fd 对应的 goroutine 的抽象数据结构 g
func netpollready(toRun *gList, pd *pollDesc, mode int32) {
var rg, wg *g
if mode == 'r' || mode == 'r'+'w' {
rg = netpollunblock(pd, 'r', true)
}
if mode == 'w' || mode == 'r'+'w' {
wg = netpollunblock(pd, 'w', true)
}
if rg != nil {
toRun.push(rg)
}
if wg != nil {
toRun.push(wg)
}
}

// netpollunblock 会依据传入的 mode 决定从 pollDesc 的 rg 或者 wg 取出当时 gopark 之时存入的
// goroutine 抽象数据结构 g 并返回
func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
// mode == 'r' 代表当时 gopark 是为了等待读事件,而 mode == 'w' 则代表是等待写事件
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}

for {
// 取出 gpp 存储的 g
old := *gpp
if old == pdReady {
return nil
}
if old == 0 && !ioready {
// Only set READY for ioready. runtime_pollWait
// will check for timeout/cancel before waiting.
return nil
}
var new uintptr
if ioready {
new = pdReady
}
// 重置 pollDesc 的 rg 或者 wg
if atomic.Casuintpt

[ gev ] Go 语言优雅处理 TCP “粘包”

文章分享惜朝 发表了文章 • 0 个评论 • 1435 次浏览 • 2019-11-01 10:48 • 来自相关话题

https://github.com/Allenxuxu/gev ...查看全部

https://github.com/Allenxuxu/gev

gev 是一个轻量、快速的基于 Reactor 模式的非阻塞 TCP 网络库,支持自定义协议,轻松快速搭建高性能服务器。

TCP 为什么会粘包


TCP 本身就是面向流的协议,就是一串没有界限的数据。所以本质上来说 TCP 粘包是一个伪命题。

TCP 底层并不关心上层业务数据,会套接字缓冲区的实际情况进行包的划分,一个完整的业务数据可能会被拆分成多次进行发送,也可能会将多个小的业务数据封装成一个大的数据包发送(Nagle算法)。

gev 如何优雅处理


gev 通过回调函数 OnMessage 通知用户数据到来,回调函数中会将用户数据缓冲区(ringbuffer)通过参数传递过来。

用户通过对 ringbuffer 操作,来进行数据解包,获取到完整用户数据后再进行业务操作。这样又一个明显的缺点,就是会让业务操作和自定义协议解析代码堆在一起。

所以,最近对 gev 进行了一次较大改动,主要是为了能够以插件的形式支持各种自定义的数据协议,让使用者可以便捷处理 TCP 粘包问题,专注于业务逻辑。



做法如下,定义一个接口 Protocol

```go
// Protocol 自定义协议编解码接口
type Protocol interface {
UnPacket(c *Connection, buffer *ringbuffer.RingBuffer) (interface{}, []byte)
Packet(c *Connection, data []byte) []byte
}
```


用户只需实现这个接口,并注册到 server 中,当客户端数据到来时,gev 会首先调用 UnPacket 方法,如果缓冲区中的数据足够组成一帧,则将数据解包,并返回真正的用户数据,然后在回调 OnMessage 函数并将数据通过参数传递。

下面,我们实现一个简单的自定义协议插件,来启动一个 Server :

```text
| 数据长度 n | payload |
| 4字节 | n 字节 |
```

```go
// protocol.go
package main

import (
"encoding/binary"
"github.com/Allenxuxu/gev/connection"
"github.com/Allenxuxu/ringbuffer"
"github.com/gobwas/pool/pbytes"
)

const exampleHeaderLen = 4

type ExampleProtocol struct{}

func (d *ExampleProtocol) UnPacket(c *connection.Connection, buffer *ringbuffer.RingBuffer) (interface{}, []byte) {
if buffer.VirtualLength() > exampleHeaderLen {
buf := pbytes.GetLen(exampleHeaderLen)
defer pbytes.Put(buf)
_, _ = buffer.VirtualRead(buf)
dataLen := binary.BigEndian.Uint32(buf)

if buffer.VirtualLength() >= int(dataLen) {
ret := make([]byte, dataLen)
_, _ = buffer.VirtualRead(ret)

buffer.VirtualFlush()
return nil, ret
} else {
buffer.VirtualRevert()
}
}
return nil, nil
}

func (d *ExampleProtocol) Packet(c *connection.Connection, data []byte) []byte {
dataLen := len(data)
ret := make([]byte, exampleHeaderLen+dataLen)
binary.BigEndian.PutUint32(ret, uint32(dataLen))
copy(ret[4:], data)
return ret
}
```

```go
// server.go
package main

import (
"flag"
"log"
"strconv"

"github.com/Allenxuxu/gev"
"github.com/Allenxuxu/gev/connection"
)

type example struct{}

func (s *example) OnConnect(c *connection.Connection) {
log.Println(" OnConnect : ", c.PeerAddr())
}
func (s *example) OnMessage(c *connection.Connection, ctx interface{}, data []byte) (out []byte) {
log.Println("OnMessage:", data)
out = data
return
}

func (s *example) OnClose(c *connection.Connection) {
log.Println("OnClose")
}

func main() {
handler := new(example)
var port int
var loops int

flag.IntVar(&port, "port", 1833, "server port")
flag.IntVar(&loops, "loops", -1, "num loops")
flag.Parse()

s, err := gev.NewServer(handler,
gev.Address(":"+strconv.Itoa(port)),
gev.NumLoops(loops),
gev.Protocol(&ExampleProtocol{}))
if err != nil {
panic(err)
}

log.Println("server start")
s.Start()
}
```
完整代码地址

当回调 `OnMessage` 函数的时候,会通过参数传递已经拆好包的用户数据。

当我们需要使用其他协议时,仅仅需要实现一个 Protocol 插件,然后只要 `gev.NewServer` 时指定即可:

```go
gev.NewServer(handler, gev.NumLoops(2), gev.Protocol(&XXXProtocol{}))
```

## 基于 Protocol Plugins 模式为 gev 实现 WebSocket 插件

得益于 Protocol Plugins 模式的引进,我可以将 WebSocket 的实现做成一个插件(WebSocket 协议构建在 TCP 之上),独立于 gev 之外。

```go
package websocket

import (
"log"

"github.com/Allenxuxu/gev/connection"
"github.com/Allenxuxu/gev/plugins/websocket/ws"
"github.com/Allenxuxu/ringbuffer"
)

// Protocol websocket
type Protocol struct {
upgrade *ws.Upgrader
}

// New 创建 websocket Protocol
func New(u *ws.Upgrader) *Protocol {
return &Protocol{upgrade: u}
}

// UnPacket 解析 websocket 协议,返回 header ,payload
func (p *Protocol) UnPacket(c *connection.Connection, buffer *ringbuffer.RingBuffer) (ctx interface{}, out []byte) {
upgraded := c.Context()
if upgraded == nil {
var err error
out, _, err = p.upgrade.Upgrade(buffer)
if err != nil {
log.Println("Websocket Upgrade :", err)
return
}
c.SetContext(true)
} else {
header, err := ws.VirtualReadHeader(buffer)
if err != nil {
log.Println(err)
return
}
if buffer.VirtualLength() >= int(header.Length) {
buffer.VirtualFlush()

payload := make([]byte, int(header.Length))
_, _ = buffer.Read(payload)

if header.Masked {
ws.Cipher(payload, header.Mask, 0)
}

ctx = &header
out = payload
} else {
buffer.VirtualRevert()
}
}
return
}

// Packet 直接返回
func (p *Protocol) Packet(c *connection.Connection, data []byte) []byte {
return data
}
```

具体的实现,可以到仓库的 [plugins/websocket](https://github.com/Allenxuxu/gev/tree/master/plugins/websocket) 查看。

## 相关文章

- [开源 gev: Go 实现基于 Reactor 模式的非阻塞 TCP 网络库](https://note.mogutou.xyz/articles/2019/09/19/1568896693634.html)
- [Go 网络库并发吞吐量测试](https://note.mogutou.xyz/articles/2019/09/22/1569146969662.html)

## 项目地址

https://github.com/Allenxuxu/gev

[开源] gev (支持 websocket 啦): Go 实现基于 Reactor 模式的非阻塞网络库

开源程序惜朝 发表了文章 • 0 个评论 • 392 次浏览 • 2019-10-24 11:04 • 来自相关话题

https://github.com/Allenxuxu/gev[gev](https://github.com/Allenxuxu/gev) ...查看全部

https://github.com/Allenxuxu/gev

[gev](https://github.com/Allenxuxu/gev) 是一个轻量、快速、高性能的基于 Reactor 模式的非阻塞网络库,底层并不使用 golang net 库,而是使用 epoll 和 kqueue。

现在它支持 WebSocket 啦!

支持定时任务,延时任务!

⬇️⬇️⬇️

## 特点

- 基于 epoll 和 kqueue 实现的高性能事件循环
- 支持多核多线程
- 动态扩容 Ring Buffer 实现的读写缓冲区
- 异步读写
- SO_REUSEPORT 端口重用支持
- 支持 WebSocket
- 支持定时任务,延时任务

## 性能测试

> 测试环境 Ubuntu18.04
- gev
- gnet
- eviop
- evio
- net (标准库)

### 吞吐量测试




仓库地址: https://github.com/Allenxuxu/gev

[gev] 一个轻量、快速的基于 Reactor 模式的非阻塞 TCP 网络库

Go开源项目惜朝 发表了文章 • 0 个评论 • 395 次浏览 • 2019-09-25 16:45 • 来自相关话题

`gev` 是一个 ...查看全部

`gev` 是一个轻量、快速的基于 Reactor 模式的非阻塞 TCP 网络库。

➡️➡️ https://github.com/Allenxuxu/gev

特点

- 基于 epoll 和 kqueue 实现的高性能事件循环
- 支持多核多线程
- 动态扩容 Ring Buffer 实现的读写缓冲区
- 异步读写
- SO_REUSEPORT 端口重用支持

网络模型

`gev` 只使用极少的 goroutine, 一个 goroutine 负责监听客户端连接,其他 goroutine (work 协程)负责处理已连接客户端的读写事件,work 协程数量可以配置,默认与运行主机 CPU 数量相同。

性能测试

> 测试环境 Ubuntu18.04 | 4 Virtual CPUs | 4.0 GiB

吞吐量测试

限制 GOMAXPROCS=1(单线程),1 个 work 协程


限制 GOMAXPROCS=4,4 个 work 协程


其他测试

速度测试


和同类库的简单性能比较, 压测方式与 evio 项目相同。
- gnet
- eviop
- evio
- net (标准库)

限制 GOMAXPROCS=1,1 个 work 协程


限制 GOMAXPROCS=1,4 个 work 协程


限制 GOMAXPROCS=4,4 个 work 协程


安装 gev


```bash
go get -u github.com/Allenxuxu/gev
```

快速入门


```go
package main

import (
"log"

"github.com/Allenxuxu/gev"
"github.com/Allenxuxu/gev/connection"
"github.com/Allenxuxu/ringbuffer"
)

type example struct{}

func (s *example) OnConnect(c *connection.Connection) {
log.Println(" OnConnect : ", c.PeerAddr())
}

func (s *example) OnMessage(c *connection.Connection, buffer *ringbuffer.RingBuffer) (out []byte) {
log.Println("OnMessage")
first, end := buffer.PeekAll()
out = first
if len(end) > 0 {
out = append(out, end...)
}
buffer.RetrieveAll()
return
}

func (s *example) OnClose(c *connection.Connection) {
log.Println("OnClose")
}

func main() {
handler := new(example)

s, err := gev.NewServer(handler,
gev.Address(":1833"),
gev.NumLoops(2),
gev.ReusePort(true))
if err != nil {
panic(err)
}

s.Start()
}
```

Handler 是一个接口,我们的程序必须实现它。

```go
type Handler interface {
OnConnect(c *connection.Connection)
OnMessage(c *connection.Connection, buffer *ringbuffer.RingBuffer) []byte
OnClose(c *connection.Connection)
}

func NewServer(handler Handler, opts ...Option) (server *Server, err error) {
```

在消息到来时,gev 会回调 OnMessage ,在这个函数中可以通过返回一个切片来发送数据给客户端。

```go
func (s *example) OnMessage(c *connection.Connection, buffer *ringbuffer.RingBuffer) (out []byte)
```

Connection 还提供 Send 方法来发送数据。Send 并不会立刻发送数据,而是先添加到 event loop 的任务队列中,然后唤醒 event loop 去发送。

更详细的使用方式可以参考示例:[服务端定时推送]

```go
func (c *Connection) Send(buffer []byte) error
```

Connection ShutdownWrite 会关闭写端,从而断开连接。

更详细的使用方式可以参考示例:[限制最大连接数]

```go
func (c *Connection) ShutdownWrite() error
```


➡️➡️ https://github.com/Allenxuxu/gev



gnet: 一个轻量级且高性能的 Go 网络库

开源程序panjf2000 发表了文章 • 1 个评论 • 632 次浏览 • 2019-09-18 16:08 • 来自相关话题


gnet












# 博客原文
https://taohuawu.club/go-event-loop-networking-library-gnet

# Github 主页
https://github.com/panjf2000/gnet

欢迎大家围观~~,目前还在持续更新,感兴趣的话可以 star 一下暗中观察哦。

# 简介

`gnet` 是一个基于 Event-Loop 事件驱动的高性能和轻量级网络库。这个库直接使用 [epoll](https://en.wikipedia.org/wiki/Epoll) 和 [kqueue](https://en.wikipedia.org/wiki/Kqueue) 系统调用而非标准 Golang 网络包:[net](https://golang.org/pkg/net/) 来构建网络应用,它的工作原理类似两个开源的网络库:[libuv](https://github.com/libuv/libuv) 和 [libevent](https://github.com/libevent/libevent)。

这个项目存在的价值是提供一个在网络包处理方面能和 [Redis](http://redis.io)、[Haproxy](http://www.haproxy.org) 这两个项目具有相近性能的Go 语言网络服务器框架。

`gnet` 的亮点在于它是一个高性能、轻量级、非阻塞的纯 Go 实现的传输层(TCP/UDP/Unix-Socket)网络库,开发者可以使用 `gnet` 来实现自己的应用层网络协议,从而构建出自己的应用层网络应用:比如在 `gnet` 上实现 HTTP 协议就可以创建出一个 HTTP 服务器 或者 Web 开发框架,实现 Redis 协议就可以创建出自己的 Redis 服务器等等。

**`gnet` 衍生自另一个项目:`evio`,但是性能更好。**

# 功能

- [高性能](#性能测试) 的基于多线程模型的 Event-Loop 事件驱动
- 内置 Round-Robin 轮询负载均衡算法
- 简洁的 APIs
- 基于 Ring-Buffer 的高效内存利用
- 支持多种网络协议:TCP、UDP、Unix Sockets
- 支持两种事件驱动机制:Linux 里的 epoll 以及 FreeBSD 里的 kqueue
- 支持异步写操作
- 允许多个网络监听地址绑定在一个 Event-Loop 上
- 灵活的事件定时器
- SO_REUSEPORT 端口重用

# 核心设计

## 多线程模型

`gnet` 重新设计开发了一个新内置的多线程模型:『主从 Reactor 多线程』,这也是 `netty` 默认的线程模型,下面是这个模型的原理图:


multi_reactor



它的运行流程如下面的时序图:


reactor



现在我正在 `gnet` 里开发一个新的多线程模型:『带线程/go程池的主从 Reactors 多线程』,并且很快就能完成,这个模型的架构图如下所示:


multi_reactor_thread_pool



它的运行流程如下面的时序图:


multi-reactors



## 通信机制

`gnet` 的『主从 Reactors 多线程』模型是基于 Golang 里的 Goroutines的,一个 Reactor 挂载在一个 Goroutine 上,所以在 `gnet` 的这个网络模型里主 Reactor/Goroutine 与从 Reactors/Goroutines 有海量通信的需求,因此 `gnet` 里必须要有一个能在 Goroutines 之间进行高效率的通信的机制,我没有选择 Golang 里的主流方案:基于 Channel 的 CSP 模型,而是选择了性能更好、基于 Ring-Buffer 的 Disruptor 方案。

所以我最终选择了 [go-disruptor](https://github.com/smartystreets-prototypes/go-disruptor):高性能消息分发队列 LMAX Disruptor 的 Golang 实现。

## 自动扩容的 Ring-Buffer

`gnet` 利用 Ring-Buffer 来缓存 TCP 流数据以及管理内存使用。







# 开始使用

## 安装

```sh
$ go get -u github.com/panjf2000/gnet
```

## 使用示例

```go
// ======================== Echo Server implemented with gnet ===========================

package main

import (
"flag"
"fmt"
"log"
"strings"

"github.com/panjf2000/gnet"
"github.com/panjf2000/gnet/ringbuffer"
)

func main() {
var port int
var loops int
var udp bool
var trace bool
var reuseport bool

flag.IntVar(&port, "port", 5000, "server port")
flag.BoolVar(&udp, "udp", false, "listen on udp")
flag.BoolVar(&reuseport, "reuseport", false, "reuseport (SO_REUSEPORT)")
flag.BoolVar(&trace, "trace", false, "print packets to console")
flag.IntVar(&loops, "loops", 0, "num loops")
flag.Parse()

var events gnet.Events
events.NumLoops = loops
events.OnInitComplete = func(srv gnet.Server) (action gnet.Action) {
log.Printf("echo server started on port %d (loops: %d)", port, srv.NumLoops)
if reuseport {
log.Printf("reuseport")
}
return
}
events.React = func(c gnet.Conn, inBuf *ringbuffer.RingBuffer) (out []byte, action gnet.Action) {
top, tail := inBuf.PreReadAll()
out = append(top, tail...)
inBuf.Reset()

if trace {
log.Printf("%s", strings.TrimSpace(string(top)+string(tail)))
}
return
}
scheme := "tcp"
if udp {
scheme = "udp"
}
log.Fatal(gnet.Serve(events, fmt.Sprintf("%s://:%d", scheme, port)))
}

```

## I/O 事件

`gnet` 目前支持的 I/O 事件如下:

- `OnInitComplete` 当 server 初始化完成之后调用。
- `OnOpened` 当连接被打开的时候调用。
- `OnClosed` 当连接被关闭的时候调用。
- `OnDetached` 当主动摘除连接的时候的调用。
- `React` 当 server 端接收到从 client 端发送来的数据的时候调用。(你的核心业务代码一般是写在这个方法里)
- `Tick` 服务器启动的时候会调用一次,之后就以给定的时间间隔定时调用一次,是一个定时器方法。
- `PreWrite` 预先写数据方法,在 server 端写数据回 client 端之前调用。

# 性能测试

## Linux (epoll)

### 系统参数

```powershell
Go Version: go1.12.9 linux/amd64
OS: Ubuntu 18.04
CPU: 8 Virtual CPUs
Memory: 16.0 GiB
```

### Echo Server

![echolinux.png](https://img.hacpai.com/file/2019/09/echolinux-fca6e6e5.png)


### HTTP Server

![httplinux.png](https://img.hacpai.com/file/2019/09/httplinux-663a0318.png)


## FreeBSD (kqueue)

### 系统参数

```powershell
Go Version: go version go1.12.9 darwin/amd64
OS: macOS Mojave 10.14.6
CPU: 4 CPUs
Memory: 8.0 GiB
```

### Echo Server

![echomac.png](https://img.hacpai.com/file/2019/09/echomac-7a29e0d1.png)


### HTTP Server

![httpmac.png](https://img.hacpai.com/file/2019/09/httpmac-cb6d26ea.png)


# 证书

`gnet` 的源码允许用户在遵循 MIT [开源证书](https://github.com/panjf2000/gnet/blob/master/LICENSE) 规则的前提下使用。

# 待做事项

> gnet 还在持续开发的过程中,所以这个仓库的代码和文档会一直持续更新,如果你对 gnet 感兴趣的话,欢迎给这个开源库贡献你的代码~~

使用Go语言每分钟处理1百万请求

技术讨论astaxie 发表了文章 • 5 个评论 • 5635 次浏览 • 2016-10-10 16:25 • 来自相关话题

# 使用Go语言每分钟处理1百万请求(译) --- 在[Malwarebytes ][1]我们经历了显著的增长,自从我一年前加入了硅谷的公司,一个主要的职责成了设计架构和开发一些系统来支持一个快速增长的信息 ...查看全部
# 使用Go语言每分钟处理1百万请求(译)

---

在[Malwarebytes ][1]我们经历了显著的增长,自从我一年前加入了硅谷的公司,一个主要的职责成了设计架构和开发一些系统来支持一个快速增长的信息安全公司和所有需要的设施来支持一个每天百万用户使用的产品。我在反病毒和反恶意软件行业的不同公司工作了12年,从而我知道由于我们每天处理大量的数据,这些系统是多么复杂。

有趣的是,在过去的大约9年间,我参与的所有的web后端的开发通常是通过Ruby on Rails技术实现的。不要错怪我。我喜欢Ruby on Rails,并且我相信它是个令人惊讶的环境。但是一段时间后,你会开始以ruby的方式开始思考和设计系统,你会忘记,如果你可以利用多线程、并行、快速执行和小内存开销,软件架构本来应该是多么高效和简单。很多年期间,我是一个c/c++、Delphi和c#开发者,我刚开始意识到使用正确的工具可以把复杂的事情变得简单些。

作为首席架构师,我不会很关心在互联网上的语言和框架战争。我相信效率、生产力。代码可维护性主要依赖于你如何把解决方案设计得很简单。

# 问题

当工作在我们的匿名遥测和分析系统中,我们的目标是可以处理来自于百万级别的终端的大量的POST请求。web处理服务可以接收包含了很多payload的集合的JSON数据,这些数据需要写入Amazon S3中。接下来,map-reduce系统可以操作这些数据。

按照习惯,我们会调研服务层级架构,涉及的软件如下:

- Sidekiq
- Resque
- DelayedJob
- Elasticbeanstalk Worker Tier
- RabbitMQ
- and so on…

搭建了2个不同的集群,一个提供web前端,另外一个提供后端处理,这样我们可以横向扩展后端服务的数量。

但是,从刚开始,在 讨论阶段我们的团队就知道我们应该使用Go,因为我们看到这会潜在性地成为一个非常庞大( large traffic)的系统。我已经使用了Go语言大约2年时间,我们开发了几个系统,但是很少会达到这样的负载(amount of load)。

我们开始创建一些结构,定义从POST调用得到的web请求负载,还有一个上传到S3 budket的函数。

```
type PayloadCollection struct {
WindowsVersion string `json:"version"`
Token string `json:"token"`
Payloads []Payload `json:"data"`
}

type Payload struct {
// [redacted]
}

func (p *Payload) UploadToS3() error {
// the storageFolder method ensures that there are no name collision in
// case we get same timestamp in the key name
storage_path := fmt.Sprintf("%v/%v", p.storageFolder, time.Now().UnixNano())

bucket := S3Bucket

b := new(bytes.Buffer)
encodeErr := json.NewEncoder(b).Encode(payload)
if encodeErr != nil {
return encodeErr
}

// Everything we post to the S3 bucket should be marked 'private'
var acl = s3.Private
var contentType = "application/octet-stream"

return bucket.PutReader(storage_path, b, int64(b.Len()), contentType, acl, s3.Options{})
}
```

# 本地Go routines方法

刚开始,我们采用了一个非常本地化的POST处理实现,仅仅尝试把发到简单go routine的job并行化:

```
func payloadHandler(w http.ResponseWriter, r *http.Request) {

if r.Method != "POST" {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}

// Read the body into a string for json decoding
var content = &PayloadCollection{}
err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
if err != nil {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
w.WriteHeader(http.StatusBadRequest)
return
}

// Go through each payload and queue items individually to be posted to S3
for _, payload := range content.Payloads {
go payload.UploadToS3() // <----- DON'T DO THIS
}

w.WriteHeader(http.StatusOK)
}
```

对于中小负载,这会对大多数的人适用,但是大规模下,这个方案会很快被证明不是很好用。我们期望的请求数,不在我们刚开始计划的数量级,当我们把第一个版本部署到生产环境上。我们完全低估了流量。

上面的方案在很多地方很不好。没有办法控制我们产生的go routine的数量。由于我们收到了每分钟1百万的POST请求,这段代码很快就崩溃了。

# 再次尝试

我们需要找一个不同的方式。自开始我们就讨论过, 我们需要保持请求处理程序的生命周期很短,并且进程在后台产生。当然,这是你在Ruby on Rails的世界里必须要做的事情,否则你会阻塞在所有可用的工作 web处理器上,不管你是使用puma、unicore还是passenger(我们不要讨论JRuby这个话题)。然后我们需要利用常用的处理方案来做这些,比如Resque、 Sidekiq、 SQS等。这个列表会继续保留,因为有很多的方案可以实现这些。

所以,第二次迭代,我们创建了一个缓冲channel,我们可以把job排队,然后把它们上传到S3。因为我们可以控制我们队列中的item最大值,我们有大量的内存来排列job,我们认为只要把job在channel里面缓冲就可以了。

```
var Queue chan Payload

func init() {
Queue = make(chan Payload, MAX_QUEUE)
}

func payloadHandler(w http.ResponseWriter, r *http.Request) {
...
// Go through each payload and queue items individually to be posted to S3
for _, payload := range content.Payloads {
Queue <- payload
}
...
}
```

接下来,我们再从队列中取job,然后处理它们。我们使用类似于下面的代码:

```
func StartProcessor() {
for {
select {
case job := <-Queue:
job.payload.UploadToS3() // <-- STILL NOT GOOD
}
}
}
```

说实话,我不知道我们在想什么。这肯定是一个满是Red-Bulls的夜晚。这个方法不会带来什么改善,我们用了一个 有缺陷的缓冲队列并发,仅仅是把问题推迟了。我们的同步处理器同时仅仅会上传一个数据到S3,因为来到的请求远远大于单核处理器上传到S3的能力,我们的带缓冲channel很快达到了它的极限,然后阻塞了请求处理逻辑的queue更多item的能力。

我们仅仅避免了问题,同时开始了我们的系统挂掉的倒计时。当部署了这个有缺陷的版本后,我们的延时保持在每分钟以常量增长。

![此处输入图片的描述][2]

# 最好的解决方案

我们讨论过在使用用Go channel时利用一种常用的模式,来创建一个二级channel系统,一个来queue job,另外一个来控制使用多少个worker来并发操作JobQueue。

想法是,以一个恒定速率并行上传到S3,既不会导致机器崩溃也不好产生S3的连接错误。这样我们选择了创建一个Job/Worker模式。对于那些熟悉Java、C#等语言的开发者,可以把这种模式想象成利用channel以golang的方式来实现了一个worker线程池,作为一种替代。

```
var (
MaxWorker = os.Getenv("MAX_WORKERS")
MaxQueue = os.Getenv("MAX_QUEUE")
)

// Job represents the job to be run
type Job struct {
Payload Payload
}

// A buffered channel that we can send work requests on.
var JobQueue chan Job

// Worker represents the worker that executes the job
type Worker struct {
WorkerPool chan chan Job
JobChannel chan Job
quit chan bool
}

func NewWorker(workerPool chan chan Job) Worker {
return Worker{
WorkerPool: workerPool,
JobChannel: make(chan Job),
quit: make(chan bool)}
}

// Start method starts the run loop for the worker, listening for a quit channel in
// case we need to stop it
func (w Worker) Start() {
go func() {
for {
// register the current worker into the worker queue.
w.WorkerPool <- w.JobChannel

select {
case job := <-w.JobChannel:
// we have received a work request.
if err := job.Payload.UploadToS3(); err != nil {
log.Errorf("Error uploading to S3: %s", err.Error())
}

case <-w.quit:
// we have received a signal to stop
return
}
}
}()
}

// Stop signals the worker to stop listening for work requests.
func (w Worker) Stop() {
go func() {
w.quit <- true
}()
}
```

我们已经修改了我们的web请求handler,用payload创建一个Job实例,然后发到JobQueue channel,以便于worker来获取。

```
func payloadHandler(w http.ResponseWriter, r *http.Request) {

if r.Method != "POST" {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}

// Read the body into a string for json decoding
var content = &PayloadCollection{}
err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
if err != nil {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
w.WriteHeader(http.StatusBadRequest)
return
}

// Go through each payload and queue items individually to be posted to S3
for _, payload := range content.Payloads {

// let's create a job with the payload
work := Job{Payload: payload}

// Push the work onto the queue.
JobQueue <- work
}

w.WriteHeader(http.StatusOK)
}
```

在web server初始化时,我们创建一个Dispatcher,然后调用Run()函数创建一个worker池子,然后开始监听JobQueue中的job。

```
dispatcher := NewDispatcher(MaxWorker)
dispatcher.Run()
```

下面是dispatcher的实现代码:

```
type Dispatcher struct {
// A pool of workers channels that are registered with the dispatcher
WorkerPool chan chan Job
}

func NewDispatcher(maxWorkers int) *Dispatcher {
pool := make(chan chan Job, maxWorkers)
return &Dispatcher{WorkerPool: pool}
}

func (d *Dispatcher) Run() {
// starting n number of workers
for i := 0; i < d.maxWorkers; i++ {
worker := NewWorker(d.pool)
worker.Start()
}

go d.dispatch()
}

func (d *Dispatcher) dispatch() {
for {
select {
case job := <-JobQueue:
// a job request has been received
go func(job Job) {
// try to obtain a worker job channel that is available.
// this will block until a worker is idle
jobChannel := <-d.WorkerPool

// dispatch the job to the worker job channel
jobChannel <- job
}(job)
}
}
}
```

注意到,我们提供了初始化并加入到池子的worker的最大数量。因为这个工程我们利用了Amazon Elasticbeanstalk带有的docker化的Go环境,所以我们常常会遵守[12-factor][3]方法论来配置我们的生成环境中的系统,我们从环境变了读取这些值。这种方式,我们控制worker的数量和JobQueue的大小,所以我们可以很快的改变这些值,而不需要重新部署集群。

```
var (
MaxWorker = os.Getenv("MAX_WORKERS")
MaxQueue = os.Getenv("MAX_QUEUE")
)
```

# 直接结果

我们部署了之后,立马看到了延时降到微乎其微的数值,并未我们处理请求的能力提升很大。

![此处输入图片的描述][4]

Elastic Load Balancers完全启动后,我们看到ElasticBeanstalk 应用服务于每分钟1百万请求。通常情况下在上午时间有几个小时,流量峰值超过每分钟一百万次。

我们一旦部署了新的代码,服务器的数量从100台大幅 下降到大约20台。

![此处输入图片的描述][5]

我们合理配置了我们的集群和自动均衡配置之后,我们可以把服务器的数量降至4x EC2 c4.Large实例,并且Elastic Auto-Scaling设置为如果CPU达到5分钟的90%利用率,我们就会产生新的实例。

![此处输入图片的描述][6]


# 总结

在我的书中,简单总是获胜。我们可以使用多队列、后台worker、复杂的部署设计一个复杂的系统,但是我们决定利用Elasticbeanstalk 的auto-scaling的能力和Go语言开箱即用的特性简化并发。

我们仅仅用了4台机器,这并不是什么新鲜事了。可能它们还不如我的MacBook能力强大,但是却处理了每分钟1百万的写入到S3的请求。

处理问题有正确的工具。当你的 Ruby on Rails 系统需要更强大的web handler时,可以考虑下ruby生态系统之外的技术,或许可以得到更简单但更强大的替代方案。

[文章原文][7]


[1]: http://www.malwarebytes.org/
[2]: https://raw.githubusercontent.com/itfanr/articles-about-golang/master/2016-10/1-1.png
[3]: http://12factor.net/
[4]: https://raw.githubusercontent.com/itfanr/articles-about-golang/master/2016-10/1-2.png
[5]: https://raw.githubusercontent.com/itfanr/articles-about-golang/master/2016-10/1-3.png
[6]: https://raw.githubusercontent.com/itfanr/articles-about-golang/master/2016-10/1-4.png
[7]: http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang/

Go netpoll I/O 多路复用构建原生网络模型之源码深度解析

文章分享panjf2000 发表了文章 • 0 个评论 • 1273 次浏览 • 2019-11-10 22:48 • 来自相关话题

原文Go netpoll I/O 多路复用构建原生网络模型之源码深度解析 ...查看全部

原文

Go netpoll I/O 多路复用构建原生网络模型之源码深度解析

导言

Go 基于 I/O multiplexing 和 goroutine 构建了一个简洁而高性能的原生网络模型(基于 Go 的I/O 多路复用 netpoll),提供了 goroutine-per-connection 这样简单的网络编程模式。在这种模式下,开发者使用的是同步的模式去编写异步的逻辑,极大地降低了开发者编写网络应用时的心智负担,且借助于 Go runtime scheduler 对 goroutines 的高效调度,这个原生网络模型不论从适用性还是性能上都足以满足绝大部分的应用场景。

然而,在工程性上能做到如此高的普适性和兼容性,最终暴露给开发者提供接口/模式如此简洁,其底层必然是基于非常复杂的封装,做了很多取舍,也有可能放弃了一些『极致』的设计和理念。事实上netpoll底层就是基于 epoll/kqueue/iocp 这些系统调用来做封装的,最终暴露出 goroutine-per-connection 这样的极简的开发模式给使用者。

Go netpoll 在不同的操作系统,其底层使用的 I/O 多路复用技术也不一样,可以从 Go 源码目录结构和对应代码文件了解 Go 在不同平台下的网络 I/O 模式的实现。比如,在 Linux 系统下基于 epoll,freeBSD 系统下基于 kqueue,以及 Windows 系统下基于 iocp。

本文将基于 linux 平台来解析 Go netpoll 之 I/O 多路复用的底层是如何基于 epoll 封装实现的,从源码层层推进,全面而深度地解析 Go netpoll 的设计理念和实现原理,以及 Go 是如何利用netpoll来构建它的原生网络模型的。主要涉及到的一些概念:I/O 模式、用户/内核空间、epoll、linux 源码、goroutine scheduler 等等,我会尽量简单地讲解,如果有对相关概念不熟悉的同学,还是希望能提前熟悉一下。

用户空间与内核空间

现在操作系统都是采用虚拟存储器,那么对 32 位操作系统而言,它的寻址空间(虚拟存储空间)为 4G(2 的 32 次方)。操作系统的核心是内核,独立于普通的应用程序,可以访问受保护的内存空间,也有访问底层硬件设备的所有权限。为了保证用户进程不能直接操作内核(kernel),保证内核的安全,操心系统将虚拟空间划分为两部分,一部分为内核空间,一部分为用户空间。针对 linux 操作系统而言,将最高的 1G 字节(从虚拟地址 0xC0000000 到 0xFFFFFFFF),供内核使用,称为内核空间,而将较低的 3G 字节(从虚拟地址 0x00000000 到0xBFFFFFFF),供各个进程使用,称为用户空间。

I/O 多路复用

在神作《UNIX 网络编程》里,总结归纳了 5 种 I/O 模型,包括同步和异步 I/O:

  • 阻塞 I/O (Blocking I/O)
  • 非阻塞 I/O (Nonblocking I/O)
  • I/O 多路复用 (I/O multiplexing)
  • 信号驱动 I/O (Signal driven I/O)
  • 异步 I/O (Asynchronous I/O)

操作系统上的 I/O 是用户空间和内核空间的数据交互,因此 I/O 操作通常包含以下两个步骤:

  1. 等待网络数据到达网卡(读就绪)/等待网卡可写(写就绪) –> 读取/写入到内核缓冲区
  2. 从内核缓冲区复制数据 –> 用户空间(读)/从用户空间复制数据 -> 内核缓冲区(写)

而判定一个 I/O 模型是同步还是异步,主要看第二步:数据在用户和内核空间之间复制的时候是不是会阻塞当前进程,如果会,则是同步 I/O,否则,就是异步 I/O。基于这个原则,这 5 种 I/O 模型中只有一种异步 I/O 模型:Asynchronous I/O,其余都是同步 I/O 模型。

这 5 种 I/O 模型的对比如下:

所谓 I/O 多路复用指的就是 select/poll/epoll 这一系列的多路选择器:支持单一线程同时监听多个文件描述符(I/O事件),阻塞等待,并在其中某个文件描述符可读写时收到通知。 I/O 复用其实复用的不是 I/O 连接,而是复用线程,让一个 thread of control 能够处理多个连接(I/O 事件)。

select & poll

#include <sys/select.h>

/* According to earlier standards */
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>

int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);

// 和 select 紧密结合的四个宏:
void FD_CLR(int fd, fd_set *set);
int FD_ISSET(int fd, fd_set *set);
void FD_SET(int fd, fd_set *set);
void FD_ZERO(fd_set *set);

select 是 epoll 之前 linux 使用的 I/O 事件驱动技术。

理解 select 的关键在于理解 fd_set,为说明方便,取 fd_set 长度为 1 字节,fd_set 中的每一 bit 可以对应一个文件描述符 fd,则 1 字节长的 fd_set 最大可以对应 8 个 fd。select 的调用过程如下:

  1. 执行 FD_ZERO(&set), 则 set 用位表示是 0000,0000
  2. 若 fd=5, 执行 FD_SET(fd, &set); 后 set 变为 0001,0000(第5位置为1)
  3. 再加入 fd=2, fd=1,则 set 变为 0001,0011
  4. 执行 select(6, &set, 0, 0, 0) 阻塞等待
  5. 若 fd=1, fd=2 上都发生可读事件,则 select 返回,此时 set 变为 0000,0011 (注意:没有事件发生的 fd=5 被清空)

基于上面的调用过程,可以得出 select 的特点:

  • 可监控的文件描述符个数取决于 sizeof(fd_set) 的值。假设服务器上 sizeof(fd_set)=512,每 bit 表示一个文件描述符,则服务器上支持的最大文件描述符是 512*8=4096。fd_set的大小调整可参考 【原创】技术系列之 网络模型(二) 中的模型 2,可以有效突破 select 可监控的文件描述符上限
  • 将 fd 加入 select 监控集的同时,还要再使用一个数据结构 array 保存放到 select 监控集中的 fd,一是用于在 select 返回后,array 作为源数据和 fd_set 进行 FD_ISSET 判断。二是 select 返回后会把以前加入的但并无事件发生的 fd 清空,则每次开始 select 前都要重新从 array 取得 fd 逐一加入(FD_ZERO最先),扫描 array 的同时取得 fd 最大值 maxfd,用于 select 的第一个参数
  • 可见 select 模型必须在 select 前循环 array(加 fd,取 maxfd),select 返回后循环 array(FD_ISSET判断是否有事件发生)

所以,select 有如下的缺点:

  1. 最大并发数限制:使用 32 个整数的 32 位,即 32*32=1024 来标识 fd,虽然可修改,但是有以下第 2, 3 点的瓶颈
  2. 每次调用 select,都需要把 fd 集合从用户态拷贝到内核态,这个开销在 fd 很多时会很大
  3. 性能衰减严重:每次 kernel 都需要线性扫描整个 fd_set,所以随着监控的描述符 fd 数量增长,其 I/O 性能会线性下降

poll 的实现和 select 非常相似,只是描述 fd 集合的方式不同,poll 使用 pollfd 结构而不是 select 的 fd_set 结构,poll 解决了最大文件描述符数量限制的问题,但是同样需要从用户态拷贝所有的 fd 到内核态,也需要线性遍历所有的 fd 集合,所以它和 select 只是实现细节上的区分,并没有本质上的区别。

epoll

epoll 是 linux kernel 2.6 之后引入的新 I/O 事件驱动技术,I/O 多路复用的核心设计是 1 个线程处理所有连接的等待消息准备好I/O 事件,这一点上 epoll 和 select&poll 是大同小异的。但 select&poll 预估错误了一件事,当数十万并发连接存在时,可能每一毫秒只有数百个活跃的连接,同时其余数十万连接在这一毫秒是非活跃的。select&poll 的使用方法是这样的:返回的活跃连接 == select(全部待监控的连接)

什么时候会调用 select&poll 呢?在你认为需要找出有报文到达的活跃连接时,就应该调用。所以,select&poll 在高并发时是会被频繁调用的。这样,这个频繁调用的方法就很有必要看看它是否有效率,因为,它的轻微效率损失都会被高频二字所放大。它有效率损失吗?显而易见,全部待监控连接是数以十万计的,返回的只是数百个活跃连接,这本身就是无效率的表现。被放大后就会发现,处理并发上万个连接时,select&poll 就完全力不从心了。这个时候就该 epoll 上场了,epoll 通过一些新的设计和优化,基本上解决了 select&poll 的问题。

epoll 的 API 非常简洁,涉及到的只有 3 个系统调用:

#include <sys/epoll.h>  
int epoll_create(int size); // int epoll_create1(int flags);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

其中,epoll_create 创建一个 epoll 实例并返回 epollfd;epoll_ctl 注册 file descriptor 等待的 I/O 事件(比如 EPOLLIN、EPOLLOUT 等) 到 epoll 实例上;epoll_wait 则是阻塞监听 epoll 实例上所有的 file descriptor 的 I/O 事件,它接收一个用户空间上的一块内存地址 (events 数组),kernel 会在有 I/O 事件发生的时候把文件描述符列表复制到这块内存地址上,然后 epoll_wait 解除阻塞并返回,最后用户空间上的程序就可以对相应的 fd 进行读写了:

#include <unistd.h>
ssize_t read(int fd, void *buf, size_t count);
ssize_t write(int fd, const void *buf, size_t count);

epoll 的工作原理如下:

与 select&poll 相比,epoll 分清了高频调用和低频调用。例如,epoll_ctl 相对来说就是不太频繁被调用的,而 epoll_wait 则是非常频繁被调用的。所以 epoll 利用 epoll_ctl 来插入或者删除一个 fd,实现用户态到内核态的数据拷贝,这确保了每一个 fd 在其生命周期只需要被拷贝一次,而不是每次调用 epoll_wait 的时候都拷贝一次。 epoll_wait 则被设计成几乎没有入参的调用,相比 select&poll 需要把全部监听的 fd 集合从用户态拷贝至内核态的做法,epoll 的效率就高出了一大截。

在实现上 epoll 采用红黑树来存储所有监听的 fd,而红黑树本身插入和删除性能比较稳定,时间复杂度 O(logN)。通过 epoll_ctl 函数添加进来的 fd 都会被放在红黑树的某个节点内,所以,重复添加是没有用的。当把 fd 添加进来的时候时候会完成关键的一步:该 fd 都会与相应的设备(网卡)驱动程序建立回调关系,也就是在内核中断处理程序为它注册一个回调函数,在 fd 相应的事件触发(中断)之后(设备就绪了),内核就会调用这个回调函数,该回调函数在内核中被称为:ep_poll_callback这个回调函数其实就是把这个 fd 添加到 rdllist 这个双向链表(就绪链表)中。epoll_wait 实际上就是去检查 rdlist 双向链表中是否有就绪的 fd,当 rdlist 为空(无就绪fd)时挂起当前进程,直到 rdlist 非空时进程才被唤醒并返回。

相比于 select&poll 调用时会将全部监听的 fd 从用户态空间拷贝至内核态空间并线性扫描一遍找出就绪的 fd 再返回到用户态,epoll_wait 则是直接返回已就绪 fd,因此 epoll 的 I/O 性能不会像 select&poll 那样随着监听的 fd 数量增加而出现线性衰减,是一个非常高效的 I/O 事件驱动技术。

由于使用 epoll 的 I/O 多路复用需要用户进程自己负责 I/O 读写,从用户进程的角度看,读写过程是阻塞的,所以 select&poll&epoll 本质上都是同步 I/O 模型,而像 Windows 的 IOCP 这一类的异步 I/O,只需要在调用 WSARecv 或 WSASend 方法读写数据的时候把用户空间的内存 buffer 提交给 kernel,kernel 负责数据在用户空间和内核空间拷贝,完成之后就会通知用户进程,整个过程不需要用户进程参与,所以是真正的异步 I/O。

延伸

另外,我看到有些文章说 epoll 之所以性能高是因为利用了 linux 的 mmap 内存映射让内核和用户进程共享了一片物理内存,用来存放就绪 fd 列表和它们的数据 buffer,所以用户进程在 epoll_wait返回之后用户进程就可以直接从共享内存那里读取/写入数据了,这让我很疑惑,因为首先看epoll_wait的函数声明:

int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

第二个参数:就绪事件列表,是需要在用户空间分配内存然后再传给epoll_wait的,如果内核会用 mmap 设置共享内存,直接传递一个指针进去就行了,根本不需要在用户态分配内存,多此一举。其次,内核和用户进程通过 mmap 共享内存是一件极度危险的事情,内核无法确定这块共享内存什么时候会被回收,而且这样也会赋予用户进程直接操作内核数据的权限和入口,非常容易出现大的系统漏洞,因此一般极少会这么做。所以我很怀疑 epoll 是不是真的在 linux kernel 里用了 mmap,我就去看了下最新版本(5.3.9)的 linux kernel 源码:

/*
* Implement the event wait interface for the eventpoll file. It is the kernel
* part of the user space epoll_wait(2).
*/

static int do_epoll_wait(int epfd, struct epoll_event __user *events,
int maxevents, int timeout)
{
// ...

/* Time to fish for events ... */
error = ep_poll(ep, events, maxevents, timeout);
}

// 如果 epoll_wait 入参时设定 timeout == 0, 那么直接通过 ep_events_available 判断当前是否有用户感兴趣的事件发生,如果有则通过 ep_send_events 进行处理
// 如果设置 timeout > 0,并且当前没有用户关注的事件发生,则进行休眠,并添加到 ep->wq 等待队列的头部;对等待事件描述符设置 WQ_FLAG_EXCLUSIVE 标志
// ep_poll 被事件唤醒后会重新检查是否有关注事件,如果对应的事件已经被抢走,那么 ep_poll 会继续休眠等待
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events, int maxevents, long timeout)
{
// ...

send_events:
/*
* Try to transfer events to user space. In case we get 0 events and
* there's still timeout left over, we go trying again in search of
* more luck.
*/


// 如果一切正常, 有 event 发生, 就开始准备数据 copy 给用户空间了
// 如果有就绪的事件发生,那么就调用 ep_send_events 将就绪的事件 copy 到用户态内存中,
// 然后返回到用户态,否则判断是否超时,如果没有超时就继续等待就绪事件发生,如果超时就返回用户态。
// 从 ep_poll 函数的实现可以看到,如果有就绪事件发生,则调用 ep_send_events 函数做进一步处理
if (!res && eavail &&
!(res = ep_send_events(ep, events, maxevents)) && !timed_out)
goto fetch_events;

// ...
}

// ep_send_events 函数是用来向用户空间拷贝就绪 fd 列表的,它将用户传入的就绪 fd 列表内存简单封装到
// ep_send_events_data 结构中,然后调用 ep_scan_ready_list 将就绪队列中的事件写入用户空间的内存;
// 用户进程就可以访问到这些数据进行处理
static int ep_send_events(struct eventpoll *ep,
struct epoll_event __user *events, int maxevents)
{
struct ep_send_events_data esed;

esed.maxevents = maxevents;
esed.events = events;
// 调用 ep_scan_ready_list 函数检查 epoll 实例 eventpoll 中的 rdllist 就绪链表,
// 并注册一个回调函数 ep_send_events_proc,如果有就绪 fd,则调用 ep_send_events_proc 进行处理
ep_scan_ready_list(ep, ep_send_events_proc, &esed, 0, false);
return esed.res;
}

// 调用 ep_scan_ready_list 的时候会传递指向 ep_send_events_proc 函数的函数指针作为回调函数,
// 一旦有就绪 fd,就会调用 ep_send_events_proc 函数
static __poll_t ep_send_events_proc(struct eventpoll *ep, struct list_head *head, void *priv)
{
// ...

/*
* If the event mask intersect the caller-requested one,
* deliver the event to userspace. Again, ep_scan_ready_list()
* is holding ep->mtx, so no operations coming from userspace
* can change the item.
*/

revents = ep_item_poll(epi, &pt, 1);
// 如果 revents 为 0,说明没有就绪的事件,跳过,否则就将就绪事件拷贝到用户态内存中
if (!revents)
continue;
// 将当前就绪的事件和用户进程传入的数据都通过 __put_user 拷贝回用户空间,
// 也就是调用 epoll_wait 之时用户进程传入的 fd 列表的内存
if (__put_user(revents, &uevent->events) || __put_user(epi->event.data, &uevent->data)) {
list_add(&epi->rdllink, head);
ep_pm_stay_awake(epi);
if (!esed->res)
esed->res = -EFAULT;
return 0;
}

// ...
}

do_epoll_wait开始层层跳转,我们可以很清楚地看到最后内核是通过__put_user函数把就绪 fd 列表和事件返回到用户空间,而__put_user正是内核用来拷贝数据到用户空间的标准函数。此外,我并没有在 linux kernel 的源码中和 epoll 相关的代码里找到 mmap 系统调用做内存映射的逻辑,所以基本可以得出结论:epoll 在 linux kernel 里并没有使用 mmap 来做用户空间和内核空间的内存共享,所以那些说 epoll 使用了 mmap 的文章都是误解。

Non-blocking I/O

什么叫非阻塞 I/O,顾名思义就是:所有 I/O 操作都是立刻返回而不会阻塞当前用户进程。I/O 多路复用通常情况下需要和非阻塞 I/O 搭配使用,否则可能会产生意想不到的问题。比如,epoll 的 ET(边缘触发) 模式下,如果不使用非阻塞 I/O,有极大的概率会导致阻塞 event-loop 线程,从而降低吞吐量,甚至导致 bug。

Linux 下,我们可以通过 fcntl 系统调用来设置 O_NONBLOCK标志位,从而把 socket 设置成 non-blocking。当对一个 non-blocking socket 执行读操作时,流程是这个样子:

当用户进程发出 read 操作时,如果 kernel 中的数据还没有准备好,那么它并不会 block 用户进程,而是立刻返回一个 EAGAIN error。从用户进程角度讲 ,它发起一个 read 操作后,并不需要等待,而是马上就得到了一个结果。用户进程判断结果是一个 error 时,它就知道数据还没有准备好,于是它可以再次发送 read 操作。一旦 kernel 中的数据准备好了,并且又再次收到了用户进程的 system call,那么它马上就将数据拷贝到了用户内存,然后返回。

所以,non-blocking I/O 的特点是用户进程需要不断的主动询问 kernel 数据好了没有。

Go netpoll

一个典型的 Go TCP server:

package main

import (
"fmt"
"net"
)

func main() {
listen, err := net.Listen("tcp", ":8888")
if err != nil {
fmt.Println("listen error: ", err)
return
}

for {
conn, err := listen.Accept()
if err != nil {
fmt.Println("accept error: ", err)
break
}

// start a new goroutine to handle the new connection
go HandleConn(conn)
}
}
func HandleConn(conn net.Conn) {
defer conn.Close()
packet := make([]byte, 1024)
for {
// 如果没有可读数据,也就是读 buffer 为空,则阻塞
_, _ = conn.Read(packet)
// 同理,不可写则阻塞
_, _ = conn.Write(packet)
}
}

上面是一个基于 Go 原生网络模型(基于 netpoll)编写的一个 TCP server,模式是 goroutine-per-connection,在这种模式下,开发者使用的是同步的模式去编写异步的逻辑而且对于开发者来说 I/O 是否阻塞是无感知的,也就是说开发者无需考虑 goroutines 甚至更底层的线程、进程的调度和上下文切换。而 Go netpoll 最底层的事件驱动技术肯定是基于 epoll/kqueue/iocp 这一类的 I/O 事件驱动技术,只不过是把这些调度和上下文切换的工作转移到了 runtime 的 Go scheduler,让它来负责调度 goroutines,从而极大地降低了程序员的心智负担!

Go netpoll 核心

Go netpoll 通过在底层对 epoll/kqueue/iocp 的封装,从而实现了使用同步编程模式达到异步执行的效果。总结来说,所有的网络操作都以网络描述符 netFD 为中心实现。netFD 与底层 PollDesc 结构绑定,当在一个 netFD 上读写遇到 EAGAIN 错误时,就将当前 goroutine 存储到这个 netFD 对应的 PollDesc 中,同时调用 gopark 把当前 goroutine 给 park 住,直到这个 netFD 上再次发生读写事件,才将此 goroutine 给 ready 激活重新运行。显然,在底层通知 goroutine 再次发生读写等事件的方式就是 epoll/kqueue/iocp 等事件驱动机制。

接下来我们通过分析最新的 Go 源码(v1.13.4),解读一下整个 netpoll 的运行流程。

上面的示例代码中相关的在源码里的几个数据结构和方法:

// TCPListener is a TCP network listener. Clients should typically
// use variables of type Listener instead of assuming TCP.
type TCPListener struct {
fd *netFD
lc ListenConfig
}

// Accept implements the Accept method in the Listener interface; it
// waits for the next call and returns a generic Conn.
func (l *TCPListener) Accept() (Conn, error) {
if !l.ok() {
return nil, syscall.EINVAL
}
c, err := l.accept()
if err != nil {
return nil, &OpError{Op: "accept", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err}
}
return c, nil
}

func (ln *TCPListener) accept() (*TCPConn, error) {
fd, err := ln.fd.accept()
if err != nil {
return nil, err
}
tc := newTCPConn(fd)
if ln.lc.KeepAlive >= 0 {
setKeepAlive(fd, true)
ka := ln.lc.KeepAlive
if ln.lc.KeepAlive == 0 {
ka = defaultTCPKeepAlive
}
setKeepAlivePeriod(fd, ka)
}
return tc, nil
}

// TCPConn is an implementation of the Conn interface for TCP network
// connections.
type TCPConn struct {
conn
}

// Conn
type conn struct {
fd *netFD
}

type conn struct {
fd *netFD
}

func (c *conn) ok() bool { return c != nil && c.fd != nil }

// Implementation of the Conn interface.

// Read implements the Conn Read method.
func (c *conn) Read(b []byte) (int, error) {
if !c.ok() {
return 0, syscall.EINVAL
}
n, err := c.fd.Read(b)
if err != nil && err != io.EOF {
err = &OpError{Op: "read", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err}
}
return n, err
}

// Write implements the Conn Write method.
func (c *conn) Write(b []byte) (int, error) {
if !c.ok() {
return 0, syscall.EINVAL
}
n, err := c.fd.Write(b)
if err != nil {
err = &OpError{Op: "write", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err}
}
return n, err
}

netFD

net.Listen("tcp", ":8888") 方法返回了一个 TCPListener,它是一个实现了 net.Listener 接口的 struct,而通过 listen.Accept() 接收的新连接 TCPConn 则是一个实现了 net.Conn 接口的 struct,它内嵌了 net.conn struct。仔细阅读上面的源码可以发现,不管是 Listener 的 Accept 还是 Conn 的 Read/Write 方法,都是基于一个 netFD 的数据结构的操作,netFD 是一个网络描述符,类似于 Linux 的文件描述符的概念,netFD 中包含一个 poll.FD 数据结构,而 poll.FD 中包含两个重要的数据结构 Sysfd 和 pollDesc,前者是真正的系统文件描述符,后者对是底层事件驱动的封装,所有的读写超时等操作都是通过调用后者的对应方法实现的。

netFDpoll.FD的源码:

// Network file descriptor.
type netFD struct {
pfd poll.FD

// immutable until Close
family int
sotype int
isConnected bool // handshake completed or use of association with peer
net string
laddr Addr
raddr Addr
}

// FD is a file descriptor. The net and os packages use this type as a
// field of a larger type representing a network connection or OS file.
type FD struct {
// Lock sysfd and serialize access to Read and Write methods.
fdmu fdMutex

// System file descriptor. Immutable until Close.
Sysfd int

// I/O poller.
pd pollDesc

// Writev cache.
iovecs *[]syscall.Iovec

// Semaphore signaled when file is closed.
csema uint32

// Non-zero if this file has been set to blocking mode.
isBlocking uint32

// Whether this is a streaming descriptor, as opposed to a
// packet-based descriptor like a UDP socket. Immutable.
IsStream bool

// Whether a zero byte read indicates EOF. This is false for a
// message based socket connection.
ZeroReadIsEOF bool

// Whether this is a file rather than a network socket.
isFile bool
}

pollDesc

前面提到了 pollDesc 是底层事件驱动的封装,netFD 通过它来完成各种 I/O 相关的操作,它的定义如下:

type pollDesc struct {
runtimeCtx uintptr
}

这里的 struct 只包含了一个指针,而通过 pollDesc 的 init 方法,我们可以找到它具体的定义是在runtime.pollDesc这里:

func (pd *pollDesc) init(fd *FD) error {
serverInit.Do(runtime_pollServerInit)
ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
if errno != 0 {
if ctx != 0 {
runtime_pollUnblock(ctx)
runtime_pollClose(ctx)
}
return syscall.Errno(errno)
}
pd.runtimeCtx = ctx
return nil
}

// Network poller descriptor.
//
// No heap pointers.
//
//go:notinheap
type pollDesc struct {
link *pollDesc // in pollcache, protected by pollcache.lock

// The lock protects pollOpen, pollSetDeadline, pollUnblock and deadlineimpl operations.
// This fully covers seq, rt and wt variables. fd is constant throughout the PollDesc lifetime.
// pollReset, pollWait, pollWaitCanceled and runtime·netpollready (IO readiness notification)
// proceed w/o taking the lock. So closing, everr, rg, rd, wg and wd are manipulated
// in a lock-free way by all operations.
// NOTE(dvyukov): the following code uses uintptr to store *g (rg/wg),
// that will blow up when GC starts moving objects.
lock mutex // protects the following fields
fd uintptr
closing bool
everr bool // marks event scanning error happened
user uint32 // user settable cookie
rseq uintptr // protects from stale read timers
rg uintptr // pdReady, pdWait, G waiting for read or nil
rt timer // read deadline timer (set if rt.f != nil)
rd int64 // read deadline
wseq uintptr // protects from stale write timers
wg uintptr // pdReady, pdWait, G waiting for write or nil
wt timer // write deadline timer
wd int64 // write deadline
}

runtime.pollDesc包含自身类型的一个指针,用来保存下一个runtime.pollDesc的地址,以此来实现链表,可以减少数据结构的大小,所有的runtime.pollDesc保存在runtime.pollCache结构中,定义如下:

type pollCache struct {
lock mutex
first *pollDesc
// PollDesc objects must be type-stable,
// because we can get ready notification from epoll/kqueue
// after the descriptor is closed/reused.
// Stale notifications are detected using seq variable,
// seq is incremented when deadlines are changed or descriptor is reused.
}

net.Listen

调用 net.Listen之后,底层会通过 Linux 的系统调用socket 方法创建一个 fd 分配给 listener,并用以来初始化 listener 的 netFD,接着调用 netFD 的listenStream方法完成对 socket 的 bind&listen 操作以及对 netFD 的初始化(主要是对 netFD 里的 pollDesc 的初始化),相关源码如下:

// 调用 linux 系统调用 socket 创建 listener fd 并设置为为阻塞 I/O    
s, err := socketFunc(family, sotype|syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC, proto)
// On Linux the SOCK_NONBLOCK and SOCK_CLOEXEC flags were
// introduced in 2.6.27 kernel and on FreeBSD both flags were
// introduced in 10 kernel. If we get an EINVAL error on Linux
// or EPROTONOSUPPORT error on FreeBSD, fall back to using
// socket without them.

socketFunc func(int, int, int) (int, error) = syscall.Socket

// 用上面创建的 listener fd 初始化 listener netFD
if fd, err = newFD(s, family, sotype, net); err != nil {
poll.CloseFunc(s)
return nil, err
}

// 对 listener fd 进行 bind&listen 操作,并且调用 init 方法完成初始化
func (fd *netFD) listenStream(laddr sockaddr, backlog int, ctrlFn func(string, string, syscall.RawConn) error) error {
// ...

// 完成绑定操作
if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil {
return os.NewSyscallError("bind", err)
}

// 完成监听操作
if err = listenFunc(fd.pfd.Sysfd, backlog); err != nil {
return os.NewSyscallError("listen", err)
}

// 调用 init,内部会调用 poll.FD.Init,最后调用 pollDesc.init
if err = fd.init(); err != nil {
return err
}
lsa, _ = syscall.Getsockname(fd.pfd.Sysfd)
fd.setAddr(fd.addrFunc()(lsa), nil)
return nil
}

// 使用 sync.Once 来确保一个 listener 只持有一个 epoll 实例
var serverInit sync.Once

// netFD.init 会调用 poll.FD.Init 并最终调用到 pollDesc.init,
// 它会创建 epoll 实例并把 listener fd 加入监听队列
func (pd *pollDesc) init(fd *FD) error {
// runtime_pollServerInit 内部调用了 netpollinit 来创建 epoll 实例
serverInit.Do(runtime_pollServerInit)

// runtime_pollOpen 内部调用了 netpollopen 来将 listener fd 注册到
// epoll 实例中,另外,它会初始化一个 pollDesc 并返回
ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
if errno != 0 {
if ctx != 0 {
runtime_pollUnblock(ctx)
runtime_pollClose(ctx)
}
return syscall.Errno(errno)
}
// 把真正初始化完成的 pollDesc 实例赋值给当前的 pollDesc 代表自身的指针,
// 后续使用直接通过该指针操作
pd.runtimeCtx = ctx
return nil
}

// netpollopen 会被 runtime_pollOpen,注册 fd 到 epoll 实例,
// 同时会利用万能指针把 pollDesc 保存到 epollevent 的一个 8 位的字节数组 data 里
func netpollopen(fd uintptr, pd *pollDesc) int32 {
var ev epollevent
ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
*(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}

我们前面提到的 epoll 的三个基本调用,Go 在源码里实现了对那三个调用的封装:

#include <sys/epoll.h>  
int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

// Go 对上面三个调用的封装
func netpollinit()
func netpollopen(fd uintptr, pd *pollDesc) int32
func netpoll(block bool) gList

netFD 就是通过这三个封装来对 epoll 进行创建实例、注册 fd 和等待事件操作的。

Listener.Accept()

netpoll accept socket 的工作流程如下:

  1. 服务端的 netFD 在listen时会创建 epoll 的实例,并将 listenerFD 加入 epoll 的事件队列
  2. netFD 在accept时将返回的 connFD 也加入 epoll 的事件队列
  3. netFD 在读写时出现syscall.EAGAIN错误,通过 pollDesc 的 waitRead 方法将当前的 goroutine park 住,直到 ready,从 pollDesc 的waitRead中返回

Listener.Accept()接收来自客户端的新连接,具体还是调用netFD.accept方法来完成这个功能:

// Accept implements the Accept method in the Listener interface; it
// waits for the next call and returns a generic Conn.
func (l *TCPListener) Accept() (Conn, error) {
if !l.ok() {
return nil, syscall.EINVAL
}
c, err := l.accept()
if err != nil {
return nil, &OpError{Op: "accept", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err}
}
return c, nil
}

func (ln *TCPListener) accept() (*TCPConn, error) {
fd, err := ln.fd.accept()
if err != nil {
return nil, err
}
tc := newTCPConn(fd)
if ln.lc.KeepAlive >= 0 {
setKeepAlive(fd, true)
ka := ln.lc.KeepAlive
if ln.lc.KeepAlive == 0 {
ka = defaultTCPKeepAlive
}
setKeepAlivePeriod(fd, ka)
}
return tc, nil
}

netFD.accept方法里再调用poll.FD.Accept,最后会使用 linux 的系统调用accept来完成新连接的接收,并且会把 accept 的 socket 设置成非阻塞 I/O 模式:

// Accept wraps the accept network call.
func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
if err := fd.readLock(); err != nil {
return -1, nil, "", err
}
defer fd.readUnlock()

if err := fd.pd.prepareRead(fd.isFile); err != nil {
return -1, nil, "", err
}
for {
// 使用 linux 系统调用 accept 接收新连接,创建对应的 socket
s, rsa, errcall, err := accept(fd.Sysfd)
// 因为 listener fd 在创建的时候已经设置成非阻塞的了,
// 所以 accept 方法会直接返回,不管有没有新连接到来;如果 err == nil 则表示正常建立新连接,直接返回
if err == nil {
return s, rsa, "", err
}
// 如果 err != nil,则判断 err == syscall.EAGAIN,符合条件则进入 pollDesc.waitRead 方法
switch err {
case syscall.EAGAIN:
if fd.pd.pollable() {
// 如果当前没有发生期待的 I/O 事件,那么 waitRead 会通过 park goroutine 让逻辑 block 在这里
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
case syscall.ECONNABORTED:
// This means that a socket on the listen
// queue was closed before we Accept()ed it;
// it's a silly error, so try again.
continue
}
return -1, nil, errcall, err
}
}

// 使用 linux 的 accept 系统调用接收新连接并把这个 socket fd 设置成非阻塞 I/O
ns, sa, err := Accept4Func(s, syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC)
// On Linux the accept4 system call was introduced in 2.6.28
// kernel and on FreeBSD it was introduced in 10 kernel. If we
// get an ENOSYS error on both Linux and FreeBSD, or EINVAL
// error on Linux, fall back to using accept.

// Accept4Func is used to hook the accept4 call.
var Accept4Func func(int, int) (int, syscall.Sockaddr, error) = syscall.Accept4

pollDesc.waitRead方法主要负责检测当前这个 pollDesc 的上层 netFD 对应的 fd 是否有『期待的』I/O 事件发生,如果有就直接返回,否则就 park 住当前的 goroutine 并持续等待直至对应的 fd 上发生可读/可写或者其他『期待的』I/O 事件为止,然后它就会返回到外层的 for 循环,让 goroutine 继续执行逻辑。

Conn.Read/Conn.Write

我们先来看看Conn.Read方法是如何实现的,原理其实和 Listener.Accept 是一样的,具体调用链还是首先调用 conn 的netFD.Read,然后内部再调用 poll.FD.Read,最后使用 linux 的系统调用 read: syscall.Read完成数据读取:

// Implementation of the Conn interface.

// Read implements the Conn Read method.
func (c *conn) Read(b []byte) (int, error) {
if !c.ok() {
return 0, syscall.EINVAL
}
n, err := c.fd.Read(b)
if err != nil && err != io.EOF {
err = &OpError{Op: "read", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err}
}
return n, err
}

func (fd *netFD) Read(p []byte) (n int, err error) {
n, err = fd.pfd.Read(p)
runtime.KeepAlive(fd)
return n, wrapSyscallError("read", err)
}

// Read implements io.Reader.
func (fd *FD) Read(p []byte) (int, error) {
if err := fd.readLock(); err != nil {
return 0, err
}
defer fd.readUnlock()
if len(p) == 0 {
// If the caller wanted a zero byte read, return immediately
// without trying (but after acquiring the readLock).
// Otherwise syscall.Read returns 0, nil which looks like
// io.EOF.
// TODO(bradfitz): make it wait for readability? (Issue 15735)
return 0, nil
}
if err := fd.pd.prepareRead(fd.isFile); err != nil {
return 0, err
}
if fd.IsStream && len(p) > maxRW {
p = p[:maxRW]
}
for {
// 尝试从该 socket 读取数据,因为 socket 在被 listener accept 的时候设置成
// 了非阻塞 I/O,所以这里同样也是直接返回,不管有没有可读的数据
n, err := syscall.Read(fd.Sysfd, p)
if err != nil {
n = 0
// err == syscall.EAGAIN 表示当前没有期待的 I/O 事件发生,也就是 socket 不可读
if err == syscall.EAGAIN && fd.pd.pollable() {
// 如果当前没有发生期待的 I/O 事件,那么 waitRead
// 会通过 park goroutine 让逻辑 block 在这里
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}

// On MacOS we can see EINTR here if the user
// pressed ^Z. See issue #22838.
if runtime.GOOS == "darwin" && err == syscall.EINTR {
continue
}
}
err = fd.eofError(n, err)
return n, err
}
}

conn.Writeconn.Read的原理是一致的,它也是通过类似 pollDesc.waitReadpollDesc.waitWrite来 park 住 goroutine 直至期待的 I/O 事件发生才返回,而 pollDesc.waitWrite的内部实现原理和pollDesc.waitRead是一样的,都是基于runtime_pollWait,这里就不再赘述。

pollDesc.waitRead

pollDesc.waitRead内部调用了 runtime_pollWait来达成无 I/O 事件时 park 住 goroutine 的目的:

//go:linkname poll_runtime_pollWait internal/poll.runtime_pollWait
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
err := netpollcheckerr(pd, int32(mode))
if err != 0 {
return err
}
// As for now only Solaris, illumos, and AIX use level-triggered IO.
if GOOS == "solaris" || GOOS == "illumos" || GOOS == "aix" {
netpollarm(pd, mode)
}
// 进入 netpollblock 并且判断是否有期待的 I/O 事件发生,
// 这里的 for 循环是为了一直等到 io ready
for !netpollblock(pd, int32(mode), false) {
err = netpollcheckerr(pd, int32(mode))
if err != 0 {
return err
}
// Can happen if timeout has fired and unblocked us,
// but before we had a chance to run, timeout has been reset.
// Pretend it has not happened and retry.
}
return 0
}

// returns true if IO is ready, or false if timedout or closed
// waitio - wait only for completed IO, ignore errors
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
// gpp 保存的是 goroutine 的数据结构 g,这里会根据 mode 的值决定是 rg 还是 wg
// 后面调用 gopark 之后,会把当前的 goroutine 的抽象数据结构 g 存入 gpp 这个指针
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}

// set the gpp semaphore to WAIT
// 这个 for 循环是为了等待 io ready 或者 io wait
for {
old := *gpp
// gpp == pdReady 表示此时已有期待的 I/O 事件发生,
// 可以直接返回 unblock 当前 goroutine 并执行响应的 I/O 操作
if old == pdReady {
*gpp = 0
return true
}
if old != 0 {
throw("runtime: double wait")
}
// 如果没有期待的 I/O 事件发生,则通过原子操作把 gpp 的值置为 pdWait 并退出 for 循环
if atomic.Casuintptr(gpp, 0, pdWait) {
break
}
}

// need to recheck error states after setting gpp to WAIT
// this is necessary because runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl
// do the opposite: store to closing/rd/wd, membarrier, load of rg/wg

// waitio 此时是 false,netpollcheckerr 方法会检查当前 pollDesc 对应的 fd 是否是正常的,
// 通常来说 netpollcheckerr(pd, mode) == 0 是成立的,所以这里会执行 gopark
// 把当前 goroutine 给 park 住,直至对应的 fd 上发生可读/可写或者其他『期待的』I/O 事件为止,
// 然后 unpark 返回,在 gopark 内部会把当前 goroutine 的抽象数据结构 g 存入
// gpp(pollDesc.rg/pollDesc.wg) 指针里,以便在后面的 netpoll 函数取出 pollDesc 之后,
// 把 g 添加到链表里返回,然后重新调度运行该 goroutine
if waitio || netpollcheckerr(pd, mode) == 0 {
// 注册 netpollblockcommit 回调给 gopark,在 gopark 内部会执行它,保存当前 goroutine 到 gpp
gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
}
// be careful to not lose concurrent READY notification
old := atomic.Xchguintptr(gpp, 0)
if old > pdWait {
throw("runtime: corrupted polldesc")
}
return old == pdReady
}

// gopark 会停住当前的 goroutine 并且调用传递进来的回调函数 unlockf,从上面的源码我们可以知道这个函数是
// netpollblockcommit
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
if reason != waitReasonSleep {
checkTimeouts() // timeouts may expire while two goroutines keep the scheduler busy
}
mp := acquirem()
gp := mp.curg
status := readgstatus(gp)
if status != _Grunning && status != _Gscanrunning {
throw("gopark: bad g status")
}
mp.waitlock = lock
mp.waitunlockf = unlockf
gp.waitreason = reason
mp.waittraceev = traceEv
mp.waittraceskip = traceskip
releasem(mp)
// can't do anything that might move the G between Ms here.
// gopark 最终会调用 park_m,在这个函数内部会调用 unlockf,也就是 netpollblockcommit,
// 然后会把当前的 goroutine,也就是 g 数据结构保存到 pollDesc 的 rg 或者 wg 指针里
mcall(park_m)
}

// park continuation on g0.
func park_m(gp *g) {
_g_ := getg()

if trace.enabled {
traceGoPark(_g_.m.waittraceev, _g_.m.waittraceskip)
}

casgstatus(gp, _Grunning, _Gwaiting)
dropg()

if fn := _g_.m.waitunlockf; fn != nil {
// 调用 netpollblockcommit,把当前的 goroutine,
// 也就是 g 数据结构保存到 pollDesc 的 rg 或者 wg 指针里
ok := fn(gp, _g_.m.waitlock)
_g_.m.waitunlockf = nil
_g_.m.waitlock = nil
if !ok {
if trace.enabled {
traceGoUnpark(gp, 2)
}
casgstatus(gp, _Gwaiting, _Grunnable)
execute(gp, true) // Schedule it back, never returns.
}
}
schedule()
}

// netpollblockcommit 在 gopark 函数里被调用
func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {
// 通过原子操作把当前 goroutine 抽象的数据结构 g,也就是这里的参数 gp 存入 gpp 指针,
// 此时 gpp 的值是 pollDesc 的 rg 或者 wg 指针
r := atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))
if r {
// Bump the count of goroutines waiting for the poller.
// The scheduler uses this to decide whether to block
// waiting for the poller if there is nothing else to do.
atomic.Xadd(&netpollWaiters, 1)
}
return r
}

netpoll

前面已经从源码的角度分析完了 netpoll 是如何通过 park goroutine 从而达到阻塞 Accept/Read/Write 的效果,而通过调用 gopark,goroutine 会被放置在某个等待队列中(如 channel 的 waitq ,此时 G 的状态由_Grunning_Gwaitting),因此G必须被手动唤醒(通过 goready ),否则会丢失任务,应用层阻塞通常使用这种方式。

所以,最后还有一个非常关键的问题是:当 I/O 事件发生之后,netpoll 是通过什么方式唤醒那些在 I/O wait 的 goroutine 的?答案是通过 epoll_wait,在 Go 源码中的 src/runtime/netpoll_epoll.go文件中有一个 func netpoll(block bool) gList 方法,它会内部调用epoll_wait获取就绪的 fd 列表,并将每个 fd 对应的 goroutine 添加到链表返回

// polls for ready network connections
// returns list of goroutines that become runnable
func netpoll(block bool) gList {
if epfd == -1 {
return gList{}
}
waitms := int32(-1)
// 是否以阻塞模式调用 epoll_wait
if !block {
waitms = 0
}
var events [128]epollevent
retry:
// 获取就绪的 fd 列表
n := epollwait(epfd, &events[0], int32(len(events)), waitms)
if n < 0 {
if n != -_EINTR {
println("runtime: epollwait on fd", epfd, "failed with", -n)
throw("runtime: netpoll failed")
}
goto retry
}
// toRun 是一个 g 的链表,存储要恢复的 goroutines,最后返回给调用方
var toRun gList
for i := int32(0); i < n; i++ {
ev := &events[i]
if ev.events == 0 {
continue
}
var mode int32
// 判断发生的事件类型,读类型或者写类型
if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'r'
}
if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'w'
}
if mode != 0 {
// 取出保存在 epollevent 里的 pollDesc
pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
pd.everr = false
if ev.events == _EPOLLERR {
pd.everr = true
}
// 调用 netpollready,传入就绪 fd 的 pollDesc,把 fd 对应的 goroutine 添加到链表 toRun 中
netpollready(&toRun, pd, mode)
}
}
if block && toRun.empty() {
goto retry
}
return toRun
}

// netpollready 调用 netpollunblock 返回就绪 fd 对应的 goroutine 的抽象数据结构 g
func netpollready(toRun *gList, pd *pollDesc, mode int32) {
var rg, wg *g
if mode == 'r' || mode == 'r'+'w' {
rg = netpollunblock(pd, 'r', true)
}
if mode == 'w' || mode == 'r'+'w' {
wg = netpollunblock(pd, 'w', true)
}
if rg != nil {
toRun.push(rg)
}
if wg != nil {
toRun.push(wg)
}
}

// netpollunblock 会依据传入的 mode 决定从 pollDesc 的 rg 或者 wg 取出当时 gopark 之时存入的
// goroutine 抽象数据结构 g 并返回
func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
// mode == 'r' 代表当时 gopark 是为了等待读事件,而 mode == 'w' 则代表是等待写事件
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}

for {
// 取出 gpp 存储的 g
old := *gpp
if old == pdReady {
return nil
}
if old == 0 && !ioready {
// Only set READY for ioready. runtime_pollWait
// will check for timeout/cancel before waiting.
return nil
}
var new uintptr
if ioready {
new = pdReady
}
// 重置 pollDesc 的 rg 或者 wg
if atomic.Casuintpt

[ gev ] Go 语言优雅处理 TCP “粘包”

文章分享惜朝 发表了文章 • 0 个评论 • 1435 次浏览 • 2019-11-01 10:48 • 来自相关话题

https://github.com/Allenxuxu/gev ...查看全部

https://github.com/Allenxuxu/gev

gev 是一个轻量、快速的基于 Reactor 模式的非阻塞 TCP 网络库,支持自定义协议,轻松快速搭建高性能服务器。

TCP 为什么会粘包


TCP 本身就是面向流的协议,就是一串没有界限的数据。所以本质上来说 TCP 粘包是一个伪命题。

TCP 底层并不关心上层业务数据,会套接字缓冲区的实际情况进行包的划分,一个完整的业务数据可能会被拆分成多次进行发送,也可能会将多个小的业务数据封装成一个大的数据包发送(Nagle算法)。

gev 如何优雅处理


gev 通过回调函数 OnMessage 通知用户数据到来,回调函数中会将用户数据缓冲区(ringbuffer)通过参数传递过来。

用户通过对 ringbuffer 操作,来进行数据解包,获取到完整用户数据后再进行业务操作。这样又一个明显的缺点,就是会让业务操作和自定义协议解析代码堆在一起。

所以,最近对 gev 进行了一次较大改动,主要是为了能够以插件的形式支持各种自定义的数据协议,让使用者可以便捷处理 TCP 粘包问题,专注于业务逻辑。



做法如下,定义一个接口 Protocol

```go
// Protocol 自定义协议编解码接口
type Protocol interface {
UnPacket(c *Connection, buffer *ringbuffer.RingBuffer) (interface{}, []byte)
Packet(c *Connection, data []byte) []byte
}
```


用户只需实现这个接口,并注册到 server 中,当客户端数据到来时,gev 会首先调用 UnPacket 方法,如果缓冲区中的数据足够组成一帧,则将数据解包,并返回真正的用户数据,然后在回调 OnMessage 函数并将数据通过参数传递。

下面,我们实现一个简单的自定义协议插件,来启动一个 Server :

```text
| 数据长度 n | payload |
| 4字节 | n 字节 |
```

```go
// protocol.go
package main

import (
"encoding/binary"
"github.com/Allenxuxu/gev/connection"
"github.com/Allenxuxu/ringbuffer"
"github.com/gobwas/pool/pbytes"
)

const exampleHeaderLen = 4

type ExampleProtocol struct{}

func (d *ExampleProtocol) UnPacket(c *connection.Connection, buffer *ringbuffer.RingBuffer) (interface{}, []byte) {
if buffer.VirtualLength() > exampleHeaderLen {
buf := pbytes.GetLen(exampleHeaderLen)
defer pbytes.Put(buf)
_, _ = buffer.VirtualRead(buf)
dataLen := binary.BigEndian.Uint32(buf)

if buffer.VirtualLength() >= int(dataLen) {
ret := make([]byte, dataLen)
_, _ = buffer.VirtualRead(ret)

buffer.VirtualFlush()
return nil, ret
} else {
buffer.VirtualRevert()
}
}
return nil, nil
}

func (d *ExampleProtocol) Packet(c *connection.Connection, data []byte) []byte {
dataLen := len(data)
ret := make([]byte, exampleHeaderLen+dataLen)
binary.BigEndian.PutUint32(ret, uint32(dataLen))
copy(ret[4:], data)
return ret
}
```

```go
// server.go
package main

import (
"flag"
"log"
"strconv"

"github.com/Allenxuxu/gev"
"github.com/Allenxuxu/gev/connection"
)

type example struct{}

func (s *example) OnConnect(c *connection.Connection) {
log.Println(" OnConnect : ", c.PeerAddr())
}
func (s *example) OnMessage(c *connection.Connection, ctx interface{}, data []byte) (out []byte) {
log.Println("OnMessage:", data)
out = data
return
}

func (s *example) OnClose(c *connection.Connection) {
log.Println("OnClose")
}

func main() {
handler := new(example)
var port int
var loops int

flag.IntVar(&port, "port", 1833, "server port")
flag.IntVar(&loops, "loops", -1, "num loops")
flag.Parse()

s, err := gev.NewServer(handler,
gev.Address(":"+strconv.Itoa(port)),
gev.NumLoops(loops),
gev.Protocol(&ExampleProtocol{}))
if err != nil {
panic(err)
}

log.Println("server start")
s.Start()
}
```
完整代码地址

当回调 `OnMessage` 函数的时候,会通过参数传递已经拆好包的用户数据。

当我们需要使用其他协议时,仅仅需要实现一个 Protocol 插件,然后只要 `gev.NewServer` 时指定即可:

```go
gev.NewServer(handler, gev.NumLoops(2), gev.Protocol(&XXXProtocol{}))
```

## 基于 Protocol Plugins 模式为 gev 实现 WebSocket 插件

得益于 Protocol Plugins 模式的引进,我可以将 WebSocket 的实现做成一个插件(WebSocket 协议构建在 TCP 之上),独立于 gev 之外。

```go
package websocket

import (
"log"

"github.com/Allenxuxu/gev/connection"
"github.com/Allenxuxu/gev/plugins/websocket/ws"
"github.com/Allenxuxu/ringbuffer"
)

// Protocol websocket
type Protocol struct {
upgrade *ws.Upgrader
}

// New 创建 websocket Protocol
func New(u *ws.Upgrader) *Protocol {
return &Protocol{upgrade: u}
}

// UnPacket 解析 websocket 协议,返回 header ,payload
func (p *Protocol) UnPacket(c *connection.Connection, buffer *ringbuffer.RingBuffer) (ctx interface{}, out []byte) {
upgraded := c.Context()
if upgraded == nil {
var err error
out, _, err = p.upgrade.Upgrade(buffer)
if err != nil {
log.Println("Websocket Upgrade :", err)
return
}
c.SetContext(true)
} else {
header, err := ws.VirtualReadHeader(buffer)
if err != nil {
log.Println(err)
return
}
if buffer.VirtualLength() >= int(header.Length) {
buffer.VirtualFlush()

payload := make([]byte, int(header.Length))
_, _ = buffer.Read(payload)

if header.Masked {
ws.Cipher(payload, header.Mask, 0)
}

ctx = &header
out = payload
} else {
buffer.VirtualRevert()
}
}
return
}

// Packet 直接返回
func (p *Protocol) Packet(c *connection.Connection, data []byte) []byte {
return data
}
```

具体的实现,可以到仓库的 [plugins/websocket](https://github.com/Allenxuxu/gev/tree/master/plugins/websocket) 查看。

## 相关文章

- [开源 gev: Go 实现基于 Reactor 模式的非阻塞 TCP 网络库](https://note.mogutou.xyz/articles/2019/09/19/1568896693634.html)
- [Go 网络库并发吞吐量测试](https://note.mogutou.xyz/articles/2019/09/22/1569146969662.html)

## 项目地址

https://github.com/Allenxuxu/gev

[开源] gev (支持 websocket 啦): Go 实现基于 Reactor 模式的非阻塞网络库

开源程序惜朝 发表了文章 • 0 个评论 • 392 次浏览 • 2019-10-24 11:04 • 来自相关话题

https://github.com/Allenxuxu/gev[gev](https://github.com/Allenxuxu/gev) ...查看全部

https://github.com/Allenxuxu/gev

[gev](https://github.com/Allenxuxu/gev) 是一个轻量、快速、高性能的基于 Reactor 模式的非阻塞网络库,底层并不使用 golang net 库,而是使用 epoll 和 kqueue。

现在它支持 WebSocket 啦!

支持定时任务,延时任务!

⬇️⬇️⬇️

## 特点

- 基于 epoll 和 kqueue 实现的高性能事件循环
- 支持多核多线程
- 动态扩容 Ring Buffer 实现的读写缓冲区
- 异步读写
- SO_REUSEPORT 端口重用支持
- 支持 WebSocket
- 支持定时任务,延时任务

## 性能测试

> 测试环境 Ubuntu18.04
- gev
- gnet
- eviop
- evio
- net (标准库)

### 吞吐量测试




仓库地址: https://github.com/Allenxuxu/gev

[gev] 一个轻量、快速的基于 Reactor 模式的非阻塞 TCP 网络库

Go开源项目惜朝 发表了文章 • 0 个评论 • 395 次浏览 • 2019-09-25 16:45 • 来自相关话题

`gev` 是一个 ...查看全部

`gev` 是一个轻量、快速的基于 Reactor 模式的非阻塞 TCP 网络库。

➡️➡️ https://github.com/Allenxuxu/gev

特点

- 基于 epoll 和 kqueue 实现的高性能事件循环
- 支持多核多线程
- 动态扩容 Ring Buffer 实现的读写缓冲区
- 异步读写
- SO_REUSEPORT 端口重用支持

网络模型

`gev` 只使用极少的 goroutine, 一个 goroutine 负责监听客户端连接,其他 goroutine (work 协程)负责处理已连接客户端的读写事件,work 协程数量可以配置,默认与运行主机 CPU 数量相同。

性能测试

> 测试环境 Ubuntu18.04 | 4 Virtual CPUs | 4.0 GiB

吞吐量测试

限制 GOMAXPROCS=1(单线程),1 个 work 协程


限制 GOMAXPROCS=4,4 个 work 协程


其他测试

速度测试


和同类库的简单性能比较, 压测方式与 evio 项目相同。
- gnet
- eviop
- evio
- net (标准库)

限制 GOMAXPROCS=1,1 个 work 协程


限制 GOMAXPROCS=1,4 个 work 协程


限制 GOMAXPROCS=4,4 个 work 协程


安装 gev


```bash
go get -u github.com/Allenxuxu/gev
```

快速入门


```go
package main

import (
"log"

"github.com/Allenxuxu/gev"
"github.com/Allenxuxu/gev/connection"
"github.com/Allenxuxu/ringbuffer"
)

type example struct{}

func (s *example) OnConnect(c *connection.Connection) {
log.Println(" OnConnect : ", c.PeerAddr())
}

func (s *example) OnMessage(c *connection.Connection, buffer *ringbuffer.RingBuffer) (out []byte) {
log.Println("OnMessage")
first, end := buffer.PeekAll()
out = first
if len(end) > 0 {
out = append(out, end...)
}
buffer.RetrieveAll()
return
}

func (s *example) OnClose(c *connection.Connection) {
log.Println("OnClose")
}

func main() {
handler := new(example)

s, err := gev.NewServer(handler,
gev.Address(":1833"),
gev.NumLoops(2),
gev.ReusePort(true))
if err != nil {
panic(err)
}

s.Start()
}
```

Handler 是一个接口,我们的程序必须实现它。

```go
type Handler interface {
OnConnect(c *connection.Connection)
OnMessage(c *connection.Connection, buffer *ringbuffer.RingBuffer) []byte
OnClose(c *connection.Connection)
}

func NewServer(handler Handler, opts ...Option) (server *Server, err error) {
```

在消息到来时,gev 会回调 OnMessage ,在这个函数中可以通过返回一个切片来发送数据给客户端。

```go
func (s *example) OnMessage(c *connection.Connection, buffer *ringbuffer.RingBuffer) (out []byte)
```

Connection 还提供 Send 方法来发送数据。Send 并不会立刻发送数据,而是先添加到 event loop 的任务队列中,然后唤醒 event loop 去发送。

更详细的使用方式可以参考示例:[服务端定时推送]

```go
func (c *Connection) Send(buffer []byte) error
```

Connection ShutdownWrite 会关闭写端,从而断开连接。

更详细的使用方式可以参考示例:[限制最大连接数]

```go
func (c *Connection) ShutdownWrite() error
```


➡️➡️ https://github.com/Allenxuxu/gev



gnet: 一个轻量级且高性能的 Go 网络库

开源程序panjf2000 发表了文章 • 1 个评论 • 632 次浏览 • 2019-09-18 16:08 • 来自相关话题


gnet












# 博客原文
https://taohuawu.club/go-event-loop-networking-library-gnet

# Github 主页
https://github.com/panjf2000/gnet

欢迎大家围观~~,目前还在持续更新,感兴趣的话可以 star 一下暗中观察哦。

# 简介

`gnet` 是一个基于 Event-Loop 事件驱动的高性能和轻量级网络库。这个库直接使用 [epoll](https://en.wikipedia.org/wiki/Epoll) 和 [kqueue](https://en.wikipedia.org/wiki/Kqueue) 系统调用而非标准 Golang 网络包:[net](https://golang.org/pkg/net/) 来构建网络应用,它的工作原理类似两个开源的网络库:[libuv](https://github.com/libuv/libuv) 和 [libevent](https://github.com/libevent/libevent)。

这个项目存在的价值是提供一个在网络包处理方面能和 [Redis](http://redis.io)、[Haproxy](http://www.haproxy.org) 这两个项目具有相近性能的Go 语言网络服务器框架。

`gnet` 的亮点在于它是一个高性能、轻量级、非阻塞的纯 Go 实现的传输层(TCP/UDP/Unix-Socket)网络库,开发者可以使用 `gnet` 来实现自己的应用层网络协议,从而构建出自己的应用层网络应用:比如在 `gnet` 上实现 HTTP 协议就可以创建出一个 HTTP 服务器 或者 Web 开发框架,实现 Redis 协议就可以创建出自己的 Redis 服务器等等。

**`gnet` 衍生自另一个项目:`evio`,但是性能更好。**

# 功能

- [高性能](#性能测试) 的基于多线程模型的 Event-Loop 事件驱动
- 内置 Round-Robin 轮询负载均衡算法
- 简洁的 APIs
- 基于 Ring-Buffer 的高效内存利用
- 支持多种网络协议:TCP、UDP、Unix Sockets
- 支持两种事件驱动机制:Linux 里的 epoll 以及 FreeBSD 里的 kqueue
- 支持异步写操作
- 允许多个网络监听地址绑定在一个 Event-Loop 上
- 灵活的事件定时器
- SO_REUSEPORT 端口重用

# 核心设计

## 多线程模型

`gnet` 重新设计开发了一个新内置的多线程模型:『主从 Reactor 多线程』,这也是 `netty` 默认的线程模型,下面是这个模型的原理图:


multi_reactor



它的运行流程如下面的时序图:


reactor



现在我正在 `gnet` 里开发一个新的多线程模型:『带线程/go程池的主从 Reactors 多线程』,并且很快就能完成,这个模型的架构图如下所示:


multi_reactor_thread_pool



它的运行流程如下面的时序图:


multi-reactors



## 通信机制

`gnet` 的『主从 Reactors 多线程』模型是基于 Golang 里的 Goroutines的,一个 Reactor 挂载在一个 Goroutine 上,所以在 `gnet` 的这个网络模型里主 Reactor/Goroutine 与从 Reactors/Goroutines 有海量通信的需求,因此 `gnet` 里必须要有一个能在 Goroutines 之间进行高效率的通信的机制,我没有选择 Golang 里的主流方案:基于 Channel 的 CSP 模型,而是选择了性能更好、基于 Ring-Buffer 的 Disruptor 方案。

所以我最终选择了 [go-disruptor](https://github.com/smartystreets-prototypes/go-disruptor):高性能消息分发队列 LMAX Disruptor 的 Golang 实现。

## 自动扩容的 Ring-Buffer

`gnet` 利用 Ring-Buffer 来缓存 TCP 流数据以及管理内存使用。







# 开始使用

## 安装

```sh
$ go get -u github.com/panjf2000/gnet
```

## 使用示例

```go
// ======================== Echo Server implemented with gnet ===========================

package main

import (
"flag"
"fmt"
"log"
"strings"

"github.com/panjf2000/gnet"
"github.com/panjf2000/gnet/ringbuffer"
)

func main() {
var port int
var loops int
var udp bool
var trace bool
var reuseport bool

flag.IntVar(&port, "port", 5000, "server port")
flag.BoolVar(&udp, "udp", false, "listen on udp")
flag.BoolVar(&reuseport, "reuseport", false, "reuseport (SO_REUSEPORT)")
flag.BoolVar(&trace, "trace", false, "print packets to console")
flag.IntVar(&loops, "loops", 0, "num loops")
flag.Parse()

var events gnet.Events
events.NumLoops = loops
events.OnInitComplete = func(srv gnet.Server) (action gnet.Action) {
log.Printf("echo server started on port %d (loops: %d)", port, srv.NumLoops)
if reuseport {
log.Printf("reuseport")
}
return
}
events.React = func(c gnet.Conn, inBuf *ringbuffer.RingBuffer) (out []byte, action gnet.Action) {
top, tail := inBuf.PreReadAll()
out = append(top, tail...)
inBuf.Reset()

if trace {
log.Printf("%s", strings.TrimSpace(string(top)+string(tail)))
}
return
}
scheme := "tcp"
if udp {
scheme = "udp"
}
log.Fatal(gnet.Serve(events, fmt.Sprintf("%s://:%d", scheme, port)))
}

```

## I/O 事件

`gnet` 目前支持的 I/O 事件如下:

- `OnInitComplete` 当 server 初始化完成之后调用。
- `OnOpened` 当连接被打开的时候调用。
- `OnClosed` 当连接被关闭的时候调用。
- `OnDetached` 当主动摘除连接的时候的调用。
- `React` 当 server 端接收到从 client 端发送来的数据的时候调用。(你的核心业务代码一般是写在这个方法里)
- `Tick` 服务器启动的时候会调用一次,之后就以给定的时间间隔定时调用一次,是一个定时器方法。
- `PreWrite` 预先写数据方法,在 server 端写数据回 client 端之前调用。

# 性能测试

## Linux (epoll)

### 系统参数

```powershell
Go Version: go1.12.9 linux/amd64
OS: Ubuntu 18.04
CPU: 8 Virtual CPUs
Memory: 16.0 GiB
```

### Echo Server

![echolinux.png](https://img.hacpai.com/file/2019/09/echolinux-fca6e6e5.png)


### HTTP Server

![httplinux.png](https://img.hacpai.com/file/2019/09/httplinux-663a0318.png)


## FreeBSD (kqueue)

### 系统参数

```powershell
Go Version: go version go1.12.9 darwin/amd64
OS: macOS Mojave 10.14.6
CPU: 4 CPUs
Memory: 8.0 GiB
```

### Echo Server

![echomac.png](https://img.hacpai.com/file/2019/09/echomac-7a29e0d1.png)


### HTTP Server

![httpmac.png](https://img.hacpai.com/file/2019/09/httpmac-cb6d26ea.png)


# 证书

`gnet` 的源码允许用户在遵循 MIT [开源证书](https://github.com/panjf2000/gnet/blob/master/LICENSE) 规则的前提下使用。

# 待做事项

> gnet 还在持续开发的过程中,所以这个仓库的代码和文档会一直持续更新,如果你对 gnet 感兴趣的话,欢迎给这个开源库贡献你的代码~~

使用Go语言每分钟处理1百万请求

技术讨论astaxie 发表了文章 • 5 个评论 • 5635 次浏览 • 2016-10-10 16:25 • 来自相关话题

# 使用Go语言每分钟处理1百万请求(译) --- 在[Malwarebytes ][1]我们经历了显著的增长,自从我一年前加入了硅谷的公司,一个主要的职责成了设计架构和开发一些系统来支持一个快速增长的信息 ...查看全部
# 使用Go语言每分钟处理1百万请求(译)

---

在[Malwarebytes ][1]我们经历了显著的增长,自从我一年前加入了硅谷的公司,一个主要的职责成了设计架构和开发一些系统来支持一个快速增长的信息安全公司和所有需要的设施来支持一个每天百万用户使用的产品。我在反病毒和反恶意软件行业的不同公司工作了12年,从而我知道由于我们每天处理大量的数据,这些系统是多么复杂。

有趣的是,在过去的大约9年间,我参与的所有的web后端的开发通常是通过Ruby on Rails技术实现的。不要错怪我。我喜欢Ruby on Rails,并且我相信它是个令人惊讶的环境。但是一段时间后,你会开始以ruby的方式开始思考和设计系统,你会忘记,如果你可以利用多线程、并行、快速执行和小内存开销,软件架构本来应该是多么高效和简单。很多年期间,我是一个c/c++、Delphi和c#开发者,我刚开始意识到使用正确的工具可以把复杂的事情变得简单些。

作为首席架构师,我不会很关心在互联网上的语言和框架战争。我相信效率、生产力。代码可维护性主要依赖于你如何把解决方案设计得很简单。

# 问题

当工作在我们的匿名遥测和分析系统中,我们的目标是可以处理来自于百万级别的终端的大量的POST请求。web处理服务可以接收包含了很多payload的集合的JSON数据,这些数据需要写入Amazon S3中。接下来,map-reduce系统可以操作这些数据。

按照习惯,我们会调研服务层级架构,涉及的软件如下:

- Sidekiq
- Resque
- DelayedJob
- Elasticbeanstalk Worker Tier
- RabbitMQ
- and so on…

搭建了2个不同的集群,一个提供web前端,另外一个提供后端处理,这样我们可以横向扩展后端服务的数量。

但是,从刚开始,在 讨论阶段我们的团队就知道我们应该使用Go,因为我们看到这会潜在性地成为一个非常庞大( large traffic)的系统。我已经使用了Go语言大约2年时间,我们开发了几个系统,但是很少会达到这样的负载(amount of load)。

我们开始创建一些结构,定义从POST调用得到的web请求负载,还有一个上传到S3 budket的函数。

```
type PayloadCollection struct {
WindowsVersion string `json:"version"`
Token string `json:"token"`
Payloads []Payload `json:"data"`
}

type Payload struct {
// [redacted]
}

func (p *Payload) UploadToS3() error {
// the storageFolder method ensures that there are no name collision in
// case we get same timestamp in the key name
storage_path := fmt.Sprintf("%v/%v", p.storageFolder, time.Now().UnixNano())

bucket := S3Bucket

b := new(bytes.Buffer)
encodeErr := json.NewEncoder(b).Encode(payload)
if encodeErr != nil {
return encodeErr
}

// Everything we post to the S3 bucket should be marked 'private'
var acl = s3.Private
var contentType = "application/octet-stream"

return bucket.PutReader(storage_path, b, int64(b.Len()), contentType, acl, s3.Options{})
}
```

# 本地Go routines方法

刚开始,我们采用了一个非常本地化的POST处理实现,仅仅尝试把发到简单go routine的job并行化:

```
func payloadHandler(w http.ResponseWriter, r *http.Request) {

if r.Method != "POST" {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}

// Read the body into a string for json decoding
var content = &PayloadCollection{}
err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
if err != nil {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
w.WriteHeader(http.StatusBadRequest)
return
}

// Go through each payload and queue items individually to be posted to S3
for _, payload := range content.Payloads {
go payload.UploadToS3() // <----- DON'T DO THIS
}

w.WriteHeader(http.StatusOK)
}
```

对于中小负载,这会对大多数的人适用,但是大规模下,这个方案会很快被证明不是很好用。我们期望的请求数,不在我们刚开始计划的数量级,当我们把第一个版本部署到生产环境上。我们完全低估了流量。

上面的方案在很多地方很不好。没有办法控制我们产生的go routine的数量。由于我们收到了每分钟1百万的POST请求,这段代码很快就崩溃了。

# 再次尝试

我们需要找一个不同的方式。自开始我们就讨论过, 我们需要保持请求处理程序的生命周期很短,并且进程在后台产生。当然,这是你在Ruby on Rails的世界里必须要做的事情,否则你会阻塞在所有可用的工作 web处理器上,不管你是使用puma、unicore还是passenger(我们不要讨论JRuby这个话题)。然后我们需要利用常用的处理方案来做这些,比如Resque、 Sidekiq、 SQS等。这个列表会继续保留,因为有很多的方案可以实现这些。

所以,第二次迭代,我们创建了一个缓冲channel,我们可以把job排队,然后把它们上传到S3。因为我们可以控制我们队列中的item最大值,我们有大量的内存来排列job,我们认为只要把job在channel里面缓冲就可以了。

```
var Queue chan Payload

func init() {
Queue = make(chan Payload, MAX_QUEUE)
}

func payloadHandler(w http.ResponseWriter, r *http.Request) {
...
// Go through each payload and queue items individually to be posted to S3
for _, payload := range content.Payloads {
Queue <- payload
}
...
}
```

接下来,我们再从队列中取job,然后处理它们。我们使用类似于下面的代码:

```
func StartProcessor() {
for {
select {
case job := <-Queue:
job.payload.UploadToS3() // <-- STILL NOT GOOD
}
}
}
```

说实话,我不知道我们在想什么。这肯定是一个满是Red-Bulls的夜晚。这个方法不会带来什么改善,我们用了一个 有缺陷的缓冲队列并发,仅仅是把问题推迟了。我们的同步处理器同时仅仅会上传一个数据到S3,因为来到的请求远远大于单核处理器上传到S3的能力,我们的带缓冲channel很快达到了它的极限,然后阻塞了请求处理逻辑的queue更多item的能力。

我们仅仅避免了问题,同时开始了我们的系统挂掉的倒计时。当部署了这个有缺陷的版本后,我们的延时保持在每分钟以常量增长。

![此处输入图片的描述][2]

# 最好的解决方案

我们讨论过在使用用Go channel时利用一种常用的模式,来创建一个二级channel系统,一个来queue job,另外一个来控制使用多少个worker来并发操作JobQueue。

想法是,以一个恒定速率并行上传到S3,既不会导致机器崩溃也不好产生S3的连接错误。这样我们选择了创建一个Job/Worker模式。对于那些熟悉Java、C#等语言的开发者,可以把这种模式想象成利用channel以golang的方式来实现了一个worker线程池,作为一种替代。

```
var (
MaxWorker = os.Getenv("MAX_WORKERS")
MaxQueue = os.Getenv("MAX_QUEUE")
)

// Job represents the job to be run
type Job struct {
Payload Payload
}

// A buffered channel that we can send work requests on.
var JobQueue chan Job

// Worker represents the worker that executes the job
type Worker struct {
WorkerPool chan chan Job
JobChannel chan Job
quit chan bool
}

func NewWorker(workerPool chan chan Job) Worker {
return Worker{
WorkerPool: workerPool,
JobChannel: make(chan Job),
quit: make(chan bool)}
}

// Start method starts the run loop for the worker, listening for a quit channel in
// case we need to stop it
func (w Worker) Start() {
go func() {
for {
// register the current worker into the worker queue.
w.WorkerPool <- w.JobChannel

select {
case job := <-w.JobChannel:
// we have received a work request.
if err := job.Payload.UploadToS3(); err != nil {
log.Errorf("Error uploading to S3: %s", err.Error())
}

case <-w.quit:
// we have received a signal to stop
return
}
}
}()
}

// Stop signals the worker to stop listening for work requests.
func (w Worker) Stop() {
go func() {
w.quit <- true
}()
}
```

我们已经修改了我们的web请求handler,用payload创建一个Job实例,然后发到JobQueue channel,以便于worker来获取。

```
func payloadHandler(w http.ResponseWriter, r *http.Request) {

if r.Method != "POST" {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}

// Read the body into a string for json decoding
var content = &PayloadCollection{}
err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
if err != nil {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
w.WriteHeader(http.StatusBadRequest)
return
}

// Go through each payload and queue items individually to be posted to S3
for _, payload := range content.Payloads {

// let's create a job with the payload
work := Job{Payload: payload}

// Push the work onto the queue.
JobQueue <- work
}

w.WriteHeader(http.StatusOK)
}
```

在web server初始化时,我们创建一个Dispatcher,然后调用Run()函数创建一个worker池子,然后开始监听JobQueue中的job。

```
dispatcher := NewDispatcher(MaxWorker)
dispatcher.Run()
```

下面是dispatcher的实现代码:

```
type Dispatcher struct {
// A pool of workers channels that are registered with the dispatcher
WorkerPool chan chan Job
}

func NewDispatcher(maxWorkers int) *Dispatcher {
pool := make(chan chan Job, maxWorkers)
return &Dispatcher{WorkerPool: pool}
}

func (d *Dispatcher) Run() {
// starting n number of workers
for i := 0; i < d.maxWorkers; i++ {
worker := NewWorker(d.pool)
worker.Start()
}

go d.dispatch()
}

func (d *Dispatcher) dispatch() {
for {
select {
case job := <-JobQueue:
// a job request has been received
go func(job Job) {
// try to obtain a worker job channel that is available.
// this will block until a worker is idle
jobChannel := <-d.WorkerPool

// dispatch the job to the worker job channel
jobChannel <- job
}(job)
}
}
}
```

注意到,我们提供了初始化并加入到池子的worker的最大数量。因为这个工程我们利用了Amazon Elasticbeanstalk带有的docker化的Go环境,所以我们常常会遵守[12-factor][3]方法论来配置我们的生成环境中的系统,我们从环境变了读取这些值。这种方式,我们控制worker的数量和JobQueue的大小,所以我们可以很快的改变这些值,而不需要重新部署集群。

```
var (
MaxWorker = os.Getenv("MAX_WORKERS")
MaxQueue = os.Getenv("MAX_QUEUE")
)
```

# 直接结果

我们部署了之后,立马看到了延时降到微乎其微的数值,并未我们处理请求的能力提升很大。

![此处输入图片的描述][4]

Elastic Load Balancers完全启动后,我们看到ElasticBeanstalk 应用服务于每分钟1百万请求。通常情况下在上午时间有几个小时,流量峰值超过每分钟一百万次。

我们一旦部署了新的代码,服务器的数量从100台大幅 下降到大约20台。

![此处输入图片的描述][5]

我们合理配置了我们的集群和自动均衡配置之后,我们可以把服务器的数量降至4x EC2 c4.Large实例,并且Elastic Auto-Scaling设置为如果CPU达到5分钟的90%利用率,我们就会产生新的实例。

![此处输入图片的描述][6]


# 总结

在我的书中,简单总是获胜。我们可以使用多队列、后台worker、复杂的部署设计一个复杂的系统,但是我们决定利用Elasticbeanstalk 的auto-scaling的能力和Go语言开箱即用的特性简化并发。

我们仅仅用了4台机器,这并不是什么新鲜事了。可能它们还不如我的MacBook能力强大,但是却处理了每分钟1百万的写入到S3的请求。

处理问题有正确的工具。当你的 Ruby on Rails 系统需要更强大的web handler时,可以考虑下ruby生态系统之外的技术,或许可以得到更简单但更强大的替代方案。

[文章原文][7]


[1]: http://www.malwarebytes.org/
[2]: https://raw.githubusercontent.com/itfanr/articles-about-golang/master/2016-10/1-1.png
[3]: http://12factor.net/
[4]: https://raw.githubusercontent.com/itfanr/articles-about-golang/master/2016-10/1-2.png
[5]: https://raw.githubusercontent.com/itfanr/articles-about-golang/master/2016-10/1-3.png
[6]: https://raw.githubusercontent.com/itfanr/articles-about-golang/master/2016-10/1-4.png
[7]: http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang/