golang

golang

关于规范招聘信息发帖说明

招聘应聘 回复了问题 • 10 人关注 • 4 个回复 • 5661 次浏览 • 2019-10-04 10:59 • 来自相关话题

bench fasthttp 和 库自带的net.http性能,fasthttp完败?

技术讨论tsingson 回复了问题 • 3 人关注 • 4 个回复 • 57 次浏览 • 2 小时前 • 来自相关话题

Go netpoll I/O 多路复用构建原生网络模型之源码深度解析

文章分享panjf2000 发表了文章 • 0 个评论 • 311 次浏览 • 4 天前 • 来自相关话题

原文Go netpoll I/O 多路复用构建原生网络模型之源码深度解析 ...查看全部

原文

Go netpoll I/O 多路复用构建原生网络模型之源码深度解析

导言

Go 基于 I/O multiplexing 和 goroutine 构建了一个简洁而高性能的原生网络模型(基于 Go 的I/O 多路复用 netpoll),提供了 goroutine-per-connection 这样简单的网络编程模式。在这种模式下,开发者使用的是同步的模式去编写异步的逻辑,极大地降低了开发者编写网络应用时的心智负担,且借助于 Go runtime scheduler 对 goroutines 的高效调度,这个原生网络模型不论从适用性还是性能上都足以满足绝大部分的应用场景。

然而,在工程性上能做到如此高的普适性和兼容性,最终暴露给开发者提供接口/模式如此简洁,其底层必然是基于非常复杂的封装,做了很多取舍,也有可能放弃了一些『极致』的设计和理念。事实上netpoll底层就是基于 epoll/kqueue/iocp 这些系统调用来做封装的,最终暴露出 goroutine-per-connection 这样的极简的开发模式给使用者。

Go netpoll 在不同的操作系统,其底层使用的 I/O 多路复用技术也不一样,可以从 Go 源码目录结构和对应代码文件了解 Go 在不同平台下的网络 I/O 模式的实现。比如,在 Linux 系统下基于 epoll,freeBSD 系统下基于 kqueue,以及 Windows 系统下基于 iocp。

本文将基于 linux 平台来解析 Go netpoll 之 I/O 多路复用的底层是如何基于 epoll 封装实现的,从源码层层推进,全面而深度地解析 Go netpoll 的设计理念和实现原理,以及 Go 是如何利用netpoll来构建它的原生网络模型的。主要涉及到的一些概念:I/O 模式、用户/内核空间、epoll、linux 源码、goroutine scheduler 等等,我会尽量简单地讲解,如果有对相关概念不熟悉的同学,还是希望能提前熟悉一下。

用户空间与内核空间

现在操作系统都是采用虚拟存储器,那么对 32 位操作系统而言,它的寻址空间(虚拟存储空间)为 4G(2 的 32 次方)。操作系统的核心是内核,独立于普通的应用程序,可以访问受保护的内存空间,也有访问底层硬件设备的所有权限。为了保证用户进程不能直接操作内核(kernel),保证内核的安全,操心系统将虚拟空间划分为两部分,一部分为内核空间,一部分为用户空间。针对 linux 操作系统而言,将最高的 1G 字节(从虚拟地址 0xC0000000 到 0xFFFFFFFF),供内核使用,称为内核空间,而将较低的 3G 字节(从虚拟地址 0x00000000 到0xBFFFFFFF),供各个进程使用,称为用户空间。

I/O 多路复用

在神作《UNIX 网络编程》里,总结归纳了 5 种 I/O 模型,包括同步和异步 I/O:

  • 阻塞 I/O (Blocking I/O)
  • 非阻塞 I/O (Nonblocking I/O)
  • I/O 多路复用 (I/O multiplexing)
  • 信号驱动 I/O (Signal driven I/O)
  • 异步 I/O (Asynchronous I/O)

操作系统上的 I/O 是用户空间和内核空间的数据交互,因此 I/O 操作通常包含以下两个步骤:

  1. 等待网络数据到达网卡(读就绪)/等待网卡可写(写就绪) –> 读取/写入到内核缓冲区
  2. 从内核缓冲区复制数据 –> 用户空间(读)/从用户空间复制数据 -> 内核缓冲区(写)

而判定一个 I/O 模型是同步还是异步,主要看第二步:数据在用户和内核空间之间复制的时候是不是会阻塞当前进程,如果会,则是同步 I/O,否则,就是异步 I/O。基于这个原则,这 5 种 I/O 模型中只有一种异步 I/O 模型:Asynchronous I/O,其余都是同步 I/O 模型。

这 5 种 I/O 模型的对比如下:

所谓 I/O 多路复用指的就是 select/poll/epoll 这一系列的多路选择器:支持单一线程同时监听多个文件描述符(I/O事件),阻塞等待,并在其中某个文件描述符可读写时收到通知。 I/O 复用其实复用的不是 I/O 连接,而是复用线程,让一个 thread of control 能够处理多个连接(I/O 事件)。

select & poll

#include <sys/select.h>

/* According to earlier standards */
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>

int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);

// 和 select 紧密结合的四个宏:
void FD_CLR(int fd, fd_set *set);
int FD_ISSET(int fd, fd_set *set);
void FD_SET(int fd, fd_set *set);
void FD_ZERO(fd_set *set);

select 是 epoll 之前 linux 使用的 I/O 事件驱动技术。

理解 select 的关键在于理解 fd_set,为说明方便,取 fd_set 长度为 1 字节,fd_set 中的每一 bit 可以对应一个文件描述符 fd,则 1 字节长的 fd_set 最大可以对应 8 个 fd。select 的调用过程如下:

  1. 执行 FD_ZERO(&set), 则 set 用位表示是 0000,0000
  2. 若 fd=5, 执行 FD_SET(fd, &set); 后 set 变为 0001,0000(第5位置为1)
  3. 再加入 fd=2, fd=1,则 set 变为 0001,0011
  4. 执行 select(6, &set, 0, 0, 0) 阻塞等待
  5. 若 fd=1, fd=2 上都发生可读事件,则 select 返回,此时 set 变为 0000,0011 (注意:没有事件发生的 fd=5 被清空)

基于上面的调用过程,可以得出 select 的特点:

  • 可监控的文件描述符个数取决于 sizeof(fd_set) 的值。假设服务器上 sizeof(fd_set)=512,每 bit 表示一个文件描述符,则服务器上支持的最大文件描述符是 512*8=4096。fd_set的大小调整可参考 【原创】技术系列之 网络模型(二) 中的模型 2,可以有效突破 select 可监控的文件描述符上限
  • 将 fd 加入 select 监控集的同时,还要再使用一个数据结构 array 保存放到 select 监控集中的 fd,一是用于在 select 返回后,array 作为源数据和 fd_set 进行 FD_ISSET 判断。二是 select 返回后会把以前加入的但并无事件发生的 fd 清空,则每次开始 select 前都要重新从 array 取得 fd 逐一加入(FD_ZERO最先),扫描 array 的同时取得 fd 最大值 maxfd,用于 select 的第一个参数
  • 可见 select 模型必须在 select 前循环 array(加 fd,取 maxfd),select 返回后循环 array(FD_ISSET判断是否有事件发生)

所以,select 有如下的缺点:

  1. 最大并发数限制:使用 32 个整数的 32 位,即 32*32=1024 来标识 fd,虽然可修改,但是有以下第 2, 3 点的瓶颈
  2. 每次调用 select,都需要把 fd 集合从用户态拷贝到内核态,这个开销在 fd 很多时会很大
  3. 性能衰减严重:每次 kernel 都需要线性扫描整个 fd_set,所以随着监控的描述符 fd 数量增长,其 I/O 性能会线性下降

poll 的实现和 select 非常相似,只是描述 fd 集合的方式不同,poll 使用 pollfd 结构而不是 select 的 fd_set 结构,poll 解决了最大文件描述符数量限制的问题,但是同样需要从用户态拷贝所有的 fd 到内核态,也需要线性遍历所有的 fd 集合,所以它和 select 只是实现细节上的区分,并没有本质上的区别。

epoll

epoll 是 linux kernel 2.6 之后引入的新 I/O 事件驱动技术,I/O 多路复用的核心设计是 1 个线程处理所有连接的等待消息准备好I/O 事件,这一点上 epoll 和 select&poll 是大同小异的。但 select&poll 预估错误了一件事,当数十万并发连接存在时,可能每一毫秒只有数百个活跃的连接,同时其余数十万连接在这一毫秒是非活跃的。select&poll 的使用方法是这样的:返回的活跃连接 == select(全部待监控的连接)

什么时候会调用 select&poll 呢?在你认为需要找出有报文到达的活跃连接时,就应该调用。所以,select&poll 在高并发时是会被频繁调用的。这样,这个频繁调用的方法就很有必要看看它是否有效率,因为,它的轻微效率损失都会被高频二字所放大。它有效率损失吗?显而易见,全部待监控连接是数以十万计的,返回的只是数百个活跃连接,这本身就是无效率的表现。被放大后就会发现,处理并发上万个连接时,select&poll 就完全力不从心了。这个时候就该 epoll 上场了,epoll 通过一些新的设计和优化,基本上解决了 select&poll 的问题。

epoll 的 API 非常简洁,涉及到的只有 3 个系统调用:

#include <sys/epoll.h>  
int epoll_create(int size); // int epoll_create1(int flags);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

其中,epoll_create 创建一个 epoll 实例并返回 epollfd;epoll_ctl 注册 file descriptor 等待的 I/O 事件(比如 EPOLLIN、EPOLLOUT 等) 到 epoll 实例上;epoll_wait 则是阻塞监听 epoll 实例上所有的 file descriptor 的 I/O 事件,它接收一个用户空间上的一块内存地址 (events 数组),kernel 会在有 I/O 事件发生的时候把文件描述符列表复制到这块内存地址上,然后 epoll_wait 解除阻塞并返回,最后用户空间上的程序就可以对相应的 fd 进行读写了:

#include <unistd.h>
ssize_t read(int fd, void *buf, size_t count);
ssize_t write(int fd, const void *buf, size_t count);

epoll 的工作原理如下:

与 select&poll 相比,epoll 分清了高频调用和低频调用。例如,epoll_ctl 相对来说就是不太频繁被调用的,而 epoll_wait 则是非常频繁被调用的。所以 epoll 利用 epoll_ctl 来插入或者删除一个 fd,实现用户态到内核态的数据拷贝,这确保了每一个 fd 在其生命周期只需要被拷贝一次,而不是每次调用 epoll_wait 的时候都拷贝一次。 epoll_wait 则被设计成几乎没有入参的调用,相比 select&poll 需要把全部监听的 fd 集合从用户态拷贝至内核态的做法,epoll 的效率就高出了一大截。

在实现上 epoll 采用红黑树来存储所有监听的 fd,而红黑树本身插入和删除性能比较稳定,时间复杂度 O(logN)。通过 epoll_ctl 函数添加进来的 fd 都会被放在红黑树的某个节点内,所以,重复添加是没有用的。当把 fd 添加进来的时候时候会完成关键的一步:该 fd 都会与相应的设备(网卡)驱动程序建立回调关系,也就是在内核中断处理程序为它注册一个回调函数,在 fd 相应的事件触发(中断)之后(设备就绪了),内核就会调用这个回调函数,该回调函数在内核中被称为:ep_poll_callback这个回调函数其实就是把这个 fd 添加到 rdllist 这个双向链表(就绪链表)中。epoll_wait 实际上就是去检查 rdlist 双向链表中是否有就绪的 fd,当 rdlist 为空(无就绪fd)时挂起当前进程,直到 rdlist 非空时进程才被唤醒并返回。

相比于 select&poll 调用时会将全部监听的 fd 从用户态空间拷贝至内核态空间并线性扫描一遍找出就绪的 fd 再返回到用户态,epoll_wait 则是直接返回已就绪 fd,因此 epoll 的 I/O 性能不会像 select&poll 那样随着监听的 fd 数量增加而出现线性衰减,是一个非常高效的 I/O 事件驱动技术。

由于使用 epoll 的 I/O 多路复用需要用户进程自己负责 I/O 读写,从用户进程的角度看,读写过程是阻塞的,所以 select&poll&epoll 本质上都是同步 I/O 模型,而像 Windows 的 IOCP 这一类的异步 I/O,只需要在调用 WSARecv 或 WSASend 方法读写数据的时候把用户空间的内存 buffer 提交给 kernel,kernel 负责数据在用户空间和内核空间拷贝,完成之后就会通知用户进程,整个过程不需要用户进程参与,所以是真正的异步 I/O。

延伸

另外,我看到有些文章说 epoll 之所以性能高是因为利用了 linux 的 mmap 内存映射让内核和用户进程共享了一片物理内存,用来存放就绪 fd 列表和它们的数据 buffer,所以用户进程在 epoll_wait返回之后用户进程就可以直接从共享内存那里读取/写入数据了,这让我很疑惑,因为首先看epoll_wait的函数声明:

int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

第二个参数:就绪事件列表,是需要在用户空间分配内存然后再传给epoll_wait的,如果内核会用 mmap 设置共享内存,直接传递一个指针进去就行了,根本不需要在用户态分配内存,多此一举。其次,内核和用户进程通过 mmap 共享内存是一件极度危险的事情,内核无法确定这块共享内存什么时候会被回收,而且这样也会赋予用户进程直接操作内核数据的权限和入口,非常容易出现大的系统漏洞,因此一般极少会这么做。所以我很怀疑 epoll 是不是真的在 linux kernel 里用了 mmap,我就去看了下最新版本(5.3.9)的 linux kernel 源码:

/*
* Implement the event wait interface for the eventpoll file. It is the kernel
* part of the user space epoll_wait(2).
*/

static int do_epoll_wait(int epfd, struct epoll_event __user *events,
int maxevents, int timeout)
{
// ...

/* Time to fish for events ... */
error = ep_poll(ep, events, maxevents, timeout);
}

// 如果 epoll_wait 入参时设定 timeout == 0, 那么直接通过 ep_events_available 判断当前是否有用户感兴趣的事件发生,如果有则通过 ep_send_events 进行处理
// 如果设置 timeout > 0,并且当前没有用户关注的事件发生,则进行休眠,并添加到 ep->wq 等待队列的头部;对等待事件描述符设置 WQ_FLAG_EXCLUSIVE 标志
// ep_poll 被事件唤醒后会重新检查是否有关注事件,如果对应的事件已经被抢走,那么 ep_poll 会继续休眠等待
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events, int maxevents, long timeout)
{
// ...

send_events:
/*
* Try to transfer events to user space. In case we get 0 events and
* there's still timeout left over, we go trying again in search of
* more luck.
*/


// 如果一切正常, 有 event 发生, 就开始准备数据 copy 给用户空间了
// 如果有就绪的事件发生,那么就调用 ep_send_events 将就绪的事件 copy 到用户态内存中,
// 然后返回到用户态,否则判断是否超时,如果没有超时就继续等待就绪事件发生,如果超时就返回用户态。
// 从 ep_poll 函数的实现可以看到,如果有就绪事件发生,则调用 ep_send_events 函数做进一步处理
if (!res && eavail &&
!(res = ep_send_events(ep, events, maxevents)) && !timed_out)
goto fetch_events;

// ...
}

// ep_send_events 函数是用来向用户空间拷贝就绪 fd 列表的,它将用户传入的就绪 fd 列表内存简单封装到
// ep_send_events_data 结构中,然后调用 ep_scan_ready_list 将就绪队列中的事件写入用户空间的内存;
// 用户进程就可以访问到这些数据进行处理
static int ep_send_events(struct eventpoll *ep,
struct epoll_event __user *events, int maxevents)
{
struct ep_send_events_data esed;

esed.maxevents = maxevents;
esed.events = events;
// 调用 ep_scan_ready_list 函数检查 epoll 实例 eventpoll 中的 rdllist 就绪链表,
// 并注册一个回调函数 ep_send_events_proc,如果有就绪 fd,则调用 ep_send_events_proc 进行处理
ep_scan_ready_list(ep, ep_send_events_proc, &esed, 0, false);
return esed.res;
}

// 调用 ep_scan_ready_list 的时候会传递指向 ep_send_events_proc 函数的函数指针作为回调函数,
// 一旦有就绪 fd,就会调用 ep_send_events_proc 函数
static __poll_t ep_send_events_proc(struct eventpoll *ep, struct list_head *head, void *priv)
{
// ...

/*
* If the event mask intersect the caller-requested one,
* deliver the event to userspace. Again, ep_scan_ready_list()
* is holding ep->mtx, so no operations coming from userspace
* can change the item.
*/

revents = ep_item_poll(epi, &pt, 1);
// 如果 revents 为 0,说明没有就绪的事件,跳过,否则就将就绪事件拷贝到用户态内存中
if (!revents)
continue;
// 将当前就绪的事件和用户进程传入的数据都通过 __put_user 拷贝回用户空间,
// 也就是调用 epoll_wait 之时用户进程传入的 fd 列表的内存
if (__put_user(revents, &uevent->events) || __put_user(epi->event.data, &uevent->data)) {
list_add(&epi->rdllink, head);
ep_pm_stay_awake(epi);
if (!esed->res)
esed->res = -EFAULT;
return 0;
}

// ...
}

do_epoll_wait开始层层跳转,我们可以很清楚地看到最后内核是通过__put_user函数把就绪 fd 列表和事件返回到用户空间,而__put_user正是内核用来拷贝数据到用户空间的标准函数。此外,我并没有在 linux kernel 的源码中和 epoll 相关的代码里找到 mmap 系统调用做内存映射的逻辑,所以基本可以得出结论:epoll 在 linux kernel 里并没有使用 mmap 来做用户空间和内核空间的内存共享,所以那些说 epoll 使用了 mmap 的文章都是误解。

Non-blocking I/O

什么叫非阻塞 I/O,顾名思义就是:所有 I/O 操作都是立刻返回而不会阻塞当前用户进程。I/O 多路复用通常情况下需要和非阻塞 I/O 搭配使用,否则可能会产生意想不到的问题。比如,epoll 的 ET(边缘触发) 模式下,如果不使用非阻塞 I/O,有极大的概率会导致阻塞 event-loop 线程,从而降低吞吐量,甚至导致 bug。

Linux 下,我们可以通过 fcntl 系统调用来设置 O_NONBLOCK标志位,从而把 socket 设置成 non-blocking。当对一个 non-blocking socket 执行读操作时,流程是这个样子:

当用户进程发出 read 操作时,如果 kernel 中的数据还没有准备好,那么它并不会 block 用户进程,而是立刻返回一个 EAGAIN error。从用户进程角度讲 ,它发起一个 read 操作后,并不需要等待,而是马上就得到了一个结果。用户进程判断结果是一个 error 时,它就知道数据还没有准备好,于是它可以再次发送 read 操作。一旦 kernel 中的数据准备好了,并且又再次收到了用户进程的 system call,那么它马上就将数据拷贝到了用户内存,然后返回。

所以,non-blocking I/O 的特点是用户进程需要不断的主动询问 kernel 数据好了没有。

Go netpoll

一个典型的 Go TCP server:

package main

import (
"fmt"
"net"
)

func main() {
listen, err := net.Listen("tcp", ":8888")
if err != nil {
fmt.Println("listen error: ", err)
return
}

for {
conn, err := listen.Accept()
if err != nil {
fmt.Println("accept error: ", err)
break
}

// start a new goroutine to handle the new connection
go HandleConn(conn)
}
}
func HandleConn(conn net.Conn) {
defer conn.Close()
packet := make([]byte, 1024)
for {
// 如果没有可读数据,也就是读 buffer 为空,则阻塞
_, _ = conn.Read(packet)
// 同理,不可写则阻塞
_, _ = conn.Write(packet)
}
}

上面是一个基于 Go 原生网络模型(基于 netpoll)编写的一个 TCP server,模式是 goroutine-per-connection,在这种模式下,开发者使用的是同步的模式去编写异步的逻辑而且对于开发者来说 I/O 是否阻塞是无感知的,也就是说开发者无需考虑 goroutines 甚至更底层的线程、进程的调度和上下文切换。而 Go netpoll 最底层的事件驱动技术肯定是基于 epoll/kqueue/iocp 这一类的 I/O 事件驱动技术,只不过是把这些调度和上下文切换的工作转移到了 runtime 的 Go scheduler,让它来负责调度 goroutines,从而极大地降低了程序员的心智负担!

Go netpoll 核心

Go netpoll 通过在底层对 epoll/kqueue/iocp 的封装,从而实现了使用同步编程模式达到异步执行的效果。总结来说,所有的网络操作都以网络描述符 netFD 为中心实现。netFD 与底层 PollDesc 结构绑定,当在一个 netFD 上读写遇到 EAGAIN 错误时,就将当前 goroutine 存储到这个 netFD 对应的 PollDesc 中,同时调用 gopark 把当前 goroutine 给 park 住,直到这个 netFD 上再次发生读写事件,才将此 goroutine 给 ready 激活重新运行。显然,在底层通知 goroutine 再次发生读写等事件的方式就是 epoll/kqueue/iocp 等事件驱动机制。

接下来我们通过分析最新的 Go 源码(v1.13.4),解读一下整个 netpoll 的运行流程。

上面的示例代码中相关的在源码里的几个数据结构和方法:

// TCPListener is a TCP network listener. Clients should typically
// use variables of type Listener instead of assuming TCP.
type TCPListener struct {
fd *netFD
lc ListenConfig
}

// Accept implements the Accept method in the Listener interface; it
// waits for the next call and returns a generic Conn.
func (l *TCPListener) Accept() (Conn, error) {
if !l.ok() {
return nil, syscall.EINVAL
}
c, err := l.accept()
if err != nil {
return nil, &OpError{Op: "accept", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err}
}
return c, nil
}

func (ln *TCPListener) accept() (*TCPConn, error) {
fd, err := ln.fd.accept()
if err != nil {
return nil, err
}
tc := newTCPConn(fd)
if ln.lc.KeepAlive >= 0 {
setKeepAlive(fd, true)
ka := ln.lc.KeepAlive
if ln.lc.KeepAlive == 0 {
ka = defaultTCPKeepAlive
}
setKeepAlivePeriod(fd, ka)
}
return tc, nil
}

// TCPConn is an implementation of the Conn interface for TCP network
// connections.
type TCPConn struct {
conn
}

// Conn
type conn struct {
fd *netFD
}

type conn struct {
fd *netFD
}

func (c *conn) ok() bool { return c != nil && c.fd != nil }

// Implementation of the Conn interface.

// Read implements the Conn Read method.
func (c *conn) Read(b []byte) (int, error) {
if !c.ok() {
return 0, syscall.EINVAL
}
n, err := c.fd.Read(b)
if err != nil && err != io.EOF {
err = &OpError{Op: "read", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err}
}
return n, err
}

// Write implements the Conn Write method.
func (c *conn) Write(b []byte) (int, error) {
if !c.ok() {
return 0, syscall.EINVAL
}
n, err := c.fd.Write(b)
if err != nil {
err = &OpError{Op: "write", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err}
}
return n, err
}

netFD

net.Listen("tcp", ":8888") 方法返回了一个 TCPListener,它是一个实现了 net.Listener 接口的 struct,而通过 listen.Accept() 接收的新连接 TCPConn 则是一个实现了 net.Conn 接口的 struct,它内嵌了 net.conn struct。仔细阅读上面的源码可以发现,不管是 Listener 的 Accept 还是 Conn 的 Read/Write 方法,都是基于一个 netFD 的数据结构的操作,netFD 是一个网络描述符,类似于 Linux 的文件描述符的概念,netFD 中包含一个 poll.FD 数据结构,而 poll.FD 中包含两个重要的数据结构 Sysfd 和 pollDesc,前者是真正的系统文件描述符,后者对是底层事件驱动的封装,所有的读写超时等操作都是通过调用后者的对应方法实现的。

netFDpoll.FD的源码:

// Network file descriptor.
type netFD struct {
pfd poll.FD

// immutable until Close
family int
sotype int
isConnected bool // handshake completed or use of association with peer
net string
laddr Addr
raddr Addr
}

// FD is a file descriptor. The net and os packages use this type as a
// field of a larger type representing a network connection or OS file.
type FD struct {
// Lock sysfd and serialize access to Read and Write methods.
fdmu fdMutex

// System file descriptor. Immutable until Close.
Sysfd int

// I/O poller.
pd pollDesc

// Writev cache.
iovecs *[]syscall.Iovec

// Semaphore signaled when file is closed.
csema uint32

// Non-zero if this file has been set to blocking mode.
isBlocking uint32

// Whether this is a streaming descriptor, as opposed to a
// packet-based descriptor like a UDP socket. Immutable.
IsStream bool

// Whether a zero byte read indicates EOF. This is false for a
// message based socket connection.
ZeroReadIsEOF bool

// Whether this is a file rather than a network socket.
isFile bool
}

pollDesc

前面提到了 pollDesc 是底层事件驱动的封装,netFD 通过它来完成各种 I/O 相关的操作,它的定义如下:

type pollDesc struct {
runtimeCtx uintptr
}

这里的 struct 只包含了一个指针,而通过 pollDesc 的 init 方法,我们可以找到它具体的定义是在runtime.pollDesc这里:

func (pd *pollDesc) init(fd *FD) error {
serverInit.Do(runtime_pollServerInit)
ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
if errno != 0 {
if ctx != 0 {
runtime_pollUnblock(ctx)
runtime_pollClose(ctx)
}
return syscall.Errno(errno)
}
pd.runtimeCtx = ctx
return nil
}

// Network poller descriptor.
//
// No heap pointers.
//
//go:notinheap
type pollDesc struct {
link *pollDesc // in pollcache, protected by pollcache.lock

// The lock protects pollOpen, pollSetDeadline, pollUnblock and deadlineimpl operations.
// This fully covers seq, rt and wt variables. fd is constant throughout the PollDesc lifetime.
// pollReset, pollWait, pollWaitCanceled and runtime·netpollready (IO readiness notification)
// proceed w/o taking the lock. So closing, everr, rg, rd, wg and wd are manipulated
// in a lock-free way by all operations.
// NOTE(dvyukov): the following code uses uintptr to store *g (rg/wg),
// that will blow up when GC starts moving objects.
lock mutex // protects the following fields
fd uintptr
closing bool
everr bool // marks event scanning error happened
user uint32 // user settable cookie
rseq uintptr // protects from stale read timers
rg uintptr // pdReady, pdWait, G waiting for read or nil
rt timer // read deadline timer (set if rt.f != nil)
rd int64 // read deadline
wseq uintptr // protects from stale write timers
wg uintptr // pdReady, pdWait, G waiting for write or nil
wt timer // write deadline timer
wd int64 // write deadline
}

runtime.pollDesc包含自身类型的一个指针,用来保存下一个runtime.pollDesc的地址,以此来实现链表,可以减少数据结构的大小,所有的runtime.pollDesc保存在runtime.pollCache结构中,定义如下:

type pollCache struct {
lock mutex
first *pollDesc
// PollDesc objects must be type-stable,
// because we can get ready notification from epoll/kqueue
// after the descriptor is closed/reused.
// Stale notifications are detected using seq variable,
// seq is incremented when deadlines are changed or descriptor is reused.
}

net.Listen

调用 net.Listen之后,底层会通过 Linux 的系统调用socket 方法创建一个 fd 分配给 listener,并用以来初始化 listener 的 netFD,接着调用 netFD 的listenStream方法完成对 socket 的 bind&listen 操作以及对 netFD 的初始化(主要是对 netFD 里的 pollDesc 的初始化),相关源码如下:

// 调用 linux 系统调用 socket 创建 listener fd 并设置为为阻塞 I/O    
s, err := socketFunc(family, sotype|syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC, proto)
// On Linux the SOCK_NONBLOCK and SOCK_CLOEXEC flags were
// introduced in 2.6.27 kernel and on FreeBSD both flags were
// introduced in 10 kernel. If we get an EINVAL error on Linux
// or EPROTONOSUPPORT error on FreeBSD, fall back to using
// socket without them.

socketFunc func(int, int, int) (int, error) = syscall.Socket

// 用上面创建的 listener fd 初始化 listener netFD
if fd, err = newFD(s, family, sotype, net); err != nil {
poll.CloseFunc(s)
return nil, err
}

// 对 listener fd 进行 bind&listen 操作,并且调用 init 方法完成初始化
func (fd *netFD) listenStream(laddr sockaddr, backlog int, ctrlFn func(string, string, syscall.RawConn) error) error {
// ...

// 完成绑定操作
if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil {
return os.NewSyscallError("bind", err)
}

// 完成监听操作
if err = listenFunc(fd.pfd.Sysfd, backlog); err != nil {
return os.NewSyscallError("listen", err)
}

// 调用 init,内部会调用 poll.FD.Init,最后调用 pollDesc.init
if err = fd.init(); err != nil {
return err
}
lsa, _ = syscall.Getsockname(fd.pfd.Sysfd)
fd.setAddr(fd.addrFunc()(lsa), nil)
return nil
}

// 使用 sync.Once 来确保一个 listener 只持有一个 epoll 实例
var serverInit sync.Once

// netFD.init 会调用 poll.FD.Init 并最终调用到 pollDesc.init,
// 它会创建 epoll 实例并把 listener fd 加入监听队列
func (pd *pollDesc) init(fd *FD) error {
// runtime_pollServerInit 内部调用了 netpollinit 来创建 epoll 实例
serverInit.Do(runtime_pollServerInit)

// runtime_pollOpen 内部调用了 netpollopen 来将 listener fd 注册到
// epoll 实例中,另外,它会初始化一个 pollDesc 并返回
ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
if errno != 0 {
if ctx != 0 {
runtime_pollUnblock(ctx)
runtime_pollClose(ctx)
}
return syscall.Errno(errno)
}
// 把真正初始化完成的 pollDesc 实例赋值给当前的 pollDesc 代表自身的指针,
// 后续使用直接通过该指针操作
pd.runtimeCtx = ctx
return nil
}

// netpollopen 会被 runtime_pollOpen,注册 fd 到 epoll 实例,
// 同时会利用万能指针把 pollDesc 保存到 epollevent 的一个 8 位的字节数组 data 里
func netpollopen(fd uintptr, pd *pollDesc) int32 {
var ev epollevent
ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
*(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}

我们前面提到的 epoll 的三个基本调用,Go 在源码里实现了对那三个调用的封装:

#include <sys/epoll.h>  
int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

// Go 对上面三个调用的封装
func netpollinit()
func netpollopen(fd uintptr, pd *pollDesc) int32
func netpoll(block bool) gList

netFD 就是通过这三个封装来对 epoll 进行创建实例、注册 fd 和等待事件操作的。

Listener.Accept()

netpoll accept socket 的工作流程如下:

  1. 服务端的 netFD 在listen时会创建 epoll 的实例,并将 listenerFD 加入 epoll 的事件队列
  2. netFD 在accept时将返回的 connFD 也加入 epoll 的事件队列
  3. netFD 在读写时出现syscall.EAGAIN错误,通过 pollDesc 的 waitRead 方法将当前的 goroutine park 住,直到 ready,从 pollDesc 的waitRead中返回

Listener.Accept()接收来自客户端的新连接,具体还是调用netFD.accept方法来完成这个功能:

// Accept implements the Accept method in the Listener interface; it
// waits for the next call and returns a generic Conn.
func (l *TCPListener) Accept() (Conn, error) {
if !l.ok() {
return nil, syscall.EINVAL
}
c, err := l.accept()
if err != nil {
return nil, &OpError{Op: "accept", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err}
}
return c, nil
}

func (ln *TCPListener) accept() (*TCPConn, error) {
fd, err := ln.fd.accept()
if err != nil {
return nil, err
}
tc := newTCPConn(fd)
if ln.lc.KeepAlive >= 0 {
setKeepAlive(fd, true)
ka := ln.lc.KeepAlive
if ln.lc.KeepAlive == 0 {
ka = defaultTCPKeepAlive
}
setKeepAlivePeriod(fd, ka)
}
return tc, nil
}

netFD.accept方法里再调用poll.FD.Accept,最后会使用 linux 的系统调用accept来完成新连接的接收,并且会把 accept 的 socket 设置成非阻塞 I/O 模式:

// Accept wraps the accept network call.
func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
if err := fd.readLock(); err != nil {
return -1, nil, "", err
}
defer fd.readUnlock()

if err := fd.pd.prepareRead(fd.isFile); err != nil {
return -1, nil, "", err
}
for {
// 使用 linux 系统调用 accept 接收新连接,创建对应的 socket
s, rsa, errcall, err := accept(fd.Sysfd)
// 因为 listener fd 在创建的时候已经设置成非阻塞的了,
// 所以 accept 方法会直接返回,不管有没有新连接到来;如果 err == nil 则表示正常建立新连接,直接返回
if err == nil {
return s, rsa, "", err
}
// 如果 err != nil,则判断 err == syscall.EAGAIN,符合条件则进入 pollDesc.waitRead 方法
switch err {
case syscall.EAGAIN:
if fd.pd.pollable() {
// 如果当前没有发生期待的 I/O 事件,那么 waitRead 会通过 park goroutine 让逻辑 block 在这里
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
case syscall.ECONNABORTED:
// This means that a socket on the listen
// queue was closed before we Accept()ed it;
// it's a silly error, so try again.
continue
}
return -1, nil, errcall, err
}
}

// 使用 linux 的 accept 系统调用接收新连接并把这个 socket fd 设置成非阻塞 I/O
ns, sa, err := Accept4Func(s, syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC)
// On Linux the accept4 system call was introduced in 2.6.28
// kernel and on FreeBSD it was introduced in 10 kernel. If we
// get an ENOSYS error on both Linux and FreeBSD, or EINVAL
// error on Linux, fall back to using accept.

// Accept4Func is used to hook the accept4 call.
var Accept4Func func(int, int) (int, syscall.Sockaddr, error) = syscall.Accept4

pollDesc.waitRead方法主要负责检测当前这个 pollDesc 的上层 netFD 对应的 fd 是否有『期待的』I/O 事件发生,如果有就直接返回,否则就 park 住当前的 goroutine 并持续等待直至对应的 fd 上发生可读/可写或者其他『期待的』I/O 事件为止,然后它就会返回到外层的 for 循环,让 goroutine 继续执行逻辑。

Conn.Read/Conn.Write

我们先来看看Conn.Read方法是如何实现的,原理其实和 Listener.Accept 是一样的,具体调用链还是首先调用 conn 的netFD.Read,然后内部再调用 poll.FD.Read,最后使用 linux 的系统调用 read: syscall.Read完成数据读取:

// Implementation of the Conn interface.

// Read implements the Conn Read method.
func (c *conn) Read(b []byte) (int, error) {
if !c.ok() {
return 0, syscall.EINVAL
}
n, err := c.fd.Read(b)
if err != nil && err != io.EOF {
err = &OpError{Op: "read", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err}
}
return n, err
}

func (fd *netFD) Read(p []byte) (n int, err error) {
n, err = fd.pfd.Read(p)
runtime.KeepAlive(fd)
return n, wrapSyscallError("read", err)
}

// Read implements io.Reader.
func (fd *FD) Read(p []byte) (int, error) {
if err := fd.readLock(); err != nil {
return 0, err
}
defer fd.readUnlock()
if len(p) == 0 {
// If the caller wanted a zero byte read, return immediately
// without trying (but after acquiring the readLock).
// Otherwise syscall.Read returns 0, nil which looks like
// io.EOF.
// TODO(bradfitz): make it wait for readability? (Issue 15735)
return 0, nil
}
if err := fd.pd.prepareRead(fd.isFile); err != nil {
return 0, err
}
if fd.IsStream && len(p) > maxRW {
p = p[:maxRW]
}
for {
// 尝试从该 socket 读取数据,因为 socket 在被 listener accept 的时候设置成
// 了非阻塞 I/O,所以这里同样也是直接返回,不管有没有可读的数据
n, err := syscall.Read(fd.Sysfd, p)
if err != nil {
n = 0
// err == syscall.EAGAIN 表示当前没有期待的 I/O 事件发生,也就是 socket 不可读
if err == syscall.EAGAIN && fd.pd.pollable() {
// 如果当前没有发生期待的 I/O 事件,那么 waitRead
// 会通过 park goroutine 让逻辑 block 在这里
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}

// On MacOS we can see EINTR here if the user
// pressed ^Z. See issue #22838.
if runtime.GOOS == "darwin" && err == syscall.EINTR {
continue
}
}
err = fd.eofError(n, err)
return n, err
}
}

conn.Writeconn.Read的原理是一致的,它也是通过类似 pollDesc.waitReadpollDesc.waitWrite来 park 住 goroutine 直至期待的 I/O 事件发生才返回,而 pollDesc.waitWrite的内部实现原理和pollDesc.waitRead是一样的,都是基于runtime_pollWait,这里就不再赘述。

pollDesc.waitRead

pollDesc.waitRead内部调用了 runtime_pollWait来达成无 I/O 事件时 park 住 goroutine 的目的:

//go:linkname poll_runtime_pollWait internal/poll.runtime_pollWait
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
err := netpollcheckerr(pd, int32(mode))
if err != 0 {
return err
}
// As for now only Solaris, illumos, and AIX use level-triggered IO.
if GOOS == "solaris" || GOOS == "illumos" || GOOS == "aix" {
netpollarm(pd, mode)
}
// 进入 netpollblock 并且判断是否有期待的 I/O 事件发生,
// 这里的 for 循环是为了一直等到 io ready
for !netpollblock(pd, int32(mode), false) {
err = netpollcheckerr(pd, int32(mode))
if err != 0 {
return err
}
// Can happen if timeout has fired and unblocked us,
// but before we had a chance to run, timeout has been reset.
// Pretend it has not happened and retry.
}
return 0
}

// returns true if IO is ready, or false if timedout or closed
// waitio - wait only for completed IO, ignore errors
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
// gpp 保存的是 goroutine 的数据结构 g,这里会根据 mode 的值决定是 rg 还是 wg
// 后面调用 gopark 之后,会把当前的 goroutine 的抽象数据结构 g 存入 gpp 这个指针
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}

// set the gpp semaphore to WAIT
// 这个 for 循环是为了等待 io ready 或者 io wait
for {
old := *gpp
// gpp == pdReady 表示此时已有期待的 I/O 事件发生,
// 可以直接返回 unblock 当前 goroutine 并执行响应的 I/O 操作
if old == pdReady {
*gpp = 0
return true
}
if old != 0 {
throw("runtime: double wait")
}
// 如果没有期待的 I/O 事件发生,则通过原子操作把 gpp 的值置为 pdWait 并退出 for 循环
if atomic.Casuintptr(gpp, 0, pdWait) {
break
}
}

// need to recheck error states after setting gpp to WAIT
// this is necessary because runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl
// do the opposite: store to closing/rd/wd, membarrier, load of rg/wg

// waitio 此时是 false,netpollcheckerr 方法会检查当前 pollDesc 对应的 fd 是否是正常的,
// 通常来说 netpollcheckerr(pd, mode) == 0 是成立的,所以这里会执行 gopark
// 把当前 goroutine 给 park 住,直至对应的 fd 上发生可读/可写或者其他『期待的』I/O 事件为止,
// 然后 unpark 返回,在 gopark 内部会把当前 goroutine 的抽象数据结构 g 存入
// gpp(pollDesc.rg/pollDesc.wg) 指针里,以便在后面的 netpoll 函数取出 pollDesc 之后,
// 把 g 添加到链表里返回,然后重新调度运行该 goroutine
if waitio || netpollcheckerr(pd, mode) == 0 {
// 注册 netpollblockcommit 回调给 gopark,在 gopark 内部会执行它,保存当前 goroutine 到 gpp
gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
}
// be careful to not lose concurrent READY notification
old := atomic.Xchguintptr(gpp, 0)
if old > pdWait {
throw("runtime: corrupted polldesc")
}
return old == pdReady
}

// gopark 会停住当前的 goroutine 并且调用传递进来的回调函数 unlockf,从上面的源码我们可以知道这个函数是
// netpollblockcommit
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
if reason != waitReasonSleep {
checkTimeouts() // timeouts may expire while two goroutines keep the scheduler busy
}
mp := acquirem()
gp := mp.curg
status := readgstatus(gp)
if status != _Grunning && status != _Gscanrunning {
throw("gopark: bad g status")
}
mp.waitlock = lock
mp.waitunlockf = unlockf
gp.waitreason = reason
mp.waittraceev = traceEv
mp.waittraceskip = traceskip
releasem(mp)
// can't do anything that might move the G between Ms here.
// gopark 最终会调用 park_m,在这个函数内部会调用 unlockf,也就是 netpollblockcommit,
// 然后会把当前的 goroutine,也就是 g 数据结构保存到 pollDesc 的 rg 或者 wg 指针里
mcall(park_m)
}

// park continuation on g0.
func park_m(gp *g) {
_g_ := getg()

if trace.enabled {
traceGoPark(_g_.m.waittraceev, _g_.m.waittraceskip)
}

casgstatus(gp, _Grunning, _Gwaiting)
dropg()

if fn := _g_.m.waitunlockf; fn != nil {
// 调用 netpollblockcommit,把当前的 goroutine,
// 也就是 g 数据结构保存到 pollDesc 的 rg 或者 wg 指针里
ok := fn(gp, _g_.m.waitlock)
_g_.m.waitunlockf = nil
_g_.m.waitlock = nil
if !ok {
if trace.enabled {
traceGoUnpark(gp, 2)
}
casgstatus(gp, _Gwaiting, _Grunnable)
execute(gp, true) // Schedule it back, never returns.
}
}
schedule()
}

// netpollblockcommit 在 gopark 函数里被调用
func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {
// 通过原子操作把当前 goroutine 抽象的数据结构 g,也就是这里的参数 gp 存入 gpp 指针,
// 此时 gpp 的值是 pollDesc 的 rg 或者 wg 指针
r := atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))
if r {
// Bump the count of goroutines waiting for the poller.
// The scheduler uses this to decide whether to block
// waiting for the poller if there is nothing else to do.
atomic.Xadd(&netpollWaiters, 1)
}
return r
}

netpoll

前面已经从源码的角度分析完了 netpoll 是如何通过 park goroutine 从而达到阻塞 Accept/Read/Write 的效果,而通过调用 gopark,goroutine 会被放置在某个等待队列中(如 channel 的 waitq ,此时 G 的状态由_Grunning_Gwaitting),因此G必须被手动唤醒(通过 goready ),否则会丢失任务,应用层阻塞通常使用这种方式。

所以,最后还有一个非常关键的问题是:当 I/O 事件发生之后,netpoll 是通过什么方式唤醒那些在 I/O wait 的 goroutine 的?答案是通过 epoll_wait,在 Go 源码中的 src/runtime/netpoll_epoll.go文件中有一个 func netpoll(block bool) gList 方法,它会内部调用epoll_wait获取就绪的 fd 列表,并将每个 fd 对应的 goroutine 添加到链表返回

// polls for ready network connections
// returns list of goroutines that become runnable
func netpoll(block bool) gList {
if epfd == -1 {
return gList{}
}
waitms := int32(-1)
// 是否以阻塞模式调用 epoll_wait
if !block {
waitms = 0
}
var events [128]epollevent
retry:
// 获取就绪的 fd 列表
n := epollwait(epfd, &events[0], int32(len(events)), waitms)
if n < 0 {
if n != -_EINTR {
println("runtime: epollwait on fd", epfd, "failed with", -n)
throw("runtime: netpoll failed")
}
goto retry
}
// toRun 是一个 g 的链表,存储要恢复的 goroutines,最后返回给调用方
var toRun gList
for i := int32(0); i < n; i++ {
ev := &events[i]
if ev.events == 0 {
continue
}
var mode int32
// 判断发生的事件类型,读类型或者写类型
if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'r'
}
if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'w'
}
if mode != 0 {
// 取出保存在 epollevent 里的 pollDesc
pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
pd.everr = false
if ev.events == _EPOLLERR {
pd.everr = true
}
// 调用 netpollready,传入就绪 fd 的 pollDesc,把 fd 对应的 goroutine 添加到链表 toRun 中
netpollready(&toRun, pd, mode)
}
}
if block && toRun.empty() {
goto retry
}
return toRun
}

// netpollready 调用 netpollunblock 返回就绪 fd 对应的 goroutine 的抽象数据结构 g
func netpollready(toRun *gList, pd *pollDesc, mode int32) {
var rg, wg *g
if mode == 'r' || mode == 'r'+'w' {
rg = netpollunblock(pd, 'r', true)
}
if mode == 'w' || mode == 'r'+'w' {
wg = netpollunblock(pd, 'w', true)
}
if rg != nil {
toRun.push(rg)
}
if wg != nil {
toRun.push(wg)
}
}

// netpollunblock 会依据传入的 mode 决定从 pollDesc 的 rg 或者 wg 取出当时 gopark 之时存入的
// goroutine 抽象数据结构 g 并返回
func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
// mode == 'r' 代表当时 gopark 是为了等待读事件,而 mode == 'w' 则代表是等待写事件
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}

for {
// 取出 gpp 存储的 g
old := *gpp
if old == pdReady {
return nil
}
if old == 0 && !ioready {
// Only set READY for ioready. runtime_pollWait
// will check for timeout/cancel before waiting.
return nil
}
var new uintptr
if ioready {
new = pdReady
}
// 重置 pollDesc 的 rg 或者 wg
if atomic.Casuintpt

driver: bad connection 问题

回复

有问必答treedom 回复了问题 • 1 人关注 • 1 个回复 • 127 次浏览 • 2019-11-08 09:20 • 来自相关话题

gochat - 纯go实现的im即时通讯系统(支持水平扩展)

开源程序gochat 发表了文章 • 0 个评论 • 466 次浏览 • 2019-11-04 15:56 • 来自相关话题

项目推荐- 项目地址:https://github.com/LockG ...查看全部

项目推荐
- 项目地址:
https://github.com/LockGit/gochat

- 类别:Go

- 项目描述:

gochat为纯go实现的即时通讯系统,支持私信消息与房间广播消息,各层之间通过rpc通讯,支持水平扩展。
使用redis作为消息存储与投递的载体,相对kafka操作起来更加方便快捷,所以十分轻量。
各层之间基于etcd服务发现,在扩容部署时将会方便很多。
由于go的交叉编译特性,编译后可以快速在各个平台上运行,gochat架构及目录结构清晰,
并且本项目还贴心的提供了docker一键构建所有环境依赖,安装起来十分便捷。

- 推荐理由:
轻量快捷不臃肿,水平可扩展,docker快速构建所有环境,迅速体验im即时通讯,
各层架构清晰,文档说明详细。

系统架构:



服务发现:


消息投递


聊天室预览:

[ gev ] Go 语言优雅处理 TCP “粘包”

文章分享惜朝 发表了文章 • 0 个评论 • 621 次浏览 • 2019-11-01 10:48 • 来自相关话题

https://github.com/Allenxuxu/gev ...查看全部

https://github.com/Allenxuxu/gev

gev 是一个轻量、快速的基于 Reactor 模式的非阻塞 TCP 网络库,支持自定义协议,轻松快速搭建高性能服务器。

TCP 为什么会粘包


TCP 本身就是面向流的协议,就是一串没有界限的数据。所以本质上来说 TCP 粘包是一个伪命题。

TCP 底层并不关心上层业务数据,会套接字缓冲区的实际情况进行包的划分,一个完整的业务数据可能会被拆分成多次进行发送,也可能会将多个小的业务数据封装成一个大的数据包发送(Nagle算法)。

gev 如何优雅处理


gev 通过回调函数 OnMessage 通知用户数据到来,回调函数中会将用户数据缓冲区(ringbuffer)通过参数传递过来。

用户通过对 ringbuffer 操作,来进行数据解包,获取到完整用户数据后再进行业务操作。这样又一个明显的缺点,就是会让业务操作和自定义协议解析代码堆在一起。

所以,最近对 gev 进行了一次较大改动,主要是为了能够以插件的形式支持各种自定义的数据协议,让使用者可以便捷处理 TCP 粘包问题,专注于业务逻辑。



做法如下,定义一个接口 Protocol

```go
// Protocol 自定义协议编解码接口
type Protocol interface {
UnPacket(c *Connection, buffer *ringbuffer.RingBuffer) (interface{}, []byte)
Packet(c *Connection, data []byte) []byte
}
```


用户只需实现这个接口,并注册到 server 中,当客户端数据到来时,gev 会首先调用 UnPacket 方法,如果缓冲区中的数据足够组成一帧,则将数据解包,并返回真正的用户数据,然后在回调 OnMessage 函数并将数据通过参数传递。

下面,我们实现一个简单的自定义协议插件,来启动一个 Server :

```text
| 数据长度 n | payload |
| 4字节 | n 字节 |
```

```go
// protocol.go
package main

import (
"encoding/binary"
"github.com/Allenxuxu/gev/connection"
"github.com/Allenxuxu/ringbuffer"
"github.com/gobwas/pool/pbytes"
)

const exampleHeaderLen = 4

type ExampleProtocol struct{}

func (d *ExampleProtocol) UnPacket(c *connection.Connection, buffer *ringbuffer.RingBuffer) (interface{}, []byte) {
if buffer.VirtualLength() > exampleHeaderLen {
buf := pbytes.GetLen(exampleHeaderLen)
defer pbytes.Put(buf)
_, _ = buffer.VirtualRead(buf)
dataLen := binary.BigEndian.Uint32(buf)

if buffer.VirtualLength() >= int(dataLen) {
ret := make([]byte, dataLen)
_, _ = buffer.VirtualRead(ret)

buffer.VirtualFlush()
return nil, ret
} else {
buffer.VirtualRevert()
}
}
return nil, nil
}

func (d *ExampleProtocol) Packet(c *connection.Connection, data []byte) []byte {
dataLen := len(data)
ret := make([]byte, exampleHeaderLen+dataLen)
binary.BigEndian.PutUint32(ret, uint32(dataLen))
copy(ret[4:], data)
return ret
}
```

```go
// server.go
package main

import (
"flag"
"log"
"strconv"

"github.com/Allenxuxu/gev"
"github.com/Allenxuxu/gev/connection"
)

type example struct{}

func (s *example) OnConnect(c *connection.Connection) {
log.Println(" OnConnect : ", c.PeerAddr())
}
func (s *example) OnMessage(c *connection.Connection, ctx interface{}, data []byte) (out []byte) {
log.Println("OnMessage:", data)
out = data
return
}

func (s *example) OnClose(c *connection.Connection) {
log.Println("OnClose")
}

func main() {
handler := new(example)
var port int
var loops int

flag.IntVar(&port, "port", 1833, "server port")
flag.IntVar(&loops, "loops", -1, "num loops")
flag.Parse()

s, err := gev.NewServer(handler,
gev.Address(":"+strconv.Itoa(port)),
gev.NumLoops(loops),
gev.Protocol(&ExampleProtocol{}))
if err != nil {
panic(err)
}

log.Println("server start")
s.Start()
}
```
完整代码地址

当回调 `OnMessage` 函数的时候,会通过参数传递已经拆好包的用户数据。

当我们需要使用其他协议时,仅仅需要实现一个 Protocol 插件,然后只要 `gev.NewServer` 时指定即可:

```go
gev.NewServer(handler, gev.NumLoops(2), gev.Protocol(&XXXProtocol{}))
```

## 基于 Protocol Plugins 模式为 gev 实现 WebSocket 插件

得益于 Protocol Plugins 模式的引进,我可以将 WebSocket 的实现做成一个插件(WebSocket 协议构建在 TCP 之上),独立于 gev 之外。

```go
package websocket

import (
"log"

"github.com/Allenxuxu/gev/connection"
"github.com/Allenxuxu/gev/plugins/websocket/ws"
"github.com/Allenxuxu/ringbuffer"
)

// Protocol websocket
type Protocol struct {
upgrade *ws.Upgrader
}

// New 创建 websocket Protocol
func New(u *ws.Upgrader) *Protocol {
return &Protocol{upgrade: u}
}

// UnPacket 解析 websocket 协议,返回 header ,payload
func (p *Protocol) UnPacket(c *connection.Connection, buffer *ringbuffer.RingBuffer) (ctx interface{}, out []byte) {
upgraded := c.Context()
if upgraded == nil {
var err error
out, _, err = p.upgrade.Upgrade(buffer)
if err != nil {
log.Println("Websocket Upgrade :", err)
return
}
c.SetContext(true)
} else {
header, err := ws.VirtualReadHeader(buffer)
if err != nil {
log.Println(err)
return
}
if buffer.VirtualLength() >= int(header.Length) {
buffer.VirtualFlush()

payload := make([]byte, int(header.Length))
_, _ = buffer.Read(payload)

if header.Masked {
ws.Cipher(payload, header.Mask, 0)
}

ctx = &header
out = payload
} else {
buffer.VirtualRevert()
}
}
return
}

// Packet 直接返回
func (p *Protocol) Packet(c *connection.Connection, data []byte) []byte {
return data
}
```

具体的实现,可以到仓库的 [plugins/websocket](https://github.com/Allenxuxu/gev/tree/master/plugins/websocket) 查看。

## 相关文章

- [开源 gev: Go 实现基于 Reactor 模式的非阻塞 TCP 网络库](https://note.mogutou.xyz/articles/2019/09/19/1568896693634.html)
- [Go 网络库并发吞吐量测试](https://note.mogutou.xyz/articles/2019/09/22/1569146969662.html)

## 项目地址

https://github.com/Allenxuxu/gev

yiigo 4.x 版本发布

开源程序IIInsomnia 发表了文章 • 0 个评论 • 340 次浏览 • 2019-10-25 11:50 • 来自相关话题

# yiigo[![golang](https://img.shields.io/badge/Language-Go-green.svg?style=flat)](https://golang.org)[![GitHub rele ...查看全部

# yiigo

[![golang](https://img.shields.io/badge/Language-Go-green.svg?style=flat)](https://golang.org)
[![GitHub release](https://img.shields.io/github/release/IIInsomnia/yiigo.svg)](https://github.com/iiinsomnia/yiigo/releases/latest)
[![GoDoc](https://godoc.org/github.com/iiinsomnia/yiigo?status.svg)](https://godoc.org/github.com/iiinsomnia/yiigo)
[![MIT license](http://img.shields.io/badge/license-MIT-brightgreen.svg)](http://opensource.org/licenses/MIT)

The best and the most wanted package for junior gophers, probably.

## Features

- Support [MySQL](https://github.com/go-sql-driver/mysql)
- Support [PostgreSQL](https://github.com/lib/pq)
- Support [MongoDB](https://github.com/mongodb/mongo-go-driver)
- Support [Redis](https://github.com/gomodule/redigo)
- Support [Zipkin](https://github.com/openzipkin/zipkin-go)
- Support [Apollo](https://github.com/philchia/agollo)
- Use [gomail](https://github.com/go-gomail/gomail) for email sending
- Use [toml](https://github.com/pelletier/go-toml) for configuration
- Use [sqlx](https://github.com/jmoiron/sqlx) for SQL executing
- Use [gorm](https://gorm.io/) for ORM operating
- Use [zap](https://github.com/uber-go/zap) for logging

## Requirements

`Go1.11+`

## Installation

```sh
go get github.com/iiinsomnia/yiigo/v4
```

## Usage

#### Config

- `yiigo.toml`

```toml
[app]
env = "dev" # dev | beta | prod
debug = true

[apollo]
app_id = "test"
cluster = "default"
address = "127.0.0.1:8080"
cache_dir = "./"

[apollo.namespace]
# 自定义配置对应的namespace

[db]

[db.default]
driver = "mysql"
dsn = "username:password@tcp(localhost:3306)/dbname?timeout=10s&charset=utf8mb4&collation=utf8mb4_general_ci&parseTime=True&loc=Local"
# dsn = "host=localhost port=5432 user=root password=secret dbname=test connect_timeout=10 sslmode=disable" # pgsql
max_open_conns = 20
max_idle_conns = 10
conn_max_lifetime = 60 # 秒

[mongo]

[mongo.default]
dsn = "mongodb://username:password@localhost:27017"
connect_timeout = 10 # 秒
pool_size = 10
max_conn_idle_time = 60 # 秒
mode = "primary" # primary | primary_preferred | secondary | secondary_preferred | nearest

[redis]

[redis.default]
address = "127.0.0.1:6379"
password = ""
database = 0
connect_timeout = 10 # 秒
read_timeout = 10 # 秒
write_timeout = 10 # 秒
pool_size = 10
pool_limit = 20
idle_timeout = 60 # 秒
wait_timeout = 10 # 秒
prefill_parallelism = 0

[log]

[log.default]
path = "app.log"
max_size = 500
max_age = 0
max_backups = 0
compress = true

[email]
host = "smtp.exmail.qq.com"
port = 25
username = ""
password = ""
```

- config usage

```go
yiigo.Env("app.env").String()
yiigo.Env("app.debug").Bool()
```

#### Apollo

```go
type QiniuConfig struct {
*yiigo.DefaultApolloConfig
BucketName string `toml:"bucket_name"`
}

var qiniu = &QiniuConfig{DefaultApolloConfig: yiigo.NewDefaultConfig("qiniu", "qiniu")}

if err := yiigo.StartApollo(qiniu); err != nil {
log.Fatal(err)
}
```

#### MySQL

```go
// default db
yiigo.DB().Get(&User{}, "SELECT * FROM `user` WHERE `id` = ?", 1)
yiigo.Orm().First(&User{}, 1)

// other db
yiigo.DB("foo").Get(&User{}, "SELECT * FROM `user` WHERE `id` = ?", 1)
yiigo.Orm("foo").First(&User{}, 1)
```

#### MongoDB

```go
// default mongodb
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)

defer cancel()

yiigo.Mongo().Database("test").Collection("numbers").InsertOne(ctx, bson.M{"name": "pi", "value": 3.14159})

// other mongodb
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)

defer cancel()

yiigo.Mongo("foo").Database("test").Collection("numbers").InsertOne(ctx, bson.M{"name": "pi", "value": 3.14159})
```

#### Redis

```go
// default redis
conn, err := yiigo.Redis().Get()

if err != nil {
log.Fatal(err)
}

defer yiigo.Redis().Put(conn)

conn.Do("SET", "test_key", "hello world")

// other redis
conn, err := yiigo.Redis("foo").Get()

if err != nil {
log.Fatal(err)
}

defer yiigo.Redis("foo").Put(conn)

conn.Do("SET", "test_key", "hello world")
```

#### Zipkin

```go
reporter := yiigo.NewZipkinHTTPReporter("http://localhost:9411/api/v2/spans")

// sampler
sampler := zipkin.NewModuloSampler(1)
// endpoint
endpoint, _ := zipkin.NewEndpoint("yiigo-zipkin", "localhost")

tracer, err := yiigo.NewZipkinTracer(reporter,
zipkin.WithLocalEndpoint(endpoint),
zipkin.WithSharedSpans(false),
zipkin.WithSampler(sampler),
)

if err != nil {
log.Fatal(err)
}

client, err := tracer.HTTPClient(yiigo.WithZipkinClientOptions(zipkinHttp.ClientTrace(true)))

if err != nil {
log.Fatal(err)
}

b, err := client.Get(context.Background(), "url...",
yiigo.WithRequestHeader("Content-Type", "application/json; charset=utf-8"),
yiigo.WithRequestTimeout(5*time.Second),
)

if err != nil {
log.Fatal(err)
}

fmt.Println(string(b))
```

#### Logger

```go
// default logger
yiigo.Logger().Info("hello world")

// other logger
yiigo.Logger("foo").Info("hello world")
```

## Documentation

- [API Reference](https://godoc.org/github.com/iiinsomnia/yiigo)
- [TOML](https://github.com/toml-lang/toml)
- [Example](https://github.com/iiinsomnia/yiigo-example)

**Enjoy

[开源] gev (支持 websocket 啦): Go 实现基于 Reactor 模式的非阻塞网络库

开源程序惜朝 发表了文章 • 0 个评论 • 352 次浏览 • 2019-10-24 11:04 • 来自相关话题

https://github.com/Allenxuxu/gev[gev](https://github.com/Allenxuxu/gev) ...查看全部

https://github.com/Allenxuxu/gev

[gev](https://github.com/Allenxuxu/gev) 是一个轻量、快速、高性能的基于 Reactor 模式的非阻塞网络库,底层并不使用 golang net 库,而是使用 epoll 和 kqueue。

现在它支持 WebSocket 啦!

支持定时任务,延时任务!

⬇️⬇️⬇️

## 特点

- 基于 epoll 和 kqueue 实现的高性能事件循环
- 支持多核多线程
- 动态扩容 Ring Buffer 实现的读写缓冲区
- 异步读写
- SO_REUSEPORT 端口重用支持
- 支持 WebSocket
- 支持定时任务,延时任务

## 性能测试

> 测试环境 Ubuntu18.04
- gev
- gnet
- eviop
- evio
- net (标准库)

### 吞吐量测试




仓库地址: https://github.com/Allenxuxu/gev

Go语言自定义自己的SSH-Server

文章分享jjjjerk 发表了文章 • 0 个评论 • 816 次浏览 • 2019-10-22 15:31 • 来自相关话题

原文地址  ...查看全部

Golang-Docker ChromeDP浏览器模拟和截图微服务

文章分享jjjjerk 发表了文章 • 0 个评论 • 885 次浏览 • 2019-10-21 15:58 • 来自相关话题

原文地址  ...查看全部

Golang 扁平项目代码结构

技术讨论jjjjerk 发表了文章 • 0 个评论 • 372 次浏览 • 2019-10-18 15:08 • 来自相关话题

原文地址 https://mojotv ...查看全部

HTTP-Reverse-Proxy反向代理Nginx硬件指纹校验

文章分享jjjjerk 发表了文章 • 0 个评论 • 520 次浏览 • 2019-10-15 15:06 • 来自相关话题

原文地址 https://moj ...查看全部

Excelize 发布 2.0.2 版本, Go 语言 Excel 文档基础库

Go开源项目xuri 发表了文章 • 0 个评论 • 359 次浏览 • 2019-10-10 14:32 • 来自相关话题


Excelize 是 Go 语言编写的用于操作 Office Excel 文档类库,基于 ECMA-376 Office Open XML 标准。可以使用它来读取、写入由 Microsoft Excel™ 2007 及以上版本创建的 XLSX 文档。相比较其他的开源类库,Excelize 支持写入原本带有图片(表)、透视表和切片器等复杂样式的文档,还支持向 Excel 文档中插入图片与图表,并且在保存后不会丢失文档原有样式,可以应用于各类报表系统中。入选 2018 开源中国码云 Gitee 最有价值开源项目 GVP,目前已成为 Go 语言最受欢迎的 Excel 文档基础库。

开源代码

GitHub: github.com/xuri/excelize
Gitee: gitee.com/xurime/excelize
中文文档: xuri.me/excelize/zh-hans

Excelize 知名用户


2019年10月9日,社区正式发布了 2.0.2 版本,该版本包含了多项新增功能、错误修复和兼容性提升优化。下面是有关该版本更新内容的摘要,完整的更改列表可查看 change log

有关更改的摘要,请参阅 Release Notes。完整的更改列表可查看 change log

Release Notes

此版本中最显著的变化包括:

兼容性提示

升级至该版本需要您的 Go 语言版本高于 1.10。

新增功能

问题修复

  • 修复部分情况下读取批注内容文本不完整的问题,解决 issue #434
  • 修复由于内部合并单元格偏移量计算错误导致的部分情况下使用 RemoveRow() 删除行出现下标越界问题,解决 issue #437
  • 修复部分情况下数据验证下拉菜单中的公式失效问题
  • 修复在循环迭代中调用 Save() 方法保存导致的文档损坏问题,解决 issue #443
  • 提升文档内部 workbook.xml.rels 中相对路径格式解析的兼容性,解决 issue #442
  • 修复部分情况下,删除带有合并单元格的文档所导致的文件损坏问题
  • 修复部分情况下设置保护工作表属性失效的情况,解决 issue #454
  • 修复部分情况下 GetSheetName 获取工作表名称为空的问题, 解决 issue #457
  • 增加单元格内多行文本解析的支持, 相关 issue #464
  • 修复 32 位操作系统环境下数字溢出问题,相关 issue #386
  • 修复 go module 依赖版本不匹配问题, 相关 issue #466 和 issue #480
  • 修复部分情况下调用 SetSheetPrOptions() 所致的文档损坏问题,解决 issue #483

性能表现

  • 性能优化,减少读取文档时的内存开销和耗时,相关 issue #439

其他

  • 完善 SetSheetRow() 函数中的异常处理
  • 代码精简优化, 合并了下列内部函数:

将函数 workBookRelsWriterdrawingRelsWriter 合并为 relsWriter;
将函数 drawingRelsReaderworkbookRelsReaderworkSheetRelsReader 合并为 relsReader;
将函数 addDrawingRelationshipsaddSheetRelationships 合并为 addRels

GoCN每日新闻(2019-10-07)

回复

每日新闻smallfish1 发起了问题 • 1 人关注 • 0 个回复 • 9872 次浏览 • 2019-10-07 15:42 • 来自相关话题

为什么gRPC客户端不提供连接池?

技术讨论tsingson 回复了问题 • 9 人关注 • 6 个回复 • 16219 次浏览 • 2019-10-03 02:58 • 来自相关话题

条新动态, 点击查看
astaxie

astaxie 回答了问题 • 2016-10-10 18:35 • 28 个回复 不感兴趣

大家推荐哪种golang包管理方式?

赞同来自:

我们目前项目中使用的是godep,但是我最近尝试迁移到glide里面来,两个的功能都差不多,但是glide更强大一点,而且是Go1.5 vendor目录支持之后出来的,所以我还是比较推荐用这个。 这里列出来一些目前支持vendor的工具 * [manul... 显示全部 »
我们目前项目中使用的是godep,但是我最近尝试迁移到glide里面来,两个的功能都差不多,但是glide更强大一点,而且是Go1.5 vendor目录支持之后出来的,所以我还是比较推荐用这个。 这里列出来一些目前支持vendor的工具 * [manul](https://github.com/kovetskiy/manul) - Vendor packages using git submodules. * [Godep](https://github.com/tools/godep) * [Govendor](https://github.com/kardianos/govendor) * [godm](https://github.com/hectorj/godm) * [vexp](https://github.com/kr/vexp) * [gv](https://github.com/forestgiant/gv) * [gvt](https://github.com/FiloSottile/gvt) - Recursively retrieve and vendor packages. * [govend](https://github.com/govend/govend) * [Glide](https://github.com/Masterminds/glide) - Manage packages like composer, npm, bundler, or other languages. * [Vendetta](https://github.com/dpw/vendetta) * [trash](https://github.com/rancher/trash) * [gsv](https://github.com/toxeus/gsv) * [gom](https://github.com/mattn/gom)
astaxie

astaxie 回答了问题 • 2016-10-11 22:12 • 4 个回复 不感兴趣

为什么Go里面大多数的接口返回的是int类型

赞同来自:

len出来的值是检查用于计算的,如果是uint的话,那么加减一下负数就不行了 详见:https://groups.google.com/forum/#!topic/golang-nuts/jJWAAMdquwQ
len出来的值是检查用于计算的,如果是uint的话,那么加减一下负数就不行了 详见:https://groups.google.com/forum/#!topic/golang-nuts/jJWAAMdquwQ
name5566

name5566 回答了问题 • 2016-10-12 11:36 • 14 个回复 不感兴趣

golang有没有好的开源游戏框架

赞同来自:

> 使用 Leaf 已知的上线项目: > * 2014 年,某手游(棋牌)项目上线 > * 2016 年,某 H5 手游项目上线 > * 2016 年,某卡牌手游项目上线 > 正在研发项目 N 个,已知情况 N >= 4 来自:https://github.... 显示全部 »
> 使用 Leaf 已知的上线项目: > * 2014 年,某手游(棋牌)项目上线 > * 2016 年,某 H5 手游项目上线 > * 2016 年,某卡牌手游项目上线 > 正在研发项目 N 个,已知情况 N >= 4 来自:https://github.com/name5566/leaf/wiki
leoliu

leoliu 回答了问题 • 2016-10-12 13:44 • 40 个回复 不感兴趣

求一些golang的教程,书籍也可以

赞同来自:

《The Golang Programming Language》 《Golang 学习笔记》
《The Golang Programming Language》 《Golang 学习笔记》
yougg

yougg 回答了问题 • 2016-10-14 10:01 • 84 个回复 不感兴趣

大家说说看都用啥写Go

赞同来自:

# IDEA大法好 # 天灭vscode 退软保平安 # 人在做,天在看 中文乱码留祸患 # 界面卡顿天地灭 赶紧卸载保平安 # 诚心诚念IDEA好 JetBrains大法平安保 # 众生皆为IDEA来 现世险恶忘前缘 # 开源为你说真相 教你脱险莫拒绝 # ... 显示全部 »
# IDEA大法好 # 天灭vscode 退软保平安 # 人在做,天在看 中文乱码留祸患 # 界面卡顿天地灭 赶紧卸载保平安 # 诚心诚念IDEA好 JetBrains大法平安保 # 众生皆为IDEA来 现世险恶忘前缘 # 开源为你说真相 教你脱险莫拒绝 # 早日不做软粉,早日获得新生 # 上网搜索“九评纳德拉” # 有 真 相
sryan

sryan 回答了问题 • 2016-10-13 11:44 • 9 个回复 不感兴趣

golang 如何动态创建struct

赞同来自:

静态语言貌似不能直接实现 可以自己实现个map[string]func(string)interface{} 将要动态生成的结构体的函数注册上去 通过string来调用相应的函数来获取对应的结构体
静态语言貌似不能直接实现 可以自己实现个map[string]func(string)interface{} 将要动态生成的结构体的函数注册上去 通过string来调用相应的函数来获取对应的结构体
astaxie

astaxie 回答了问题 • 2016-10-13 22:04 • 14 个回复 不感兴趣

想用golang写个分布式的监控,大神给点建议

赞同来自:

这个问题很有意思,很多场景设计都会来考虑拉和推两种方案,我分别对拉和推两种的优缺点对比以下,你自己权衡一下,欢迎大家继续补充 ## 拉的方案(不写agent) 优点: - 不需要agent,不需要再部署新的程序 缺点: - 网络中断的情况下,就无法监控机器... 显示全部 »
这个问题很有意思,很多场景设计都会来考虑拉和推两种方案,我分别对拉和推两种的优缺点对比以下,你自己权衡一下,欢迎大家继续补充 ## 拉的方案(不写agent) 优点: - 不需要agent,不需要再部署新的程序 缺点: - 网络中断的情况下,就无法监控机器的信息 ## 推的方案(agent) 优点: - 本地运行,在和中控机失去网络连接的时候还是可以继续保存监控数据 缺点: - 需要部署agent,如果机器多得话将来升级也是比较麻烦 拉取和推送其实大家可以考虑,微博的follow逻辑,直播流里面也有同样的问题,很多场景都会遇到 至于说第二种方案走什么协议,这种程序我建议走tcp协议,HTTP的话相对重了一点。
sheepbao

sheepbao 回答了问题 • 2016-10-30 20:16 • 18 个回复 不感兴趣

字符串连接哪一种方式最高效

赞同来自:

```go package main import ( "bytes" "fmt" "strings" "time" ) var way map[int]string func benchmarkStringFunction(n int, ind... 显示全部 »
```go package main import ( "bytes" "fmt" "strings" "time" ) var way map[int]string func benchmarkStringFunction(n int, index int) (d time.Duration) { v := "ni shuo wo shi bu shi tai wu liao le a?" var s string var buf bytes.Buffer t0 := time.Now() for i := 0; i < n; i++ { switch index { case 0: // fmt.Sprintf s = fmt.Sprintf("%s[%s]", s, v) case 1: // string + s = s + "[" + v + "]" case 2: // strings.Join s = strings.Join([]string{s, "[", v, "]"}, "") case 3: // stable bytes.Buffer buf.WriteString("[") buf.WriteString(v) buf.WriteString("]") } } d = time.Since(t0) if index == 3 { s = buf.String() } fmt.Printf("string len: %d\t", len(s)) fmt.Printf("time of [%s]=\t %v\n", way[index], d) return d } func main() { way = make(map[int]string, 5) way[0] = "fmt.Sprintf" way[1] = "+" way[2] = "strings.Join" way[3] = "bytes.Buffer" k := 4 d := [5]time.Duration{} for i := 0; i < k; i++ { d[i] = benchmarkStringFunction(10000, i) } } ``` 结果: ``` string len: 410000 time of [fmt.Sprintf]= 426.001476ms string len: 410000 time of [+]= 307.044147ms string len: 410000 time of [strings.Join]= 738.44362ms string len: 410000 time of [bytes.Buffer]= 742.248µs ``` * strings.Join 最慢 * fmt.Sprintf 和 string + 差不多 * bytes.Buffer又比上者快约500倍
ecofast

ecofast 回答了问题 • 2018-02-25 14:00 • 8 个回复 不感兴趣

发现一个非常不错的性能优化的视频

赞同来自:

要点简单小结: 1) Don't use Go, use Assembly --> NO. 2) Don't use Go, use C/C++ --> NO. Go 1.4 的编译速度非常快,后因编译器开始自举,编译速度下降不少,但一直在改进 3) C... 显示全部 »
要点简单小结: 1) Don't use Go, use Assembly --> NO. 2) Don't use Go, use C/C++ --> NO. Go 1.4 的编译速度非常快,后因编译器开始自举,编译速度下降不少,但一直在改进 3) CGO --> 应审慎地通过 CGO 来使用经由 C/C++ 高度优化过的代码库(如加解密算法),且每次 CGO 调用约有 150ns 的额外开销 4) Go 的编译器优化水准一直在提升,通常来说,新版本的 Go 能生成更优化的机器码 5) 在 Go 中作 Benchmark 非常容易,能用它来很方便地分析 CPU 及内存分配等性能热点 6) pprof 是很实用的性能剖析工具 7) Go 的 GC 实现一直在改进,而且改进明显 8) 不应在循环里作频繁的内存分配等工作 --> 耗时且加重 GC 负担,可善用 bytes.Buffer 和 sync.Pool 等设施 9) 避免内存碎片 --> 逃逸分析对性能的影响 10) 锁本身并不慢,慢的是竞争 --> sync/atomic 在某些场合是好东西 11) hashmap 查找虽快(O(1) 的时间复杂度),但元素数量对查找速度有明显影响 --> map 的初始容量、粒度问题
九命猫

九命猫 回答了问题 • 2016-10-31 10:52 • 5 个回复 不感兴趣

go中如何连接两个slice

赞同来自:

```go append([]int{1, 2}, []int{3, 4}...) ```
```go append([]int{1, 2}, []int{3, 4}...) ```
astaxie

astaxie 回答了问题 • 2016-11-13 13:29 • 4 个回复 不感兴趣

golang vendor路径问题

赞同来自:

刚和 @viktor1992 看了一下源码,看到这个vendor的函数 ``` // vendoredImportPath returns the expansion of path when it appears in parent. // If pare... 显示全部 »
刚和 @viktor1992 看了一下源码,看到这个vendor的函数 ``` // vendoredImportPath returns the expansion of path when it appears in parent. // If parent is x/y/z, then path might expand to x/y/z/vendor/path, x/y/vendor/path, // x/vendor/path, vendor/path, or else stay path if none of those exist. // vendoredImportPath returns the expanded path or, if no expansion is found, the original. func vendoredImportPath(parent *Package, path string) (found string) { if parent == nil || parent.Root == "" { return path } ``` 这个里面有一个概念parent,我们可以看到Go官方关于vendor的文档设计的时候都有这个,也就是说你的源码要有一个包存起来,我们看一下这个实验里面的代码。 首先`a.go`放在了gopath低目录,也就是他的parent是nil,所以这个返回了`wer`,而这个包又不在gopath下面,所以第一个报错了。 第二个你创建了test目录,那么他的parent就是test,那么就会去找test/vendor/path和vendor/path下面找,所以vendor不管你放在test下面还是gopath下面都是可以找到的。
傅小黑

傅小黑 回答了问题 • 2017-06-06 17:18 • 7 个回复 不感兴趣

Go指针复制问题

赞同来自:

```go for k, r := range *rr { fmt.Printf("%dth r, id: %d, cpu: %f, mem: %f\n", k, r.ID, r.CPU, r.MEM) rs = append(rs, &r) } ``` ... 显示全部 »
```go for k, r := range *rr { fmt.Printf("%dth r, id: %d, cpu: %f, mem: %f\n", k, r.ID, r.CPU, r.MEM) rs = append(rs, &r) } ``` 这里的 r 一直是同一个地址的值的,for 循环的每次是覆盖旧的 r,你要用 *rr[k]
voidint

voidint 回答了问题 • 2017-07-25 09:20 • 7 个回复 不感兴趣

有必要设置多个gopath吗?

赞同来自:

我会设置起码2个`GOPATH`。因为`go get`会把代码拉倒第一个`GOPATH`的缘故,我会把第一个`GOPATH`用于存放第三方库,之后的`GOPATH`才是自己的项目,这样会更清晰。
我会设置起码2个`GOPATH`。因为`go get`会把代码拉倒第一个`GOPATH`的缘故,我会把第一个`GOPATH`用于存放第三方库,之后的`GOPATH`才是自己的项目,这样会更清晰。
EasyHacking

EasyHacking 回答了问题 • 2018-02-26 21:54 • 45 个回复 不感兴趣

Go 零基础编程入门教程

赞同来自:

课程持续更新中,欢迎大家持续关注我们课程。 欢迎对课程提建议,也欢迎加Q群一起学习交流:694650181
课程持续更新中,欢迎大家持续关注我们课程。 欢迎对课程提建议,也欢迎加Q群一起学习交流:694650181
chenqinghe

chenqinghe 回答了问题 • 2018-07-31 08:42 • 6 个回复 不感兴趣

忽略 Close() 的 error 是不是安全的呢?

赞同来自:

如果不想忽略可以这样: ```go func hi()(err error){ f,err:= os.Create("a.txt") if err!=nil { return err } defer func(){ if e:= f.Clos... 显示全部 »
如果不想忽略可以这样: ```go func hi()(err error){ f,err:= os.Create("a.txt") if err!=nil { return err } defer func(){ if e:= f.Close();e!=nil { err = e } }() return nil } ```

关于规范招聘信息发帖说明

招聘应聘 回复了问题 • 10 人关注 • 4 个回复 • 5661 次浏览 • 2019-10-04 10:59 • 来自相关话题

Go 零基础编程入门教程

文章分享ducklife 回复了问题 • 104 人关注 • 45 个回复 • 30395 次浏览 • 2019-08-19 12:59 • 来自相关话题

大家是如何处理 golang web 应用静态资源的?

技术讨论astaxie 回复了问题 • 5 人关注 • 1 个回复 • 4804 次浏览 • 2016-10-14 13:08 • 来自相关话题

Python 程序员的 Golang 学习指南(II): 开发环境搭建

文章分享Cloudinsight 发表了文章 • 0 个评论 • 18320 次浏览 • 2016-10-12 15:44 • 来自相关话题

Authors: startover * [startover's Github](https://github.com/startover) * [Cloudinsight](http://cloudinsight.oneapm.c ...查看全部
Authors: startover
* [startover's Github](https://github.com/startover)
* [Cloudinsight](http://cloudinsight.oneapm.com/?utm_source=CiTechBlog&utm_medium=gocn&utm_term=pythongolang2&utm_campaign=CiTech&from=jscwgyegna) 工程师

------------


[上一篇文章](https://gocn.io/article/36)我们已经对 Golang 有了初步的了解,这篇主要介绍如何在 Ubuntu 14.04 上搭建 Golang 开发环境。

## 安装 Golang

这里就按照[官方文档](https://golang.org/doc/install#install)进行安装即可,如下:

* 下载并解压安装包到指定目录

```
$ wget https://storage.googleapis.com/golang/go1.6.3.linux-amd64.tar.gz
$ tar -C /usr/local -xzf go1.6.3.linux-amd64.tar.gz
```

* 设置 PATH

```
$ echo "export PATH=$PATH:/usr/local/go/bin" >> ~/.bashrc
$ source ~/.bashrc
```

* 验证安装

```
$ go version
go version go1.6.3 linux/amd64
```

## 环境变量设置

```
$ echo "export GOROOT=/usr/local/go" >> ~/.bashrc
$ echo "export GOPATH=$HOME/go" >> ~/.bashrc
$ source ~/.bashrc
```

其中,GOROOT 为 Golang 的安装目录,只有当 Golang 安装到除 /usr/local 之外的路径时需要设置,反之则不用设置,GOPATH 是 Golang 的开发目录,详细可参考[官方文档](https://golang.org/cmd/go/#hdr-GOPATH_environment_variable)。

## 开发工具

工欲善其事,必先利其器,作为一名伪 VIMer,这里主要介绍下如何在 Vim 下配置 Golang 开发环境。

由于之前一直使用 [k-vim](https://github.com/wklken/k-vim) 作为 Python 开发环境,而 [k-vim](https://github.com/wklken/k-vim) 已经集成了当前使用最为广泛的用于搭建 Golang 开发环境的 vim 插件 [vim-go](https://github.com/fatih/vim-go),只是默认没有开启,需要我们手动进行相关设置。

在 [k-vim](https://github.com/wklken/k-vim) 中开启 Golang 语言的支持,非常简单,如下:

* 修改 ~/.vimrc.bundles(开启 golang 支持,并修改 vim-go 的默认配置,增加快捷键配置等)。

```bash
let g:bundle_groups=['python', 'javascript', 'markdown', 'html', 'css', 'tmux', 'beta', 'json', 'golang']

" vimgo {{{
let g:go_highlight_functions = 1
let g:go_highlight_methods = 1
let g:go_highlight_structs = 1
let g:go_highlight_operators = 1
let g:go_highlight_build_constraints = 1

let g:go_fmt_fail_silently = 1
let g:go_fmt_command = "goimports"
let g:syntastic_go_checkers = ['golint', 'govet', 'errcheck']

" vim-go custom mappings
au FileType go nmap s (go-implements)
au FileType go nmap i (go-info)
au FileType go nmap gd (go-doc)
au FileType go nmap gv (go-doc-vertical)
au FileType go nmap r (go-run)
au FileType go nmap b (go-build)
au FileType go nmap t (go-test)
au FileType go nmap c (go-coverage)
au FileType go nmap ds (go-def-split)
au FileType go nmap dv (go-def-vertical)
au FileType go nmap dt (go-def-tab)
au FileType go nmap e (go-rename)
au FileType go nnoremap gr :GoRun %
" }}}
```

* 在 Vim 内执行 `:PlugInstall`,安装 [vim-go](https://github.com/fatih/vim-go)。

* 在 Vim 内执行 `:GoInstallBinaries`,下载并安装 [vim-go](https://github.com/fatih/vim-go) 依赖的二进制工具,`goimports`,`golint` 等。

* 安装 [gotags](https://github.com/jstemmer/gotags),使 `tagbar` 配置生效。

```
$ go get -u github.com/jstemmer/gotags
```

我们来看一下最终效果:

![Image of Golang Environment in Vim](http://startover.github.io/images/golang-for-pythonistas-environment.png)


## 编写第一个程序

进入工作目录,新建文件 `hello.go`,如下:

```
$ cd $GOPATH
$ vim hello.go
package main

import "fmt"

func main() {
fmt.Println("Hello, World!")
}
```

运行程序:

```
$ go run hello.go
Hello, World!
```


------------
本文章为 [Cloudinsight](http://cloudinsight.oneapm.com/?utm_source=CiTechBlog&utm_medium=gocn&utm_term=pythongolang1&utm_campaign=CiTech&from=jscwgyegna) 技术团队工程师原创,更多技术文章可访问 [Cloudinsight 技术博客](http://cloudinsight.oneapm.com/blog/?utm_source=CiTechBlog&utm_medium=gocn&utm_term=pythongolang1&utm_campaign=CiTech&from=jscwgyegna)。[Cloudinsight](http://cloudinsight.oneapm.com/?utm_source=CiTechBlog&utm_medium=gocn&utm_term=pythongolang1&utm_campaign=CiTech&from=jscwgyegna) 为可视化系统监控工具,涵盖 Windows、Linux 操作系统,用 Golang 开发的 Cloudinsight Agent 正式开源了,欢迎 fork,Github:https://github.com/cloudinsight/cloudinsight-agent


golang-for-pythonistas 系列持续更新中,欢迎关注~

Python 程序员的 Golang 学习指南(I): Go 之初体验

文章分享Cloudinsight 发表了文章 • 3 个评论 • 4548 次浏览 • 2016-10-12 15:27 • 来自相关话题

Authors: startover * [startover's Github](https://github.com/startover) * [Cloudinsight](http://cloudinsight.oneapm.c ...查看全部
Authors: startover
* [startover's Github](https://github.com/startover)
* [Cloudinsight](http://cloudinsight.oneapm.com/?utm_source=CiTechBlog&utm_medium=gocn&utm_term=pythongolang1&utm_campaign=CiTech&from=jscwgyegna) 工程师

------------



## Go 语言简介

Go,又称 golang,是 Google 开发的一种静态强类型,编译型,并发型,并具有垃圾回收功能的编程语言。

Go 语言于2009年11月正式宣布推出,自2012年发布1.0,最新稳定版1.7。目前,Go的相关工具和生态已逐渐趋于完善,也不乏重量级项目,如 Docker, Kubernetes, Etcd, InfluxDB 等。

## Go 语言能解决什么样的问题

同绝大多数通用型编程语言相比,Go 语言更多的是为了解决我们在构建大型服务器软件过程中所遇到的软件工程方面的问题而设计的。乍看上去,这么讲可能会让人感觉 Go 非常无趣且工业化,但实际上,在设计过程中就着重于清晰和简洁,以及较高的可组合性,最后得到的反而会是一门使用起来效率高而且很有趣的编程语言,很多程序员都会发现,它有极强的表达力而且功能非常强大。

总结为以下几点:

* 清晰的依赖关系
* 清晰的语法
* 清晰的语义
* 偏向组合而不是继承
* 提供简单的编程模型(垃圾回收、并发)
* 强大的内置工具(gofmt、godoc、gofix等)

建议有兴趣的同学看看 [Go在谷歌:以软件工程为目的的语言设计](http://www.oschina.net/translate/go-at-google-language-design-in-the-service-of-software-engineering)。

## Go 语言相对 Python 有哪些优势

这里引用一段[知乎](https://www.zhihu.com/question/21409296)上某大牛的回答,如下:

* **部署简单**。Go 编译生成的是一个静态可执行文件,除了 glibc 外没有其他外部依赖。这让部署变得异常方便:目标机器上只需要一个基础的系统和必要的管理、监控工具,完全不需要操心应用所需的各种包、库的依赖关系,大大减轻了维护的负担。这和 Python 有着巨大的区别。由于历史的原因,Python 的部署工具生态相当混乱【比如 setuptools, distutils, pip, buildout 的不同适用场合以及兼容性问题】。官方 PyPI 源又经常出问题,需要搭建私有镜像,而维护这个镜像又要花费不少时间和精力。

* **并发性好**。Goroutine 和 channel 使得编写高并发的服务端软件变得相当容易,很多情况下完全不需要考虑锁机制以及由此带来的各种问题。单个 Go 应用也能有效的利用多个 CPU 核,并行执行的性能好。这和 Python 也是天壤之比。多线程和多进程的服务端程序编写起来并不简单,而且由于全局锁 GIL 的原因,多线程的 Python 程序并不能有效利用多核,只能用多进程的方式部署;如果用标准库里的 multiprocessing 包又会对监控和管理造成不少的挑战【我们用的 supervisor 管理进程,对 fork 支持不好】。部署 Python 应用的时候通常是每个 CPU 核部署一个应用,这会造成不少资源的浪费,比如假设某个 Python 应用启动后需要占用 100MB 内存,而服务器有 32 个 CPU 核,那么留一个核给系统、运行 31 个应用副本就要浪费 3GB 的内存资源。

* **良好的语言设计**。从学术的角度讲 Go 语言其实非常平庸,不支持许多高级的语言特性;但从工程的角度讲,Go 的设计是非常优秀的:规范足够简单灵活,有其他语言基础的程序员都能迅速上手。更重要的是 Go 自带完善的工具链,大大提高了团队协作的一致性。比如 gofmt 自动排版 Go 代码,很大程度上杜绝了不同人写的代码排版风格不一致的问题。把编辑器配置成在编辑存档的时候自动运行 gofmt,这样在编写代码的时候可以随意摆放位置,存档的时候自动变成正确排版的代码。此外还有 gofix, govet 等非常有用的工具。

* **执行性能好**。虽然不如 C 和 Java,但通常比原生 Python 应用还是高一个数量级的,适合编写一些瓶颈业务。内存占用也非常省。

从个人对 Golang 的初步使用来说,体验还是相当不错的,但是也有下面几点需要注意:

* 驼峰式命名风格(依据首字母大小写来决定其是否能被其他包引用),但我更喜欢 Python 的小写字母加下划线命名风格。

* 没有好用的包管理器,Golang 官方也没有推荐最佳的包管理方案,目前公认的比较好用的有 Godeps, Govendor 及 Glide,而 Python 的包管理器 pip 已形成自己的一套标准。

* 多行字符串的变量声明需要用反引号(`),Python 里是三个双引号("""),参考[http://stackoverflow.com/questions/7933460/how-do-you-write-multiline-strings-in-go](http://stackoverflow.com/questions/7933460/how-do-you-write-multiline-strings-in-go)

* Golang 中的类型匹配是很严格的,不同的类型之间通常需要手动转换,所以在字符串拼接时往往需要对整型进行显式转换,如 `fmt.Println("num: " + strconv.Itoa(1))`

* Golang 语言语法里的语法糖并不多,如在 Python 中很流行的 map, reduce, range 等,在 Golang 里都没有得到支持。

另外,推荐阅读 [Golang 新手开发者要注意的陷阱和常见错误](http://devs.cloudimmunity.com/gotchas-and-common-mistakes-in-go-golang/)。

## 学习资料推荐

建议先把 Go 的[官方文档](https://golang.org/doc/)过一遍,主要有以下几项:

* [A Tour of Go](https://tour.golang.org/welcome/1)
* [How to write Go code](https://golang.org/doc/code.html)
* [Effective Go](https://golang.org/doc/effective_go.html)
* [Language Specification](https://golang.org/ref/spec)

官方文档看完后,基本也算入门了,这时候可以看看 [Go 的示例代码](https://gobyexample.com/),或者去 [Project Euler](https://projecteuler.net/) 刷刷题。

当然也可以去知乎看看大牛们都是如何学习的,链接 [https://www.zhihu.com/question/23486344](https://www.zhihu.com/question/23486344)。

## 总结

虽然 Go 有很多被诟病的地方,比如 GC 和对错误的处理方式,但没有任何语言是完美的,从实用角度来讲,Go 有着不输于 Python 的开发效率,完善的第三方工具,以及强大的社区支持,这些就足够了。



相关链接:
[https://golang.org/doc/](https://golang.org/doc/)
[https://talks.golang.org/2012/splash.article](https://talks.golang.org/2012/splash.article)
[https://www.zhihu.com/question/21409296](https://www.zhihu.com/question/21409296)
[https://www.zhihu.com/question/23486344](https://www.zhihu.com/question/23486344)
[http://stackoverflow.com/questions/7933460/how-do-you-write-multiline-strings-in-go](http://stackoverflow.com/questions/7933460/how-do-you-write-multiline-strings-in-go)
[http://devs.cloudimmunity.com/gotchas-and-common-mistakes-in-go-golang/](http://devs.cloudimmunity.com/gotchas-and-common-mistakes-in-go-golang/)
[http://www.oschina.net/translate/go-at-google-language-design-in-the-service-of-software-engineering](http://www.oschina.net/translate/go-at-google-language-design-in-the-service-of-software-engineering)


------------



本文章为 [Cloudinsight](http://cloudinsight.oneapm.com/?utm_source=CiTechBlog&utm_medium=gocn&utm_term=pythongolang1&utm_campaign=CiTech&from=jscwgyegna) 技术团队工程师原创,更多技术文章可访问 [Cloudinsight 技术博客](http://cloudinsight.oneapm.com/blog/?utm_source=CiTechBlog&utm_medium=gocn&utm_term=pythongolang1&utm_campaign=CiTech&from=jscwgyegna)。[Cloudinsight](http://cloudinsight.oneapm.com/?utm_source=CiTechBlog&utm_medium=gocn&utm_term=pythongolang1&utm_campaign=CiTech&from=jscwgyegna) 为可视化系统监控工具,涵盖 Windows、Linux 操作系统,用 Golang 开发的 Cloudinsight Agent 正式开源了,欢迎 fork,Github:https://github.com/cloudinsight/cloudinsight-agent


golang-for-pythonistas 系列持续更新中,欢迎关注~

求一些golang的教程,书籍也可以

有问必答lesliehuang 回复了问题 • 73 人关注 • 40 个回复 • 14826 次浏览 • 2019-08-13 19:06 • 来自相关话题

golang有没有好的开源游戏框架

技术讨论ducklife 回复了问题 • 34 人关注 • 14 个回复 • 29947 次浏览 • 2019-08-19 12:59 • 来自相关话题

为什么Go里面大多数的接口返回的是int类型

有问必答ducklife 回复了问题 • 5 人关注 • 4 个回复 • 5541 次浏览 • 2019-08-19 13:00 • 来自相关话题

大家推荐哪种golang包管理方式?

有问必答asdfsx 回复了问题 • 38 人关注 • 28 个回复 • 37907 次浏览 • 2018-03-05 18:50 • 来自相关话题

关于规范招聘信息发帖说明

回复

招聘应聘 回复了问题 • 10 人关注 • 4 个回复 • 5661 次浏览 • 2019-10-04 10:59 • 来自相关话题

bench fasthttp 和 库自带的net.http性能,fasthttp完败?

回复

技术讨论tsingson 回复了问题 • 3 人关注 • 4 个回复 • 57 次浏览 • 2 小时前 • 来自相关话题

driver: bad connection 问题

回复

有问必答treedom 回复了问题 • 1 人关注 • 1 个回复 • 127 次浏览 • 2019-11-08 09:20 • 来自相关话题

GoCN每日新闻(2019-10-07)

回复

每日新闻smallfish1 发起了问题 • 1 人关注 • 0 个回复 • 9872 次浏览 • 2019-10-07 15:42 • 来自相关话题

为什么gRPC客户端不提供连接池?

回复

技术讨论tsingson 回复了问题 • 9 人关注 • 6 个回复 • 16219 次浏览 • 2019-10-03 02:58 • 来自相关话题

花了半天用go写一个带ui的redis客户端

回复

Go开源项目ownGolang 回复了问题 • 6 人关注 • 5 个回复 • 1395 次浏览 • 2019-09-18 10:17 • 来自相关话题

GoCN每日新闻(2019-09-10)

回复

每日新闻smallfish1 发起了问题 • 1 人关注 • 0 个回复 • 13813 次浏览 • 2019-09-10 11:30 • 来自相关话题

http接口调试

回复

有问必答tsingson 回复了问题 • 7 人关注 • 8 个回复 • 846 次浏览 • 2019-08-19 13:50 • 来自相关话题

为什么Go里面大多数的接口返回的是int类型

回复

有问必答ducklife 回复了问题 • 5 人关注 • 4 个回复 • 5541 次浏览 • 2019-08-19 13:00 • 来自相关话题

Go 零基础编程入门教程

回复

文章分享ducklife 回复了问题 • 104 人关注 • 45 个回复 • 30395 次浏览 • 2019-08-19 12:59 • 来自相关话题

golang有没有好的开源游戏框架

回复

技术讨论ducklife 回复了问题 • 34 人关注 • 14 个回复 • 29947 次浏览 • 2019-08-19 12:59 • 来自相关话题

在群里看到一段代码,是内存模型的问题还是协程调度的问题呢?

回复

有问必答h12 回复了问题 • 5 人关注 • 4 个回复 • 587 次浏览 • 2019-08-13 20:01 • 来自相关话题

求一些golang的教程,书籍也可以

回复

有问必答lesliehuang 回复了问题 • 73 人关注 • 40 个回复 • 14826 次浏览 • 2019-08-13 19:06 • 来自相关话题

golang开发

回复

招聘应聘君莫笑 发起了问题 • 1 人关注 • 0 个回复 • 457 次浏览 • 2019-08-01 11:20 • 来自相关话题

golang的错误处理机制是什么???

回复

有问必答dncmn 发起了问题 • 1 人关注 • 0 个回复 • 362 次浏览 • 2019-07-18 17:18 • 来自相关话题

Go netpoll I/O 多路复用构建原生网络模型之源码深度解析

文章分享panjf2000 发表了文章 • 0 个评论 • 311 次浏览 • 4 天前 • 来自相关话题

原文Go netpoll I/O 多路复用构建原生网络模型之源码深度解析 ...查看全部

原文

Go netpoll I/O 多路复用构建原生网络模型之源码深度解析

导言

Go 基于 I/O multiplexing 和 goroutine 构建了一个简洁而高性能的原生网络模型(基于 Go 的I/O 多路复用 netpoll),提供了 goroutine-per-connection 这样简单的网络编程模式。在这种模式下,开发者使用的是同步的模式去编写异步的逻辑,极大地降低了开发者编写网络应用时的心智负担,且借助于 Go runtime scheduler 对 goroutines 的高效调度,这个原生网络模型不论从适用性还是性能上都足以满足绝大部分的应用场景。

然而,在工程性上能做到如此高的普适性和兼容性,最终暴露给开发者提供接口/模式如此简洁,其底层必然是基于非常复杂的封装,做了很多取舍,也有可能放弃了一些『极致』的设计和理念。事实上netpoll底层就是基于 epoll/kqueue/iocp 这些系统调用来做封装的,最终暴露出 goroutine-per-connection 这样的极简的开发模式给使用者。

Go netpoll 在不同的操作系统,其底层使用的 I/O 多路复用技术也不一样,可以从 Go 源码目录结构和对应代码文件了解 Go 在不同平台下的网络 I/O 模式的实现。比如,在 Linux 系统下基于 epoll,freeBSD 系统下基于 kqueue,以及 Windows 系统下基于 iocp。

本文将基于 linux 平台来解析 Go netpoll 之 I/O 多路复用的底层是如何基于 epoll 封装实现的,从源码层层推进,全面而深度地解析 Go netpoll 的设计理念和实现原理,以及 Go 是如何利用netpoll来构建它的原生网络模型的。主要涉及到的一些概念:I/O 模式、用户/内核空间、epoll、linux 源码、goroutine scheduler 等等,我会尽量简单地讲解,如果有对相关概念不熟悉的同学,还是希望能提前熟悉一下。

用户空间与内核空间

现在操作系统都是采用虚拟存储器,那么对 32 位操作系统而言,它的寻址空间(虚拟存储空间)为 4G(2 的 32 次方)。操作系统的核心是内核,独立于普通的应用程序,可以访问受保护的内存空间,也有访问底层硬件设备的所有权限。为了保证用户进程不能直接操作内核(kernel),保证内核的安全,操心系统将虚拟空间划分为两部分,一部分为内核空间,一部分为用户空间。针对 linux 操作系统而言,将最高的 1G 字节(从虚拟地址 0xC0000000 到 0xFFFFFFFF),供内核使用,称为内核空间,而将较低的 3G 字节(从虚拟地址 0x00000000 到0xBFFFFFFF),供各个进程使用,称为用户空间。

I/O 多路复用

在神作《UNIX 网络编程》里,总结归纳了 5 种 I/O 模型,包括同步和异步 I/O:

  • 阻塞 I/O (Blocking I/O)
  • 非阻塞 I/O (Nonblocking I/O)
  • I/O 多路复用 (I/O multiplexing)
  • 信号驱动 I/O (Signal driven I/O)
  • 异步 I/O (Asynchronous I/O)

操作系统上的 I/O 是用户空间和内核空间的数据交互,因此 I/O 操作通常包含以下两个步骤:

  1. 等待网络数据到达网卡(读就绪)/等待网卡可写(写就绪) –> 读取/写入到内核缓冲区
  2. 从内核缓冲区复制数据 –> 用户空间(读)/从用户空间复制数据 -> 内核缓冲区(写)

而判定一个 I/O 模型是同步还是异步,主要看第二步:数据在用户和内核空间之间复制的时候是不是会阻塞当前进程,如果会,则是同步 I/O,否则,就是异步 I/O。基于这个原则,这 5 种 I/O 模型中只有一种异步 I/O 模型:Asynchronous I/O,其余都是同步 I/O 模型。

这 5 种 I/O 模型的对比如下:

所谓 I/O 多路复用指的就是 select/poll/epoll 这一系列的多路选择器:支持单一线程同时监听多个文件描述符(I/O事件),阻塞等待,并在其中某个文件描述符可读写时收到通知。 I/O 复用其实复用的不是 I/O 连接,而是复用线程,让一个 thread of control 能够处理多个连接(I/O 事件)。

select & poll

#include <sys/select.h>

/* According to earlier standards */
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>

int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);

// 和 select 紧密结合的四个宏:
void FD_CLR(int fd, fd_set *set);
int FD_ISSET(int fd, fd_set *set);
void FD_SET(int fd, fd_set *set);
void FD_ZERO(fd_set *set);

select 是 epoll 之前 linux 使用的 I/O 事件驱动技术。

理解 select 的关键在于理解 fd_set,为说明方便,取 fd_set 长度为 1 字节,fd_set 中的每一 bit 可以对应一个文件描述符 fd,则 1 字节长的 fd_set 最大可以对应 8 个 fd。select 的调用过程如下:

  1. 执行 FD_ZERO(&set), 则 set 用位表示是 0000,0000
  2. 若 fd=5, 执行 FD_SET(fd, &set); 后 set 变为 0001,0000(第5位置为1)
  3. 再加入 fd=2, fd=1,则 set 变为 0001,0011
  4. 执行 select(6, &set, 0, 0, 0) 阻塞等待
  5. 若 fd=1, fd=2 上都发生可读事件,则 select 返回,此时 set 变为 0000,0011 (注意:没有事件发生的 fd=5 被清空)

基于上面的调用过程,可以得出 select 的特点:

  • 可监控的文件描述符个数取决于 sizeof(fd_set) 的值。假设服务器上 sizeof(fd_set)=512,每 bit 表示一个文件描述符,则服务器上支持的最大文件描述符是 512*8=4096。fd_set的大小调整可参考 【原创】技术系列之 网络模型(二) 中的模型 2,可以有效突破 select 可监控的文件描述符上限
  • 将 fd 加入 select 监控集的同时,还要再使用一个数据结构 array 保存放到 select 监控集中的 fd,一是用于在 select 返回后,array 作为源数据和 fd_set 进行 FD_ISSET 判断。二是 select 返回后会把以前加入的但并无事件发生的 fd 清空,则每次开始 select 前都要重新从 array 取得 fd 逐一加入(FD_ZERO最先),扫描 array 的同时取得 fd 最大值 maxfd,用于 select 的第一个参数
  • 可见 select 模型必须在 select 前循环 array(加 fd,取 maxfd),select 返回后循环 array(FD_ISSET判断是否有事件发生)

所以,select 有如下的缺点:

  1. 最大并发数限制:使用 32 个整数的 32 位,即 32*32=1024 来标识 fd,虽然可修改,但是有以下第 2, 3 点的瓶颈
  2. 每次调用 select,都需要把 fd 集合从用户态拷贝到内核态,这个开销在 fd 很多时会很大
  3. 性能衰减严重:每次 kernel 都需要线性扫描整个 fd_set,所以随着监控的描述符 fd 数量增长,其 I/O 性能会线性下降

poll 的实现和 select 非常相似,只是描述 fd 集合的方式不同,poll 使用 pollfd 结构而不是 select 的 fd_set 结构,poll 解决了最大文件描述符数量限制的问题,但是同样需要从用户态拷贝所有的 fd 到内核态,也需要线性遍历所有的 fd 集合,所以它和 select 只是实现细节上的区分,并没有本质上的区别。

epoll

epoll 是 linux kernel 2.6 之后引入的新 I/O 事件驱动技术,I/O 多路复用的核心设计是 1 个线程处理所有连接的等待消息准备好I/O 事件,这一点上 epoll 和 select&poll 是大同小异的。但 select&poll 预估错误了一件事,当数十万并发连接存在时,可能每一毫秒只有数百个活跃的连接,同时其余数十万连接在这一毫秒是非活跃的。select&poll 的使用方法是这样的:返回的活跃连接 == select(全部待监控的连接)

什么时候会调用 select&poll 呢?在你认为需要找出有报文到达的活跃连接时,就应该调用。所以,select&poll 在高并发时是会被频繁调用的。这样,这个频繁调用的方法就很有必要看看它是否有效率,因为,它的轻微效率损失都会被高频二字所放大。它有效率损失吗?显而易见,全部待监控连接是数以十万计的,返回的只是数百个活跃连接,这本身就是无效率的表现。被放大后就会发现,处理并发上万个连接时,select&poll 就完全力不从心了。这个时候就该 epoll 上场了,epoll 通过一些新的设计和优化,基本上解决了 select&poll 的问题。

epoll 的 API 非常简洁,涉及到的只有 3 个系统调用:

#include <sys/epoll.h>  
int epoll_create(int size); // int epoll_create1(int flags);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

其中,epoll_create 创建一个 epoll 实例并返回 epollfd;epoll_ctl 注册 file descriptor 等待的 I/O 事件(比如 EPOLLIN、EPOLLOUT 等) 到 epoll 实例上;epoll_wait 则是阻塞监听 epoll 实例上所有的 file descriptor 的 I/O 事件,它接收一个用户空间上的一块内存地址 (events 数组),kernel 会在有 I/O 事件发生的时候把文件描述符列表复制到这块内存地址上,然后 epoll_wait 解除阻塞并返回,最后用户空间上的程序就可以对相应的 fd 进行读写了:

#include <unistd.h>
ssize_t read(int fd, void *buf, size_t count);
ssize_t write(int fd, const void *buf, size_t count);

epoll 的工作原理如下:

与 select&poll 相比,epoll 分清了高频调用和低频调用。例如,epoll_ctl 相对来说就是不太频繁被调用的,而 epoll_wait 则是非常频繁被调用的。所以 epoll 利用 epoll_ctl 来插入或者删除一个 fd,实现用户态到内核态的数据拷贝,这确保了每一个 fd 在其生命周期只需要被拷贝一次,而不是每次调用 epoll_wait 的时候都拷贝一次。 epoll_wait 则被设计成几乎没有入参的调用,相比 select&poll 需要把全部监听的 fd 集合从用户态拷贝至内核态的做法,epoll 的效率就高出了一大截。

在实现上 epoll 采用红黑树来存储所有监听的 fd,而红黑树本身插入和删除性能比较稳定,时间复杂度 O(logN)。通过 epoll_ctl 函数添加进来的 fd 都会被放在红黑树的某个节点内,所以,重复添加是没有用的。当把 fd 添加进来的时候时候会完成关键的一步:该 fd 都会与相应的设备(网卡)驱动程序建立回调关系,也就是在内核中断处理程序为它注册一个回调函数,在 fd 相应的事件触发(中断)之后(设备就绪了),内核就会调用这个回调函数,该回调函数在内核中被称为:ep_poll_callback这个回调函数其实就是把这个 fd 添加到 rdllist 这个双向链表(就绪链表)中。epoll_wait 实际上就是去检查 rdlist 双向链表中是否有就绪的 fd,当 rdlist 为空(无就绪fd)时挂起当前进程,直到 rdlist 非空时进程才被唤醒并返回。

相比于 select&poll 调用时会将全部监听的 fd 从用户态空间拷贝至内核态空间并线性扫描一遍找出就绪的 fd 再返回到用户态,epoll_wait 则是直接返回已就绪 fd,因此 epoll 的 I/O 性能不会像 select&poll 那样随着监听的 fd 数量增加而出现线性衰减,是一个非常高效的 I/O 事件驱动技术。

由于使用 epoll 的 I/O 多路复用需要用户进程自己负责 I/O 读写,从用户进程的角度看,读写过程是阻塞的,所以 select&poll&epoll 本质上都是同步 I/O 模型,而像 Windows 的 IOCP 这一类的异步 I/O,只需要在调用 WSARecv 或 WSASend 方法读写数据的时候把用户空间的内存 buffer 提交给 kernel,kernel 负责数据在用户空间和内核空间拷贝,完成之后就会通知用户进程,整个过程不需要用户进程参与,所以是真正的异步 I/O。

延伸

另外,我看到有些文章说 epoll 之所以性能高是因为利用了 linux 的 mmap 内存映射让内核和用户进程共享了一片物理内存,用来存放就绪 fd 列表和它们的数据 buffer,所以用户进程在 epoll_wait返回之后用户进程就可以直接从共享内存那里读取/写入数据了,这让我很疑惑,因为首先看epoll_wait的函数声明:

int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

第二个参数:就绪事件列表,是需要在用户空间分配内存然后再传给epoll_wait的,如果内核会用 mmap 设置共享内存,直接传递一个指针进去就行了,根本不需要在用户态分配内存,多此一举。其次,内核和用户进程通过 mmap 共享内存是一件极度危险的事情,内核无法确定这块共享内存什么时候会被回收,而且这样也会赋予用户进程直接操作内核数据的权限和入口,非常容易出现大的系统漏洞,因此一般极少会这么做。所以我很怀疑 epoll 是不是真的在 linux kernel 里用了 mmap,我就去看了下最新版本(5.3.9)的 linux kernel 源码:

/*
* Implement the event wait interface for the eventpoll file. It is the kernel
* part of the user space epoll_wait(2).
*/

static int do_epoll_wait(int epfd, struct epoll_event __user *events,
int maxevents, int timeout)
{
// ...

/* Time to fish for events ... */
error = ep_poll(ep, events, maxevents, timeout);
}

// 如果 epoll_wait 入参时设定 timeout == 0, 那么直接通过 ep_events_available 判断当前是否有用户感兴趣的事件发生,如果有则通过 ep_send_events 进行处理
// 如果设置 timeout > 0,并且当前没有用户关注的事件发生,则进行休眠,并添加到 ep->wq 等待队列的头部;对等待事件描述符设置 WQ_FLAG_EXCLUSIVE 标志
// ep_poll 被事件唤醒后会重新检查是否有关注事件,如果对应的事件已经被抢走,那么 ep_poll 会继续休眠等待
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events, int maxevents, long timeout)
{
// ...

send_events:
/*
* Try to transfer events to user space. In case we get 0 events and
* there's still timeout left over, we go trying again in search of
* more luck.
*/


// 如果一切正常, 有 event 发生, 就开始准备数据 copy 给用户空间了
// 如果有就绪的事件发生,那么就调用 ep_send_events 将就绪的事件 copy 到用户态内存中,
// 然后返回到用户态,否则判断是否超时,如果没有超时就继续等待就绪事件发生,如果超时就返回用户态。
// 从 ep_poll 函数的实现可以看到,如果有就绪事件发生,则调用 ep_send_events 函数做进一步处理
if (!res && eavail &&
!(res = ep_send_events(ep, events, maxevents)) && !timed_out)
goto fetch_events;

// ...
}

// ep_send_events 函数是用来向用户空间拷贝就绪 fd 列表的,它将用户传入的就绪 fd 列表内存简单封装到
// ep_send_events_data 结构中,然后调用 ep_scan_ready_list 将就绪队列中的事件写入用户空间的内存;
// 用户进程就可以访问到这些数据进行处理
static int ep_send_events(struct eventpoll *ep,
struct epoll_event __user *events, int maxevents)
{
struct ep_send_events_data esed;

esed.maxevents = maxevents;
esed.events = events;
// 调用 ep_scan_ready_list 函数检查 epoll 实例 eventpoll 中的 rdllist 就绪链表,
// 并注册一个回调函数 ep_send_events_proc,如果有就绪 fd,则调用 ep_send_events_proc 进行处理
ep_scan_ready_list(ep, ep_send_events_proc, &esed, 0, false);
return esed.res;
}

// 调用 ep_scan_ready_list 的时候会传递指向 ep_send_events_proc 函数的函数指针作为回调函数,
// 一旦有就绪 fd,就会调用 ep_send_events_proc 函数
static __poll_t ep_send_events_proc(struct eventpoll *ep, struct list_head *head, void *priv)
{
// ...

/*
* If the event mask intersect the caller-requested one,
* deliver the event to userspace. Again, ep_scan_ready_list()
* is holding ep->mtx, so no operations coming from userspace
* can change the item.
*/

revents = ep_item_poll(epi, &pt, 1);
// 如果 revents 为 0,说明没有就绪的事件,跳过,否则就将就绪事件拷贝到用户态内存中
if (!revents)
continue;
// 将当前就绪的事件和用户进程传入的数据都通过 __put_user 拷贝回用户空间,
// 也就是调用 epoll_wait 之时用户进程传入的 fd 列表的内存
if (__put_user(revents, &uevent->events) || __put_user(epi->event.data, &uevent->data)) {
list_add(&epi->rdllink, head);
ep_pm_stay_awake(epi);
if (!esed->res)
esed->res = -EFAULT;
return 0;
}

// ...
}

do_epoll_wait开始层层跳转,我们可以很清楚地看到最后内核是通过__put_user函数把就绪 fd 列表和事件返回到用户空间,而__put_user正是内核用来拷贝数据到用户空间的标准函数。此外,我并没有在 linux kernel 的源码中和 epoll 相关的代码里找到 mmap 系统调用做内存映射的逻辑,所以基本可以得出结论:epoll 在 linux kernel 里并没有使用 mmap 来做用户空间和内核空间的内存共享,所以那些说 epoll 使用了 mmap 的文章都是误解。

Non-blocking I/O

什么叫非阻塞 I/O,顾名思义就是:所有 I/O 操作都是立刻返回而不会阻塞当前用户进程。I/O 多路复用通常情况下需要和非阻塞 I/O 搭配使用,否则可能会产生意想不到的问题。比如,epoll 的 ET(边缘触发) 模式下,如果不使用非阻塞 I/O,有极大的概率会导致阻塞 event-loop 线程,从而降低吞吐量,甚至导致 bug。

Linux 下,我们可以通过 fcntl 系统调用来设置 O_NONBLOCK标志位,从而把 socket 设置成 non-blocking。当对一个 non-blocking socket 执行读操作时,流程是这个样子:

当用户进程发出 read 操作时,如果 kernel 中的数据还没有准备好,那么它并不会 block 用户进程,而是立刻返回一个 EAGAIN error。从用户进程角度讲 ,它发起一个 read 操作后,并不需要等待,而是马上就得到了一个结果。用户进程判断结果是一个 error 时,它就知道数据还没有准备好,于是它可以再次发送 read 操作。一旦 kernel 中的数据准备好了,并且又再次收到了用户进程的 system call,那么它马上就将数据拷贝到了用户内存,然后返回。

所以,non-blocking I/O 的特点是用户进程需要不断的主动询问 kernel 数据好了没有。

Go netpoll

一个典型的 Go TCP server:

package main

import (
"fmt"
"net"
)

func main() {
listen, err := net.Listen("tcp", ":8888")
if err != nil {
fmt.Println("listen error: ", err)
return
}

for {
conn, err := listen.Accept()
if err != nil {
fmt.Println("accept error: ", err)
break
}

// start a new goroutine to handle the new connection
go HandleConn(conn)
}
}
func HandleConn(conn net.Conn) {
defer conn.Close()
packet := make([]byte, 1024)
for {
// 如果没有可读数据,也就是读 buffer 为空,则阻塞
_, _ = conn.Read(packet)
// 同理,不可写则阻塞
_, _ = conn.Write(packet)
}
}

上面是一个基于 Go 原生网络模型(基于 netpoll)编写的一个 TCP server,模式是 goroutine-per-connection,在这种模式下,开发者使用的是同步的模式去编写异步的逻辑而且对于开发者来说 I/O 是否阻塞是无感知的,也就是说开发者无需考虑 goroutines 甚至更底层的线程、进程的调度和上下文切换。而 Go netpoll 最底层的事件驱动技术肯定是基于 epoll/kqueue/iocp 这一类的 I/O 事件驱动技术,只不过是把这些调度和上下文切换的工作转移到了 runtime 的 Go scheduler,让它来负责调度 goroutines,从而极大地降低了程序员的心智负担!

Go netpoll 核心

Go netpoll 通过在底层对 epoll/kqueue/iocp 的封装,从而实现了使用同步编程模式达到异步执行的效果。总结来说,所有的网络操作都以网络描述符 netFD 为中心实现。netFD 与底层 PollDesc 结构绑定,当在一个 netFD 上读写遇到 EAGAIN 错误时,就将当前 goroutine 存储到这个 netFD 对应的 PollDesc 中,同时调用 gopark 把当前 goroutine 给 park 住,直到这个 netFD 上再次发生读写事件,才将此 goroutine 给 ready 激活重新运行。显然,在底层通知 goroutine 再次发生读写等事件的方式就是 epoll/kqueue/iocp 等事件驱动机制。

接下来我们通过分析最新的 Go 源码(v1.13.4),解读一下整个 netpoll 的运行流程。

上面的示例代码中相关的在源码里的几个数据结构和方法:

// TCPListener is a TCP network listener. Clients should typically
// use variables of type Listener instead of assuming TCP.
type TCPListener struct {
fd *netFD
lc ListenConfig
}

// Accept implements the Accept method in the Listener interface; it
// waits for the next call and returns a generic Conn.
func (l *TCPListener) Accept() (Conn, error) {
if !l.ok() {
return nil, syscall.EINVAL
}
c, err := l.accept()
if err != nil {
return nil, &OpError{Op: "accept", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err}
}
return c, nil
}

func (ln *TCPListener) accept() (*TCPConn, error) {
fd, err := ln.fd.accept()
if err != nil {
return nil, err
}
tc := newTCPConn(fd)
if ln.lc.KeepAlive >= 0 {
setKeepAlive(fd, true)
ka := ln.lc.KeepAlive
if ln.lc.KeepAlive == 0 {
ka = defaultTCPKeepAlive
}
setKeepAlivePeriod(fd, ka)
}
return tc, nil
}

// TCPConn is an implementation of the Conn interface for TCP network
// connections.
type TCPConn struct {
conn
}

// Conn
type conn struct {
fd *netFD
}

type conn struct {
fd *netFD
}

func (c *conn) ok() bool { return c != nil && c.fd != nil }

// Implementation of the Conn interface.

// Read implements the Conn Read method.
func (c *conn) Read(b []byte) (int, error) {
if !c.ok() {
return 0, syscall.EINVAL
}
n, err := c.fd.Read(b)
if err != nil && err != io.EOF {
err = &OpError{Op: "read", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err}
}
return n, err
}

// Write implements the Conn Write method.
func (c *conn) Write(b []byte) (int, error) {
if !c.ok() {
return 0, syscall.EINVAL
}
n, err := c.fd.Write(b)
if err != nil {
err = &OpError{Op: "write", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err}
}
return n, err
}

netFD

net.Listen("tcp", ":8888") 方法返回了一个 TCPListener,它是一个实现了 net.Listener 接口的 struct,而通过 listen.Accept() 接收的新连接 TCPConn 则是一个实现了 net.Conn 接口的 struct,它内嵌了 net.conn struct。仔细阅读上面的源码可以发现,不管是 Listener 的 Accept 还是 Conn 的 Read/Write 方法,都是基于一个 netFD 的数据结构的操作,netFD 是一个网络描述符,类似于 Linux 的文件描述符的概念,netFD 中包含一个 poll.FD 数据结构,而 poll.FD 中包含两个重要的数据结构 Sysfd 和 pollDesc,前者是真正的系统文件描述符,后者对是底层事件驱动的封装,所有的读写超时等操作都是通过调用后者的对应方法实现的。

netFDpoll.FD的源码:

// Network file descriptor.
type netFD struct {
pfd poll.FD

// immutable until Close
family int
sotype int
isConnected bool // handshake completed or use of association with peer
net string
laddr Addr
raddr Addr
}

// FD is a file descriptor. The net and os packages use this type as a
// field of a larger type representing a network connection or OS file.
type FD struct {
// Lock sysfd and serialize access to Read and Write methods.
fdmu fdMutex

// System file descriptor. Immutable until Close.
Sysfd int

// I/O poller.
pd pollDesc

// Writev cache.
iovecs *[]syscall.Iovec

// Semaphore signaled when file is closed.
csema uint32

// Non-zero if this file has been set to blocking mode.
isBlocking uint32

// Whether this is a streaming descriptor, as opposed to a
// packet-based descriptor like a UDP socket. Immutable.
IsStream bool

// Whether a zero byte read indicates EOF. This is false for a
// message based socket connection.
ZeroReadIsEOF bool

// Whether this is a file rather than a network socket.
isFile bool
}

pollDesc

前面提到了 pollDesc 是底层事件驱动的封装,netFD 通过它来完成各种 I/O 相关的操作,它的定义如下:

type pollDesc struct {
runtimeCtx uintptr
}

这里的 struct 只包含了一个指针,而通过 pollDesc 的 init 方法,我们可以找到它具体的定义是在runtime.pollDesc这里:

func (pd *pollDesc) init(fd *FD) error {
serverInit.Do(runtime_pollServerInit)
ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
if errno != 0 {
if ctx != 0 {
runtime_pollUnblock(ctx)
runtime_pollClose(ctx)
}
return syscall.Errno(errno)
}
pd.runtimeCtx = ctx
return nil
}

// Network poller descriptor.
//
// No heap pointers.
//
//go:notinheap
type pollDesc struct {
link *pollDesc // in pollcache, protected by pollcache.lock

// The lock protects pollOpen, pollSetDeadline, pollUnblock and deadlineimpl operations.
// This fully covers seq, rt and wt variables. fd is constant throughout the PollDesc lifetime.
// pollReset, pollWait, pollWaitCanceled and runtime·netpollready (IO readiness notification)
// proceed w/o taking the lock. So closing, everr, rg, rd, wg and wd are manipulated
// in a lock-free way by all operations.
// NOTE(dvyukov): the following code uses uintptr to store *g (rg/wg),
// that will blow up when GC starts moving objects.
lock mutex // protects the following fields
fd uintptr
closing bool
everr bool // marks event scanning error happened
user uint32 // user settable cookie
rseq uintptr // protects from stale read timers
rg uintptr // pdReady, pdWait, G waiting for read or nil
rt timer // read deadline timer (set if rt.f != nil)
rd int64 // read deadline
wseq uintptr // protects from stale write timers
wg uintptr // pdReady, pdWait, G waiting for write or nil
wt timer // write deadline timer
wd int64 // write deadline
}

runtime.pollDesc包含自身类型的一个指针,用来保存下一个runtime.pollDesc的地址,以此来实现链表,可以减少数据结构的大小,所有的runtime.pollDesc保存在runtime.pollCache结构中,定义如下:

type pollCache struct {
lock mutex
first *pollDesc
// PollDesc objects must be type-stable,
// because we can get ready notification from epoll/kqueue
// after the descriptor is closed/reused.
// Stale notifications are detected using seq variable,
// seq is incremented when deadlines are changed or descriptor is reused.
}

net.Listen

调用 net.Listen之后,底层会通过 Linux 的系统调用socket 方法创建一个 fd 分配给 listener,并用以来初始化 listener 的 netFD,接着调用 netFD 的listenStream方法完成对 socket 的 bind&listen 操作以及对 netFD 的初始化(主要是对 netFD 里的 pollDesc 的初始化),相关源码如下:

// 调用 linux 系统调用 socket 创建 listener fd 并设置为为阻塞 I/O    
s, err := socketFunc(family, sotype|syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC, proto)
// On Linux the SOCK_NONBLOCK and SOCK_CLOEXEC flags were
// introduced in 2.6.27 kernel and on FreeBSD both flags were
// introduced in 10 kernel. If we get an EINVAL error on Linux
// or EPROTONOSUPPORT error on FreeBSD, fall back to using
// socket without them.

socketFunc func(int, int, int) (int, error) = syscall.Socket

// 用上面创建的 listener fd 初始化 listener netFD
if fd, err = newFD(s, family, sotype, net); err != nil {
poll.CloseFunc(s)
return nil, err
}

// 对 listener fd 进行 bind&listen 操作,并且调用 init 方法完成初始化
func (fd *netFD) listenStream(laddr sockaddr, backlog int, ctrlFn func(string, string, syscall.RawConn) error) error {
// ...

// 完成绑定操作
if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil {
return os.NewSyscallError("bind", err)
}

// 完成监听操作
if err = listenFunc(fd.pfd.Sysfd, backlog); err != nil {
return os.NewSyscallError("listen", err)
}

// 调用 init,内部会调用 poll.FD.Init,最后调用 pollDesc.init
if err = fd.init(); err != nil {
return err
}
lsa, _ = syscall.Getsockname(fd.pfd.Sysfd)
fd.setAddr(fd.addrFunc()(lsa), nil)
return nil
}

// 使用 sync.Once 来确保一个 listener 只持有一个 epoll 实例
var serverInit sync.Once

// netFD.init 会调用 poll.FD.Init 并最终调用到 pollDesc.init,
// 它会创建 epoll 实例并把 listener fd 加入监听队列
func (pd *pollDesc) init(fd *FD) error {
// runtime_pollServerInit 内部调用了 netpollinit 来创建 epoll 实例
serverInit.Do(runtime_pollServerInit)

// runtime_pollOpen 内部调用了 netpollopen 来将 listener fd 注册到
// epoll 实例中,另外,它会初始化一个 pollDesc 并返回
ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
if errno != 0 {
if ctx != 0 {
runtime_pollUnblock(ctx)
runtime_pollClose(ctx)
}
return syscall.Errno(errno)
}
// 把真正初始化完成的 pollDesc 实例赋值给当前的 pollDesc 代表自身的指针,
// 后续使用直接通过该指针操作
pd.runtimeCtx = ctx
return nil
}

// netpollopen 会被 runtime_pollOpen,注册 fd 到 epoll 实例,
// 同时会利用万能指针把 pollDesc 保存到 epollevent 的一个 8 位的字节数组 data 里
func netpollopen(fd uintptr, pd *pollDesc) int32 {
var ev epollevent
ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
*(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}

我们前面提到的 epoll 的三个基本调用,Go 在源码里实现了对那三个调用的封装:

#include <sys/epoll.h>  
int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

// Go 对上面三个调用的封装
func netpollinit()
func netpollopen(fd uintptr, pd *pollDesc) int32
func netpoll(block bool) gList

netFD 就是通过这三个封装来对 epoll 进行创建实例、注册 fd 和等待事件操作的。

Listener.Accept()

netpoll accept socket 的工作流程如下:

  1. 服务端的 netFD 在listen时会创建 epoll 的实例,并将 listenerFD 加入 epoll 的事件队列
  2. netFD 在accept时将返回的 connFD 也加入 epoll 的事件队列
  3. netFD 在读写时出现syscall.EAGAIN错误,通过 pollDesc 的 waitRead 方法将当前的 goroutine park 住,直到 ready,从 pollDesc 的waitRead中返回

Listener.Accept()接收来自客户端的新连接,具体还是调用netFD.accept方法来完成这个功能:

// Accept implements the Accept method in the Listener interface; it
// waits for the next call and returns a generic Conn.
func (l *TCPListener) Accept() (Conn, error) {
if !l.ok() {
return nil, syscall.EINVAL
}
c, err := l.accept()
if err != nil {
return nil, &OpError{Op: "accept", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err}
}
return c, nil
}

func (ln *TCPListener) accept() (*TCPConn, error) {
fd, err := ln.fd.accept()
if err != nil {
return nil, err
}
tc := newTCPConn(fd)
if ln.lc.KeepAlive >= 0 {
setKeepAlive(fd, true)
ka := ln.lc.KeepAlive
if ln.lc.KeepAlive == 0 {
ka = defaultTCPKeepAlive
}
setKeepAlivePeriod(fd, ka)
}
return tc, nil
}

netFD.accept方法里再调用poll.FD.Accept,最后会使用 linux 的系统调用accept来完成新连接的接收,并且会把 accept 的 socket 设置成非阻塞 I/O 模式:

// Accept wraps the accept network call.
func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
if err := fd.readLock(); err != nil {
return -1, nil, "", err
}
defer fd.readUnlock()

if err := fd.pd.prepareRead(fd.isFile); err != nil {
return -1, nil, "", err
}
for {
// 使用 linux 系统调用 accept 接收新连接,创建对应的 socket
s, rsa, errcall, err := accept(fd.Sysfd)
// 因为 listener fd 在创建的时候已经设置成非阻塞的了,
// 所以 accept 方法会直接返回,不管有没有新连接到来;如果 err == nil 则表示正常建立新连接,直接返回
if err == nil {
return s, rsa, "", err
}
// 如果 err != nil,则判断 err == syscall.EAGAIN,符合条件则进入 pollDesc.waitRead 方法
switch err {
case syscall.EAGAIN:
if fd.pd.pollable() {
// 如果当前没有发生期待的 I/O 事件,那么 waitRead 会通过 park goroutine 让逻辑 block 在这里
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
case syscall.ECONNABORTED:
// This means that a socket on the listen
// queue was closed before we Accept()ed it;
// it's a silly error, so try again.
continue
}
return -1, nil, errcall, err
}
}

// 使用 linux 的 accept 系统调用接收新连接并把这个 socket fd 设置成非阻塞 I/O
ns, sa, err := Accept4Func(s, syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC)
// On Linux the accept4 system call was introduced in 2.6.28
// kernel and on FreeBSD it was introduced in 10 kernel. If we
// get an ENOSYS error on both Linux and FreeBSD, or EINVAL
// error on Linux, fall back to using accept.

// Accept4Func is used to hook the accept4 call.
var Accept4Func func(int, int) (int, syscall.Sockaddr, error) = syscall.Accept4

pollDesc.waitRead方法主要负责检测当前这个 pollDesc 的上层 netFD 对应的 fd 是否有『期待的』I/O 事件发生,如果有就直接返回,否则就 park 住当前的 goroutine 并持续等待直至对应的 fd 上发生可读/可写或者其他『期待的』I/O 事件为止,然后它就会返回到外层的 for 循环,让 goroutine 继续执行逻辑。

Conn.Read/Conn.Write

我们先来看看Conn.Read方法是如何实现的,原理其实和 Listener.Accept 是一样的,具体调用链还是首先调用 conn 的netFD.Read,然后内部再调用 poll.FD.Read,最后使用 linux 的系统调用 read: syscall.Read完成数据读取:

// Implementation of the Conn interface.

// Read implements the Conn Read method.
func (c *conn) Read(b []byte) (int, error) {
if !c.ok() {
return 0, syscall.EINVAL
}
n, err := c.fd.Read(b)
if err != nil && err != io.EOF {
err = &OpError{Op: "read", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err}
}
return n, err
}

func (fd *netFD) Read(p []byte) (n int, err error) {
n, err = fd.pfd.Read(p)
runtime.KeepAlive(fd)
return n, wrapSyscallError("read", err)
}

// Read implements io.Reader.
func (fd *FD) Read(p []byte) (int, error) {
if err := fd.readLock(); err != nil {
return 0, err
}
defer fd.readUnlock()
if len(p) == 0 {
// If the caller wanted a zero byte read, return immediately
// without trying (but after acquiring the readLock).
// Otherwise syscall.Read returns 0, nil which looks like
// io.EOF.
// TODO(bradfitz): make it wait for readability? (Issue 15735)
return 0, nil
}
if err := fd.pd.prepareRead(fd.isFile); err != nil {
return 0, err
}
if fd.IsStream && len(p) > maxRW {
p = p[:maxRW]
}
for {
// 尝试从该 socket 读取数据,因为 socket 在被 listener accept 的时候设置成
// 了非阻塞 I/O,所以这里同样也是直接返回,不管有没有可读的数据
n, err := syscall.Read(fd.Sysfd, p)
if err != nil {
n = 0
// err == syscall.EAGAIN 表示当前没有期待的 I/O 事件发生,也就是 socket 不可读
if err == syscall.EAGAIN && fd.pd.pollable() {
// 如果当前没有发生期待的 I/O 事件,那么 waitRead
// 会通过 park goroutine 让逻辑 block 在这里
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}

// On MacOS we can see EINTR here if the user
// pressed ^Z. See issue #22838.
if runtime.GOOS == "darwin" && err == syscall.EINTR {
continue
}
}
err = fd.eofError(n, err)
return n, err
}
}

conn.Writeconn.Read的原理是一致的,它也是通过类似 pollDesc.waitReadpollDesc.waitWrite来 park 住 goroutine 直至期待的 I/O 事件发生才返回,而 pollDesc.waitWrite的内部实现原理和pollDesc.waitRead是一样的,都是基于runtime_pollWait,这里就不再赘述。

pollDesc.waitRead

pollDesc.waitRead内部调用了 runtime_pollWait来达成无 I/O 事件时 park 住 goroutine 的目的:

//go:linkname poll_runtime_pollWait internal/poll.runtime_pollWait
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
err := netpollcheckerr(pd, int32(mode))
if err != 0 {
return err
}
// As for now only Solaris, illumos, and AIX use level-triggered IO.
if GOOS == "solaris" || GOOS == "illumos" || GOOS == "aix" {
netpollarm(pd, mode)
}
// 进入 netpollblock 并且判断是否有期待的 I/O 事件发生,
// 这里的 for 循环是为了一直等到 io ready
for !netpollblock(pd, int32(mode), false) {
err = netpollcheckerr(pd, int32(mode))
if err != 0 {
return err
}
// Can happen if timeout has fired and unblocked us,
// but before we had a chance to run, timeout has been reset.
// Pretend it has not happened and retry.
}
return 0
}

// returns true if IO is ready, or false if timedout or closed
// waitio - wait only for completed IO, ignore errors
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
// gpp 保存的是 goroutine 的数据结构 g,这里会根据 mode 的值决定是 rg 还是 wg
// 后面调用 gopark 之后,会把当前的 goroutine 的抽象数据结构 g 存入 gpp 这个指针
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}

// set the gpp semaphore to WAIT
// 这个 for 循环是为了等待 io ready 或者 io wait
for {
old := *gpp
// gpp == pdReady 表示此时已有期待的 I/O 事件发生,
// 可以直接返回 unblock 当前 goroutine 并执行响应的 I/O 操作
if old == pdReady {
*gpp = 0
return true
}
if old != 0 {
throw("runtime: double wait")
}
// 如果没有期待的 I/O 事件发生,则通过原子操作把 gpp 的值置为 pdWait 并退出 for 循环
if atomic.Casuintptr(gpp, 0, pdWait) {
break
}
}

// need to recheck error states after setting gpp to WAIT
// this is necessary because runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl
// do the opposite: store to closing/rd/wd, membarrier, load of rg/wg

// waitio 此时是 false,netpollcheckerr 方法会检查当前 pollDesc 对应的 fd 是否是正常的,
// 通常来说 netpollcheckerr(pd, mode) == 0 是成立的,所以这里会执行 gopark
// 把当前 goroutine 给 park 住,直至对应的 fd 上发生可读/可写或者其他『期待的』I/O 事件为止,
// 然后 unpark 返回,在 gopark 内部会把当前 goroutine 的抽象数据结构 g 存入
// gpp(pollDesc.rg/pollDesc.wg) 指针里,以便在后面的 netpoll 函数取出 pollDesc 之后,
// 把 g 添加到链表里返回,然后重新调度运行该 goroutine
if waitio || netpollcheckerr(pd, mode) == 0 {
// 注册 netpollblockcommit 回调给 gopark,在 gopark 内部会执行它,保存当前 goroutine 到 gpp
gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
}
// be careful to not lose concurrent READY notification
old := atomic.Xchguintptr(gpp, 0)
if old > pdWait {
throw("runtime: corrupted polldesc")
}
return old == pdReady
}

// gopark 会停住当前的 goroutine 并且调用传递进来的回调函数 unlockf,从上面的源码我们可以知道这个函数是
// netpollblockcommit
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
if reason != waitReasonSleep {
checkTimeouts() // timeouts may expire while two goroutines keep the scheduler busy
}
mp := acquirem()
gp := mp.curg
status := readgstatus(gp)
if status != _Grunning && status != _Gscanrunning {
throw("gopark: bad g status")
}
mp.waitlock = lock
mp.waitunlockf = unlockf
gp.waitreason = reason
mp.waittraceev = traceEv
mp.waittraceskip = traceskip
releasem(mp)
// can't do anything that might move the G between Ms here.
// gopark 最终会调用 park_m,在这个函数内部会调用 unlockf,也就是 netpollblockcommit,
// 然后会把当前的 goroutine,也就是 g 数据结构保存到 pollDesc 的 rg 或者 wg 指针里
mcall(park_m)
}

// park continuation on g0.
func park_m(gp *g) {
_g_ := getg()

if trace.enabled {
traceGoPark(_g_.m.waittraceev, _g_.m.waittraceskip)
}

casgstatus(gp, _Grunning, _Gwaiting)
dropg()

if fn := _g_.m.waitunlockf; fn != nil {
// 调用 netpollblockcommit,把当前的 goroutine,
// 也就是 g 数据结构保存到 pollDesc 的 rg 或者 wg 指针里
ok := fn(gp, _g_.m.waitlock)
_g_.m.waitunlockf = nil
_g_.m.waitlock = nil
if !ok {
if trace.enabled {
traceGoUnpark(gp, 2)
}
casgstatus(gp, _Gwaiting, _Grunnable)
execute(gp, true) // Schedule it back, never returns.
}
}
schedule()
}

// netpollblockcommit 在 gopark 函数里被调用
func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {
// 通过原子操作把当前 goroutine 抽象的数据结构 g,也就是这里的参数 gp 存入 gpp 指针,
// 此时 gpp 的值是 pollDesc 的 rg 或者 wg 指针
r := atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))
if r {
// Bump the count of goroutines waiting for the poller.
// The scheduler uses this to decide whether to block
// waiting for the poller if there is nothing else to do.
atomic.Xadd(&netpollWaiters, 1)
}
return r
}

netpoll

前面已经从源码的角度分析完了 netpoll 是如何通过 park goroutine 从而达到阻塞 Accept/Read/Write 的效果,而通过调用 gopark,goroutine 会被放置在某个等待队列中(如 channel 的 waitq ,此时 G 的状态由_Grunning_Gwaitting),因此G必须被手动唤醒(通过 goready ),否则会丢失任务,应用层阻塞通常使用这种方式。

所以,最后还有一个非常关键的问题是:当 I/O 事件发生之后,netpoll 是通过什么方式唤醒那些在 I/O wait 的 goroutine 的?答案是通过 epoll_wait,在 Go 源码中的 src/runtime/netpoll_epoll.go文件中有一个 func netpoll(block bool) gList 方法,它会内部调用epoll_wait获取就绪的 fd 列表,并将每个 fd 对应的 goroutine 添加到链表返回

// polls for ready network connections
// returns list of goroutines that become runnable
func netpoll(block bool) gList {
if epfd == -1 {
return gList{}
}
waitms := int32(-1)
// 是否以阻塞模式调用 epoll_wait
if !block {
waitms = 0
}
var events [128]epollevent
retry:
// 获取就绪的 fd 列表
n := epollwait(epfd, &events[0], int32(len(events)), waitms)
if n < 0 {
if n != -_EINTR {
println("runtime: epollwait on fd", epfd, "failed with", -n)
throw("runtime: netpoll failed")
}
goto retry
}
// toRun 是一个 g 的链表,存储要恢复的 goroutines,最后返回给调用方
var toRun gList
for i := int32(0); i < n; i++ {
ev := &events[i]
if ev.events == 0 {
continue
}
var mode int32
// 判断发生的事件类型,读类型或者写类型
if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'r'
}
if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'w'
}
if mode != 0 {
// 取出保存在 epollevent 里的 pollDesc
pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
pd.everr = false
if ev.events == _EPOLLERR {
pd.everr = true
}
// 调用 netpollready,传入就绪 fd 的 pollDesc,把 fd 对应的 goroutine 添加到链表 toRun 中
netpollready(&toRun, pd, mode)
}
}
if block && toRun.empty() {
goto retry
}
return toRun
}

// netpollready 调用 netpollunblock 返回就绪 fd 对应的 goroutine 的抽象数据结构 g
func netpollready(toRun *gList, pd *pollDesc, mode int32) {
var rg, wg *g
if mode == 'r' || mode == 'r'+'w' {
rg = netpollunblock(pd, 'r', true)
}
if mode == 'w' || mode == 'r'+'w' {
wg = netpollunblock(pd, 'w', true)
}
if rg != nil {
toRun.push(rg)
}
if wg != nil {
toRun.push(wg)
}
}

// netpollunblock 会依据传入的 mode 决定从 pollDesc 的 rg 或者 wg 取出当时 gopark 之时存入的
// goroutine 抽象数据结构 g 并返回
func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
// mode == 'r' 代表当时 gopark 是为了等待读事件,而 mode == 'w' 则代表是等待写事件
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}

for {
// 取出 gpp 存储的 g
old := *gpp
if old == pdReady {
return nil
}
if old == 0 && !ioready {
// Only set READY for ioready. runtime_pollWait
// will check for timeout/cancel before waiting.
return nil
}
var new uintptr
if ioready {
new = pdReady
}
// 重置 pollDesc 的 rg 或者 wg
if atomic.Casuintpt

gochat - 纯go实现的im即时通讯系统(支持水平扩展)

开源程序gochat 发表了文章 • 0 个评论 • 466 次浏览 • 2019-11-04 15:56 • 来自相关话题

项目推荐- 项目地址:https://github.com/LockG ...查看全部

项目推荐
- 项目地址:
https://github.com/LockGit/gochat

- 类别:Go

- 项目描述:

gochat为纯go实现的即时通讯系统,支持私信消息与房间广播消息,各层之间通过rpc通讯,支持水平扩展。
使用redis作为消息存储与投递的载体,相对kafka操作起来更加方便快捷,所以十分轻量。
各层之间基于etcd服务发现,在扩容部署时将会方便很多。
由于go的交叉编译特性,编译后可以快速在各个平台上运行,gochat架构及目录结构清晰,
并且本项目还贴心的提供了docker一键构建所有环境依赖,安装起来十分便捷。

- 推荐理由:
轻量快捷不臃肿,水平可扩展,docker快速构建所有环境,迅速体验im即时通讯,
各层架构清晰,文档说明详细。

系统架构:



服务发现:


消息投递


聊天室预览:

[ gev ] Go 语言优雅处理 TCP “粘包”

文章分享惜朝 发表了文章 • 0 个评论 • 621 次浏览 • 2019-11-01 10:48 • 来自相关话题

https://github.com/Allenxuxu/gev ...查看全部

https://github.com/Allenxuxu/gev

gev 是一个轻量、快速的基于 Reactor 模式的非阻塞 TCP 网络库,支持自定义协议,轻松快速搭建高性能服务器。

TCP 为什么会粘包


TCP 本身就是面向流的协议,就是一串没有界限的数据。所以本质上来说 TCP 粘包是一个伪命题。

TCP 底层并不关心上层业务数据,会套接字缓冲区的实际情况进行包的划分,一个完整的业务数据可能会被拆分成多次进行发送,也可能会将多个小的业务数据封装成一个大的数据包发送(Nagle算法)。

gev 如何优雅处理


gev 通过回调函数 OnMessage 通知用户数据到来,回调函数中会将用户数据缓冲区(ringbuffer)通过参数传递过来。

用户通过对 ringbuffer 操作,来进行数据解包,获取到完整用户数据后再进行业务操作。这样又一个明显的缺点,就是会让业务操作和自定义协议解析代码堆在一起。

所以,最近对 gev 进行了一次较大改动,主要是为了能够以插件的形式支持各种自定义的数据协议,让使用者可以便捷处理 TCP 粘包问题,专注于业务逻辑。



做法如下,定义一个接口 Protocol

```go
// Protocol 自定义协议编解码接口
type Protocol interface {
UnPacket(c *Connection, buffer *ringbuffer.RingBuffer) (interface{}, []byte)
Packet(c *Connection, data []byte) []byte
}
```


用户只需实现这个接口,并注册到 server 中,当客户端数据到来时,gev 会首先调用 UnPacket 方法,如果缓冲区中的数据足够组成一帧,则将数据解包,并返回真正的用户数据,然后在回调 OnMessage 函数并将数据通过参数传递。

下面,我们实现一个简单的自定义协议插件,来启动一个 Server :

```text
| 数据长度 n | payload |
| 4字节 | n 字节 |
```

```go
// protocol.go
package main

import (
"encoding/binary"
"github.com/Allenxuxu/gev/connection"
"github.com/Allenxuxu/ringbuffer"
"github.com/gobwas/pool/pbytes"
)

const exampleHeaderLen = 4

type ExampleProtocol struct{}

func (d *ExampleProtocol) UnPacket(c *connection.Connection, buffer *ringbuffer.RingBuffer) (interface{}, []byte) {
if buffer.VirtualLength() > exampleHeaderLen {
buf := pbytes.GetLen(exampleHeaderLen)
defer pbytes.Put(buf)
_, _ = buffer.VirtualRead(buf)
dataLen := binary.BigEndian.Uint32(buf)

if buffer.VirtualLength() >= int(dataLen) {
ret := make([]byte, dataLen)
_, _ = buffer.VirtualRead(ret)

buffer.VirtualFlush()
return nil, ret
} else {
buffer.VirtualRevert()
}
}
return nil, nil
}

func (d *ExampleProtocol) Packet(c *connection.Connection, data []byte) []byte {
dataLen := len(data)
ret := make([]byte, exampleHeaderLen+dataLen)
binary.BigEndian.PutUint32(ret, uint32(dataLen))
copy(ret[4:], data)
return ret
}
```

```go
// server.go
package main

import (
"flag"
"log"
"strconv"

"github.com/Allenxuxu/gev"
"github.com/Allenxuxu/gev/connection"
)

type example struct{}

func (s *example) OnConnect(c *connection.Connection) {
log.Println(" OnConnect : ", c.PeerAddr())
}
func (s *example) OnMessage(c *connection.Connection, ctx interface{}, data []byte) (out []byte) {
log.Println("OnMessage:", data)
out = data
return
}

func (s *example) OnClose(c *connection.Connection) {
log.Println("OnClose")
}

func main() {
handler := new(example)
var port int
var loops int

flag.IntVar(&port, "port", 1833, "server port")
flag.IntVar(&loops, "loops", -1, "num loops")
flag.Parse()

s, err := gev.NewServer(handler,
gev.Address(":"+strconv.Itoa(port)),
gev.NumLoops(loops),
gev.Protocol(&ExampleProtocol{}))
if err != nil {
panic(err)
}

log.Println("server start")
s.Start()
}
```
完整代码地址

当回调 `OnMessage` 函数的时候,会通过参数传递已经拆好包的用户数据。

当我们需要使用其他协议时,仅仅需要实现一个 Protocol 插件,然后只要 `gev.NewServer` 时指定即可:

```go
gev.NewServer(handler, gev.NumLoops(2), gev.Protocol(&XXXProtocol{}))
```

## 基于 Protocol Plugins 模式为 gev 实现 WebSocket 插件

得益于 Protocol Plugins 模式的引进,我可以将 WebSocket 的实现做成一个插件(WebSocket 协议构建在 TCP 之上),独立于 gev 之外。

```go
package websocket

import (
"log"

"github.com/Allenxuxu/gev/connection"
"github.com/Allenxuxu/gev/plugins/websocket/ws"
"github.com/Allenxuxu/ringbuffer"
)

// Protocol websocket
type Protocol struct {
upgrade *ws.Upgrader
}

// New 创建 websocket Protocol
func New(u *ws.Upgrader) *Protocol {
return &Protocol{upgrade: u}
}

// UnPacket 解析 websocket 协议,返回 header ,payload
func (p *Protocol) UnPacket(c *connection.Connection, buffer *ringbuffer.RingBuffer) (ctx interface{}, out []byte) {
upgraded := c.Context()
if upgraded == nil {
var err error
out, _, err = p.upgrade.Upgrade(buffer)
if err != nil {
log.Println("Websocket Upgrade :", err)
return
}
c.SetContext(true)
} else {
header, err := ws.VirtualReadHeader(buffer)
if err != nil {
log.Println(err)
return
}
if buffer.VirtualLength() >= int(header.Length) {
buffer.VirtualFlush()

payload := make([]byte, int(header.Length))
_, _ = buffer.Read(payload)

if header.Masked {
ws.Cipher(payload, header.Mask, 0)
}

ctx = &header
out = payload
} else {
buffer.VirtualRevert()
}
}
return
}

// Packet 直接返回
func (p *Protocol) Packet(c *connection.Connection, data []byte) []byte {
return data
}
```

具体的实现,可以到仓库的 [plugins/websocket](https://github.com/Allenxuxu/gev/tree/master/plugins/websocket) 查看。

## 相关文章

- [开源 gev: Go 实现基于 Reactor 模式的非阻塞 TCP 网络库](https://note.mogutou.xyz/articles/2019/09/19/1568896693634.html)
- [Go 网络库并发吞吐量测试](https://note.mogutou.xyz/articles/2019/09/22/1569146969662.html)

## 项目地址

https://github.com/Allenxuxu/gev

yiigo 4.x 版本发布

开源程序IIInsomnia 发表了文章 • 0 个评论 • 340 次浏览 • 2019-10-25 11:50 • 来自相关话题

# yiigo[![golang](https://img.shields.io/badge/Language-Go-green.svg?style=flat)](https://golang.org)[![GitHub rele ...查看全部

# yiigo

[![golang](https://img.shields.io/badge/Language-Go-green.svg?style=flat)](https://golang.org)
[![GitHub release](https://img.shields.io/github/release/IIInsomnia/yiigo.svg)](https://github.com/iiinsomnia/yiigo/releases/latest)
[![GoDoc](https://godoc.org/github.com/iiinsomnia/yiigo?status.svg)](https://godoc.org/github.com/iiinsomnia/yiigo)
[![MIT license](http://img.shields.io/badge/license-MIT-brightgreen.svg)](http://opensource.org/licenses/MIT)

The best and the most wanted package for junior gophers, probably.

## Features

- Support [MySQL](https://github.com/go-sql-driver/mysql)
- Support [PostgreSQL](https://github.com/lib/pq)
- Support [MongoDB](https://github.com/mongodb/mongo-go-driver)
- Support [Redis](https://github.com/gomodule/redigo)
- Support [Zipkin](https://github.com/openzipkin/zipkin-go)
- Support [Apollo](https://github.com/philchia/agollo)
- Use [gomail](https://github.com/go-gomail/gomail) for email sending
- Use [toml](https://github.com/pelletier/go-toml) for configuration
- Use [sqlx](https://github.com/jmoiron/sqlx) for SQL executing
- Use [gorm](https://gorm.io/) for ORM operating
- Use [zap](https://github.com/uber-go/zap) for logging

## Requirements

`Go1.11+`

## Installation

```sh
go get github.com/iiinsomnia/yiigo/v4
```

## Usage

#### Config

- `yiigo.toml`

```toml
[app]
env = "dev" # dev | beta | prod
debug = true

[apollo]
app_id = "test"
cluster = "default"
address = "127.0.0.1:8080"
cache_dir = "./"

[apollo.namespace]
# 自定义配置对应的namespace

[db]

[db.default]
driver = "mysql"
dsn = "username:password@tcp(localhost:3306)/dbname?timeout=10s&charset=utf8mb4&collation=utf8mb4_general_ci&parseTime=True&loc=Local"
# dsn = "host=localhost port=5432 user=root password=secret dbname=test connect_timeout=10 sslmode=disable" # pgsql
max_open_conns = 20
max_idle_conns = 10
conn_max_lifetime = 60 # 秒

[mongo]

[mongo.default]
dsn = "mongodb://username:password@localhost:27017"
connect_timeout = 10 # 秒
pool_size = 10
max_conn_idle_time = 60 # 秒
mode = "primary" # primary | primary_preferred | secondary | secondary_preferred | nearest

[redis]

[redis.default]
address = "127.0.0.1:6379"
password = ""
database = 0
connect_timeout = 10 # 秒
read_timeout = 10 # 秒
write_timeout = 10 # 秒
pool_size = 10
pool_limit = 20
idle_timeout = 60 # 秒
wait_timeout = 10 # 秒
prefill_parallelism = 0

[log]

[log.default]
path = "app.log"
max_size = 500
max_age = 0
max_backups = 0
compress = true

[email]
host = "smtp.exmail.qq.com"
port = 25
username = ""
password = ""
```

- config usage

```go
yiigo.Env("app.env").String()
yiigo.Env("app.debug").Bool()
```

#### Apollo

```go
type QiniuConfig struct {
*yiigo.DefaultApolloConfig
BucketName string `toml:"bucket_name"`
}

var qiniu = &QiniuConfig{DefaultApolloConfig: yiigo.NewDefaultConfig("qiniu", "qiniu")}

if err := yiigo.StartApollo(qiniu); err != nil {
log.Fatal(err)
}
```

#### MySQL

```go
// default db
yiigo.DB().Get(&User{}, "SELECT * FROM `user` WHERE `id` = ?", 1)
yiigo.Orm().First(&User{}, 1)

// other db
yiigo.DB("foo").Get(&User{}, "SELECT * FROM `user` WHERE `id` = ?", 1)
yiigo.Orm("foo").First(&User{}, 1)
```

#### MongoDB

```go
// default mongodb
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)

defer cancel()

yiigo.Mongo().Database("test").Collection("numbers").InsertOne(ctx, bson.M{"name": "pi", "value": 3.14159})

// other mongodb
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)

defer cancel()

yiigo.Mongo("foo").Database("test").Collection("numbers").InsertOne(ctx, bson.M{"name": "pi", "value": 3.14159})
```

#### Redis

```go
// default redis
conn, err := yiigo.Redis().Get()

if err != nil {
log.Fatal(err)
}

defer yiigo.Redis().Put(conn)

conn.Do("SET", "test_key", "hello world")

// other redis
conn, err := yiigo.Redis("foo").Get()

if err != nil {
log.Fatal(err)
}

defer yiigo.Redis("foo").Put(conn)

conn.Do("SET", "test_key", "hello world")
```

#### Zipkin

```go
reporter := yiigo.NewZipkinHTTPReporter("http://localhost:9411/api/v2/spans")

// sampler
sampler := zipkin.NewModuloSampler(1)
// endpoint
endpoint, _ := zipkin.NewEndpoint("yiigo-zipkin", "localhost")

tracer, err := yiigo.NewZipkinTracer(reporter,
zipkin.WithLocalEndpoint(endpoint),
zipkin.WithSharedSpans(false),
zipkin.WithSampler(sampler),
)

if err != nil {
log.Fatal(err)
}

client, err := tracer.HTTPClient(yiigo.WithZipkinClientOptions(zipkinHttp.ClientTrace(true)))

if err != nil {
log.Fatal(err)
}

b, err := client.Get(context.Background(), "url...",
yiigo.WithRequestHeader("Content-Type", "application/json; charset=utf-8"),
yiigo.WithRequestTimeout(5*time.Second),
)

if err != nil {
log.Fatal(err)
}

fmt.Println(string(b))
```

#### Logger

```go
// default logger
yiigo.Logger().Info("hello world")

// other logger
yiigo.Logger("foo").Info("hello world")
```

## Documentation

- [API Reference](https://godoc.org/github.com/iiinsomnia/yiigo)
- [TOML](https://github.com/toml-lang/toml)
- [Example](https://github.com/iiinsomnia/yiigo-example)

**Enjoy

[开源] gev (支持 websocket 啦): Go 实现基于 Reactor 模式的非阻塞网络库

开源程序惜朝 发表了文章 • 0 个评论 • 352 次浏览 • 2019-10-24 11:04 • 来自相关话题

https://github.com/Allenxuxu/gev[gev](https://github.com/Allenxuxu/gev) ...查看全部

https://github.com/Allenxuxu/gev

[gev](https://github.com/Allenxuxu/gev) 是一个轻量、快速、高性能的基于 Reactor 模式的非阻塞网络库,底层并不使用 golang net 库,而是使用 epoll 和 kqueue。

现在它支持 WebSocket 啦!

支持定时任务,延时任务!

⬇️⬇️⬇️

## 特点

- 基于 epoll 和 kqueue 实现的高性能事件循环
- 支持多核多线程
- 动态扩容 Ring Buffer 实现的读写缓冲区
- 异步读写
- SO_REUSEPORT 端口重用支持
- 支持 WebSocket
- 支持定时任务,延时任务

## 性能测试

> 测试环境 Ubuntu18.04
- gev
- gnet
- eviop
- evio
- net (标准库)

### 吞吐量测试




仓库地址: https://github.com/Allenxuxu/gev

Go语言自定义自己的SSH-Server

文章分享jjjjerk 发表了文章 • 0 个评论 • 816 次浏览 • 2019-10-22 15:31 • 来自相关话题

原文地址  ...查看全部

Golang-Docker ChromeDP浏览器模拟和截图微服务

文章分享jjjjerk 发表了文章 • 0 个评论 • 885 次浏览 • 2019-10-21 15:58 • 来自相关话题

原文地址  ...查看全部

Golang 扁平项目代码结构

技术讨论jjjjerk 发表了文章 • 0 个评论 • 372 次浏览 • 2019-10-18 15:08 • 来自相关话题

原文地址 https://mojotv ...查看全部

HTTP-Reverse-Proxy反向代理Nginx硬件指纹校验

文章分享jjjjerk 发表了文章 • 0 个评论 • 520 次浏览 • 2019-10-15 15:06 • 来自相关话题

原文地址 https://moj ...查看全部

Excelize 发布 2.0.2 版本, Go 语言 Excel 文档基础库

Go开源项目xuri 发表了文章 • 0 个评论 • 359 次浏览 • 2019-10-10 14:32 • 来自相关话题


Excelize 是 Go 语言编写的用于操作 Office Excel 文档类库,基于 ECMA-376 Office Open XML 标准。可以使用它来读取、写入由 Microsoft Excel™ 2007 及以上版本创建的 XLSX 文档。相比较其他的开源类库,Excelize 支持写入原本带有图片(表)、透视表和切片器等复杂样式的文档,还支持向 Excel 文档中插入图片与图表,并且在保存后不会丢失文档原有样式,可以应用于各类报表系统中。入选 2018 开源中国码云 Gitee 最有价值开源项目 GVP,目前已成为 Go 语言最受欢迎的 Excel 文档基础库。

开源代码

GitHub: github.com/xuri/excelize
Gitee: gitee.com/xurime/excelize
中文文档: xuri.me/excelize/zh-hans

Excelize 知名用户


2019年10月9日,社区正式发布了 2.0.2 版本,该版本包含了多项新增功能、错误修复和兼容性提升优化。下面是有关该版本更新内容的摘要,完整的更改列表可查看 change log

有关更改的摘要,请参阅 Release Notes。完整的更改列表可查看 change log

Release Notes

此版本中最显著的变化包括:

兼容性提示

升级至该版本需要您的 Go 语言版本高于 1.10。

新增功能

问题修复

  • 修复部分情况下读取批注内容文本不完整的问题,解决 issue #434
  • 修复由于内部合并单元格偏移量计算错误导致的部分情况下使用 RemoveRow() 删除行出现下标越界问题,解决 issue #437
  • 修复部分情况下数据验证下拉菜单中的公式失效问题
  • 修复在循环迭代中调用 Save() 方法保存导致的文档损坏问题,解决 issue #443
  • 提升文档内部 workbook.xml.rels 中相对路径格式解析的兼容性,解决 issue #442
  • 修复部分情况下,删除带有合并单元格的文档所导致的文件损坏问题
  • 修复部分情况下设置保护工作表属性失效的情况,解决 issue #454
  • 修复部分情况下 GetSheetName 获取工作表名称为空的问题, 解决 issue #457
  • 增加单元格内多行文本解析的支持, 相关 issue #464
  • 修复 32 位操作系统环境下数字溢出问题,相关 issue #386
  • 修复 go module 依赖版本不匹配问题, 相关 issue #466 和 issue #480
  • 修复部分情况下调用 SetSheetPrOptions() 所致的文档损坏问题,解决 issue #483

性能表现

  • 性能优化,减少读取文档时的内存开销和耗时,相关 issue #439

其他

  • 完善 SetSheetRow() 函数中的异常处理
  • 代码精简优化, 合并了下列内部函数:

将函数 workBookRelsWriterdrawingRelsWriter 合并为 relsWriter;
将函数 drawingRelsReaderworkbookRelsReaderworkSheetRelsReader 合并为 relsReader;
将函数 addDrawingRelationshipsaddSheetRelationships 合并为 addRels

【杭州】河象网络科技 招聘 Golang 工程师 5 名

招聘应聘airylinus 发表了文章 • 0 个评论 • 460 次浏览 • 2019-09-29 21:02 • 来自相关话题

我们是谁?我们做什么?       杭州河象网络科技有限公司,成立于 2017 年 5 月,以“为孩 ...查看全部

我们是谁?我们做什么?


       杭州河象网络科技有限公司,成立于 2017 年 5 月,以“为孩子提供有效、有趣的普惠教育”为使命,是一家专注3-12岁少儿素质教育的在线教育公司。“河小象” 是公司旗下品牌,采用“AI+教育”模式,融合图像识别、语音测评、智能匹配、人机交互及大数据分析等创新科技,自主研发课程。以“大语文综合素养”为核心,同时推出了写字、书法、美术等明星产品,致力于帮助孩子提高综合素养。

       公司拥有浙江大学、北京师范大学等知名院校教授、博士组成的教研团队,并由原阿里巴巴顶级技术团队负责研发,回归教育本质,持续不断地升级学习体验。服务超过两百万学员,覆盖北京、上海、广州、杭州等270余个城市。同时,河小象已完成 2 亿元人民币 B 轮融资,由创新工场、贝塔斯曼、好未来及上一轮投资方元璟、亦联、金沙江、志拙资本等投资。

我们提供的什么待遇和福利


- 公司提供五险一金,签署劳动合同,办理社保。
- 办公地点在 A 级写字楼:余杭区未来科技城创鑫时代广场。
- 配备办公桌、饮水间,技术开发配备 Macbook 作为办公设备。
- 享受国家法定节假日,中秋、端午会有礼品福利。
- 在招的 Golang 开发工程师岗位的薪资在 10000 - 25000,若技术水平超过岗位标准可另议。

我们对候选人的要求


- **1** 年以上 Golang 开发经验,熟悉语言特性,基础扎实。
- 有良好的编码风格,能进行测试驱动开发、集成测试,能主动改进开发过程和提高交付质量。
- 熟悉常用关系数据库( MySQL,PostgreSQL )的使用经验,能有意识的改进查询性能。
- 熟悉 Linux 操作系统,熟练使用 Docker 相关技术,了解常用运维管理操作。
- 有良好的自学能力,对新技术能持续学习、保持好奇心,不断挖掘自身潜力。

以下是加分项目:

- 能使用 Linux、MacOS 作为日常工作系统。
- 乐于分享,热衷于参与开源社区,曾参与过开源项目的开发。
- 良好的英文阅读能力,善于使用 Google、stackoverflow 等来解决技术问题

我们的联系方式


简历发送到电邮 : `[]rune{120, 122, 109}@hexiaoxiang.com`

我在这家公司的身份和角色


我是 Golang 团队的负责人,希望大家能成为战友。






[gev] 一个轻量、快速的基于 Reactor 模式的非阻塞 TCP 网络库

Go开源项目惜朝 发表了文章 • 0 个评论 • 356 次浏览 • 2019-09-25 16:45 • 来自相关话题

`gev` 是一个 ...查看全部

`gev` 是一个轻量、快速的基于 Reactor 模式的非阻塞 TCP 网络库。

➡️➡️ https://github.com/Allenxuxu/gev

特点

- 基于 epoll 和 kqueue 实现的高性能事件循环
- 支持多核多线程
- 动态扩容 Ring Buffer 实现的读写缓冲区
- 异步读写
- SO_REUSEPORT 端口重用支持

网络模型

`gev` 只使用极少的 goroutine, 一个 goroutine 负责监听客户端连接,其他 goroutine (work 协程)负责处理已连接客户端的读写事件,work 协程数量可以配置,默认与运行主机 CPU 数量相同。

性能测试

> 测试环境 Ubuntu18.04 | 4 Virtual CPUs | 4.0 GiB

吞吐量测试

限制 GOMAXPROCS=1(单线程),1 个 work 协程


限制 GOMAXPROCS=4,4 个 work 协程


其他测试

速度测试


和同类库的简单性能比较, 压测方式与 evio 项目相同。
- gnet
- eviop
- evio
- net (标准库)

限制 GOMAXPROCS=1,1 个 work 协程


限制 GOMAXPROCS=1,4 个 work 协程


限制 GOMAXPROCS=4,4 个 work 协程


安装 gev


```bash
go get -u github.com/Allenxuxu/gev
```

快速入门


```go
package main

import (
"log"

"github.com/Allenxuxu/gev"
"github.com/Allenxuxu/gev/connection"
"github.com/Allenxuxu/ringbuffer"
)

type example struct{}

func (s *example) OnConnect(c *connection.Connection) {
log.Println(" OnConnect : ", c.PeerAddr())
}

func (s *example) OnMessage(c *connection.Connection, buffer *ringbuffer.RingBuffer) (out []byte) {
log.Println("OnMessage")
first, end := buffer.PeekAll()
out = first
if len(end) > 0 {
out = append(out, end...)
}
buffer.RetrieveAll()
return
}

func (s *example) OnClose(c *connection.Connection) {
log.Println("OnClose")
}

func main() {
handler := new(example)

s, err := gev.NewServer(handler,
gev.Address(":1833"),
gev.NumLoops(2),
gev.ReusePort(true))
if err != nil {
panic(err)
}

s.Start()
}
```

Handler 是一个接口,我们的程序必须实现它。

```go
type Handler interface {
OnConnect(c *connection.Connection)
OnMessage(c *connection.Connection, buffer *ringbuffer.RingBuffer) []byte
OnClose(c *connection.Connection)
}

func NewServer(handler Handler, opts ...Option) (server *Server, err error) {
```

在消息到来时,gev 会回调 OnMessage ,在这个函数中可以通过返回一个切片来发送数据给客户端。

```go
func (s *example) OnMessage(c *connection.Connection, buffer *ringbuffer.RingBuffer) (out []byte)
```

Connection 还提供 Send 方法来发送数据。Send 并不会立刻发送数据,而是先添加到 event loop 的任务队列中,然后唤醒 event loop 去发送。

更详细的使用方式可以参考示例:[服务端定时推送]

```go
func (c *Connection) Send(buffer []byte) error
```

Connection ShutdownWrite 会关闭写端,从而断开连接。

更详细的使用方式可以参考示例:[限制最大连接数]

```go
func (c *Connection) ShutdownWrite() error
```


➡️➡️ https://github.com/Allenxuxu/gev



Go实现双向链表

文章分享link1st 发表了文章 • 1 个评论 • 758 次浏览 • 2019-09-20 09:30 • 来自相关话题

本文介绍什么是链表,常见的链表有哪些,然后介绍链表这种数据结构会在哪些地方可以用到,以及 Redis 队列是底层的实现,通过一个小实例来演示 Redis 队列有哪些功能,最后通过 Go 实现一个双向链表。 ...查看全部

本文介绍什么是链表,常见的链表有哪些,然后介绍链表这种数据结构会在哪些地方可以用到,以及 Redis 队列是底层的实现,通过一个小实例来演示 Redis 队列有哪些功能,最后通过 Go 实现一个双向链表。

链表

目录

1、链表

1.1 说明

链表

链表(Linked list)是一种常见的基础数据结构,是一种线性表,但是并不会按线性的顺序存储数据,而是在每一个节点里存到下一个节点的指针(Pointer)。由于不必须按顺序存储,链表在插入的时候可以达到O(1)的复杂度,比另一种线性表顺序表快得多,但是查找一个节点或者访问特定编号的节点则需要O(n)的时间,而顺序表相应的时间复杂度分别是O(logn)和O(1)。

链表有很多种不同的类型:单向链表,双向链表以及循环链表。

  • 优势:

可以克服数组链表需要预先知道数据大小的缺点,链表结构可以充分利用计算机内存空间,实现灵活的内存动态管理。链表允许插入和移除表上任意位置上的节点。

  • 劣势:

由于链表增加了节点指针,空间开销比较大。链表一般查找数据的时候需要从第一个节点开始每次访问下一个节点,直到访问到需要的位置,查找数据比较慢。

  • 用途:

常用于组织检索较少,而删除、添加、遍历较多的数据。

如:文件系统、LRU cache、Redis 列表、内存管理等。

1.2 单向链表

链表中最简单的一种是单向链表,

一个单向链表的节点被分成两个部分。它包含两个域,一个信息域和一个指针域。第一个部分保存或者显示关于节点的信息,第二个部分存储下一个节点的地址,而最后一个节点则指向一个空值。单向链表只可向一个方向遍历。

单链表有一个头节点head,指向链表在内存的首地址。链表中的每一个节点的数据类型为结构体类型,节点有两个成员:整型成员(实际需要保存的数据)和指向下一个结构体类型节点的指针即下一个节点的地址(事实上,此单链表是用于存放整型数据的动态数组)。链表按此结构对各节点的访问需从链表的头找起,后续节点的地址由当前节点给出。无论在表中访问哪个节点,都需要从链表的头开始,顺序向后查找。链表的尾节点由于无后续节点,其指针域为空,写作为NULL。

1.3 循环链表

循环链表是与单向链表一样,是一种链式的存储结构,所不同的是,循环链表的最后一个结点的指针是指向该循环链表的第一个结点或者表头结点,从而构成一个环形的链。

循环链表的运算与单链表的运算基本一致。所不同的有以下几点:

1、在建立一个循环链表时,必须使其最后一个结点的指针指向表头结点,而不是像单链表那样置为NULL。

2、在判断是否到表尾时,是判断该结点链域的值是否是表头结点,当链域的值等于表头指针时,说明已到表尾。而非象单链表那样判断链域的值是否为NULL。

1.4 双向链表

双向链表

双向链表其实是单链表的改进,当我们对单链表进行操作时,有时你要对某个结点的直接前驱进行操作时,又必须从表头开始查找。这是由单链表结点的结构所限制的。因为单链表每个结点只有一个存储直接后继结点地址的链域,那么能不能定义一个既有存储直接后继结点地址的链域,又有存储直接前驱结点地址的链域的这样一个双链域结点结构呢?这就是双向链表。

在双向链表中,结点除含有数据域外,还有两个链域,一个存储直接后继结点地址,一般称之为右链域(当此“连接”为最后一个“连接”时,指向空值或者空列表);一个存储直接前驱结点地址,一般称之为左链域(当此“连接”为第一个“连接”时,指向空值或者空列表)。

2、redis队列

2.1 说明

Redis 列表是简单的字符串列表,按照插入顺序排序。你可以添加一个元素到列表的头部(左边)或者尾部(右边)

Redis 列表使用两种数据结构作为底层实现:双端列表(linkedlist)、压缩列表(ziplist)

通过配置文件中(list-max-ziplist-entries、list-max-ziplist-value)来选择是哪种实现方式

在数据量比较少的时候,使用双端链表和压缩列表性能差异不大,但是使用压缩列表更能节约内存空间

redis 链表的实现源码 redis src/adlist.h

2.2 应用场景

消息队列,秒杀项目

秒杀项目:

提前将需要的商品码信息存入 Redis 队列,在抢购的时候每个用户都从 Redis 队列中取商品码,由于 Redis 是单线程的,同时只能有一个商品码被取出,取到商品码的用户为购买成功,而且 Redis 性能比较高,能抗住较大的用户压力。

2.3 演示

如何通过 Redis 队列中防止并发情况下商品超卖的情况。

假设:

网站有三件商品需要卖,我们将数据存入 Redis 队列中

1、 将三个商品码(10001、10002、10003)存入 Redis 队列中

# 存入商品
RPUSH commodity:queue 10001 10002 10003

2、 存入以后,查询数据是否符合预期

# 查看全部元素
LRANGE commodity:queue 0 -1

# 查看队列的长度
LLEN commodity:queue

3、 抢购开始,获取商品码,抢到商品码的用户则可以购买(由于 Redis 是单线程的,同一个商品码只能被取一次 )

# 出队
LPOP commodity:queue

这里了解到 Redis 列表是怎么使用的,下面就用 Go 语言实现一个双向链表来实现这些功能。

3、Go双向链表

3.1 说明

这里只是用 Go 语言实现一个双向链表,实现:查询链表的长度、链表右端插入数据、左端取数据、取指定区间的节点等功能( 类似于 Redis 列表的中的 RPUSH、LRANGE、LPOP、LLEN功能 )。

3.2 实现

golang 双向链表

  • 节点定义

双向链表有两个指针,分别指向前一个节点和后一个节点

链表表头 prev 的指针为空,链表表尾 next 的指针为空

// 链表的一个节点
type ListNode struct {
prev *ListNode // 前一个节点
next *ListNode // 后一个节点
value string // 数据
}

// 创建一个节点
func NewListNode(value string) (listNode *ListNode) {
listNode = &ListNode{
value: value,
}

return
}

// 当前节点的前一个节点
func (n *ListNode) Prev() (prev *ListNode) {
prev = n.prev

return
}

// 当前节点的前一个节点
func (n *ListNode) Next() (next *ListNode) {
next = n.next

return
}

// 获取节点的值
func (n *ListNode) GetValue() (value string) {
if n == nil {

return
}
value = n.value

return
}
  • 定义一个链表

链表为了方便操作,定义一个结构体,可以直接从表头、表尾进行访问,定义了一个属性 len ,直接可以返回链表的长度,直接查询链表的长度就不用遍历时间复杂度从 O(n) 到 O(1)。

// 链表
type List struct {
head *ListNode // 表头节点
tail *ListNode // 表尾节点
len int // 链表的长度
}


// 创建一个空链表
func NewList() (list *List) {
list = &List{
}
return
}

// 返回链表头节点
func (l *List) Head() (head *ListNode) {
head = l.head

return
}

// 返回链表尾节点
func (l *List) Tail() (tail *ListNode) {
tail = l.tail

return
}

// 返回链表长度
func (l *List) Len() (len int) {
len = l.len

return
}
  • 在链表的右边插入一个元素
// 在链表的右边插入一个元素
func (l *List) RPush(value string) {

node := NewListNode(value)

// 链表未空的时候
if l.Len() == 0 {
l.head = node
l.tail = node
} else {
tail := l.tail
tail.next = node
node.prev = tail

l.tail = node
}

l.len = l.len + 1

return
}
  • 从链表左边取出一个节点
// 从链表左边取出一个节点
func (l *List) LPop() (node *ListNode) {

// 数据为空
if l.len == 0 {

return
}

node = l.head

if node.next == nil {
// 链表未空
l.head = nil
l.tail = nil
} else {
l.head = node.next
}
l.len = l.len - 1

return
}
  • 通过索引查找节点

通过索引查找节点,如果索引是负数则从表尾开始查找。

自然数和负数索引分别通过两种方式查找节点,找到指定索引或者是链表全部查找完则查找完成。

// 通过索引查找节点
// 查不到节点则返回空
func (l *List) Index(index int) (node *ListNode) {

// 索引为负数则表尾开始查找
if index < 0 {
index = (-index) - 1
node = l.tail
for true {
// 未找到
if node == nil {

return
}

// 查到数据
if index == 0 {

return
}

node = node.prev
index--
}
} else {
node = l.head
for ; index > 0 && node != nil; index-- {
node = node.next
}
}

return
}
  • 返回指定区间的元素
// 返回指定区间的元素
func (l *List) Range(start, stop int) (nodes []*ListNode) {
nodes = make([]*ListNode, 0)

// 转为自然数
if start < 0 {
start = l.len + start
if start < 0 {
start = 0
}
}

if stop < 0 {
stop = l.len + stop
if stop < 0 {
stop = 0
}
}

// 区间个数
rangeLen := stop - start + 1
if rangeLen < 0 {

return
}

startNode := l.Index(start)
for i := 0; i < rangeLen; i++ {
if startNode == nil {
break
}

nodes = append(nodes, startNode)
startNode = startNode.next
}

return
}

4、总结

  • 到这里关于链表的使用已经结束,介绍链表是有哪些(单向链表,双向链表以及循环链表),也介绍了链表的应用场景(Redis 列表使用的是链表作为底层实现),最后用 Go 实现了双向链表,演示了链表在 Go 语言中是怎么使用的,大家可以在项目中更具实际的情况去使用。

5、参考文献

维基百科 链表

github redis

项目地址:go 实现队列

https://github.com/link1st/link1st/tree/master/linked

gnet: 一个轻量级且高性能的 Go 网络库

开源程序panjf2000 发表了文章 • 1 个评论 • 577 次浏览 • 2019-09-18 16:08 • 来自相关话题


gnet












# 博客原文
https://taohuawu.club/go-event-loop-networking-library-gnet

# Github 主页
https://github.com/panjf2000/gnet

欢迎大家围观~~,目前还在持续更新,感兴趣的话可以 star 一下暗中观察哦。

# 简介

`gnet` 是一个基于 Event-Loop 事件驱动的高性能和轻量级网络库。这个库直接使用 [epoll](https://en.wikipedia.org/wiki/Epoll) 和 [kqueue](https://en.wikipedia.org/wiki/Kqueue) 系统调用而非标准 Golang 网络包:[net](https://golang.org/pkg/net/) 来构建网络应用,它的工作原理类似两个开源的网络库:[libuv](https://github.com/libuv/libuv) 和 [libevent](https://github.com/libevent/libevent)。

这个项目存在的价值是提供一个在网络包处理方面能和 [Redis](http://redis.io)、[Haproxy](http://www.haproxy.org) 这两个项目具有相近性能的Go 语言网络服务器框架。

`gnet` 的亮点在于它是一个高性能、轻量级、非阻塞的纯 Go 实现的传输层(TCP/UDP/Unix-Socket)网络库,开发者可以使用 `gnet` 来实现自己的应用层网络协议,从而构建出自己的应用层网络应用:比如在 `gnet` 上实现 HTTP 协议就可以创建出一个 HTTP 服务器 或者 Web 开发框架,实现 Redis 协议就可以创建出自己的 Redis 服务器等等。

**`gnet` 衍生自另一个项目:`evio`,但是性能更好。**

# 功能

- [高性能](#性能测试) 的基于多线程模型的 Event-Loop 事件驱动
- 内置 Round-Robin 轮询负载均衡算法
- 简洁的 APIs
- 基于 Ring-Buffer 的高效内存利用
- 支持多种网络协议:TCP、UDP、Unix Sockets
- 支持两种事件驱动机制:Linux 里的 epoll 以及 FreeBSD 里的 kqueue
- 支持异步写操作
- 允许多个网络监听地址绑定在一个 Event-Loop 上
- 灵活的事件定时器
- SO_REUSEPORT 端口重用

# 核心设计

## 多线程模型

`gnet` 重新设计开发了一个新内置的多线程模型:『主从 Reactor 多线程』,这也是 `netty` 默认的线程模型,下面是这个模型的原理图:


multi_reactor



它的运行流程如下面的时序图:


reactor



现在我正在 `gnet` 里开发一个新的多线程模型:『带线程/go程池的主从 Reactors 多线程』,并且很快就能完成,这个模型的架构图如下所示:


multi_reactor_thread_pool



它的运行流程如下面的时序图:


multi-reactors



## 通信机制

`gnet` 的『主从 Reactors 多线程』模型是基于 Golang 里的 Goroutines的,一个 Reactor 挂载在一个 Goroutine 上,所以在 `gnet` 的这个网络模型里主 Reactor/Goroutine 与从 Reactors/Goroutines 有海量通信的需求,因此 `gnet` 里必须要有一个能在 Goroutines 之间进行高效率的通信的机制,我没有选择 Golang 里的主流方案:基于 Channel 的 CSP 模型,而是选择了性能更好、基于 Ring-Buffer 的 Disruptor 方案。

所以我最终选择了 [go-disruptor](https://github.com/smartystreets-prototypes/go-disruptor):高性能消息分发队列 LMAX Disruptor 的 Golang 实现。

## 自动扩容的 Ring-Buffer

`gnet` 利用 Ring-Buffer 来缓存 TCP 流数据以及管理内存使用。







# 开始使用

## 安装

```sh
$ go get -u github.com/panjf2000/gnet
```

## 使用示例

```go
// ======================== Echo Server implemented with gnet ===========================

package main

import (
"flag"
"fmt"
"log"
"strings"

"github.com/panjf2000/gnet"
"github.com/panjf2000/gnet/ringbuffer"
)

func main() {
var port int
var loops int
var udp bool
var trace bool
var reuseport bool

flag.IntVar(&port, "port", 5000, "server port")
flag.BoolVar(&udp, "udp", false, "listen on udp")
flag.BoolVar(&reuseport, "reuseport", false, "reuseport (SO_REUSEPORT)")
flag.BoolVar(&trace, "trace", false, "print packets to console")
flag.IntVar(&loops, "loops", 0, "num loops")
flag.Parse()

var events gnet.Events
events.NumLoops = loops
events.OnInitComplete = func(srv gnet.Server) (action gnet.Action) {
log.Printf("echo server started on port %d (loops: %d)", port, srv.NumLoops)
if reuseport {
log.Printf("reuseport")
}
return
}
events.React = func(c gnet.Conn, inBuf *ringbuffer.RingBuffer) (out []byte, action gnet.Action) {
top, tail := inBuf.PreReadAll()
out = append(top, tail...)
inBuf.Reset()

if trace {
log.Printf("%s", strings.TrimSpace(string(top)+string(tail)))
}
return
}
scheme := "tcp"
if udp {
scheme = "udp"
}
log.Fatal(gnet.Serve(events, fmt.Sprintf("%s://:%d", scheme, port)))
}

```

## I/O 事件

`gnet` 目前支持的 I/O 事件如下:

- `OnInitComplete` 当 server 初始化完成之后调用。
- `OnOpened` 当连接被打开的时候调用。
- `OnClosed` 当连接被关闭的时候调用。
- `OnDetached` 当主动摘除连接的时候的调用。
- `React` 当 server 端接收到从 client 端发送来的数据的时候调用。(你的核心业务代码一般是写在这个方法里)
- `Tick` 服务器启动的时候会调用一次,之后就以给定的时间间隔定时调用一次,是一个定时器方法。
- `PreWrite` 预先写数据方法,在 server 端写数据回 client 端之前调用。

# 性能测试

## Linux (epoll)

### 系统参数

```powershell
Go Version: go1.12.9 linux/amd64
OS: Ubuntu 18.04
CPU: 8 Virtual CPUs
Memory: 16.0 GiB
```

### Echo Server

![echolinux.png](https://img.hacpai.com/file/2019/09/echolinux-fca6e6e5.png)


### HTTP Server

![httplinux.png](https://img.hacpai.com/file/2019/09/httplinux-663a0318.png)


## FreeBSD (kqueue)

### 系统参数

```powershell
Go Version: go version go1.12.9 darwin/amd64
OS: macOS Mojave 10.14.6
CPU: 4 CPUs
Memory: 8.0 GiB
```

### Echo Server

![echomac.png](https://img.hacpai.com/file/2019/09/echomac-7a29e0d1.png)


### HTTP Server

![httpmac.png](https://img.hacpai.com/file/2019/09/httpmac-cb6d26ea.png)


# 证书

`gnet` 的源码允许用户在遵循 MIT [开源证书](https://github.com/panjf2000/gnet/blob/master/LICENSE) 规则的前提下使用。

# 待做事项

> gnet 还在持续开发的过程中,所以这个仓库的代码和文档会一直持续更新,如果你对 gnet 感兴趣的话,欢迎给这个开源库贡献你的代码~~

beego 开发的博客 go-blog

开源程序猴子 发表了文章 • 3 个评论 • 317 次浏览 • 2019-09-16 12:00 • 来自相关话题

个人博客 项目地址: https://github.com/1920853199/go-blog demo 地址: http://leechan.online ...查看全部
个人博客 
项目地址: https://github.com/1920853199/go-blog
demo 
地址: http://leechan.online