每日新闻

每日新闻

GoCN每日新闻资讯
有问必答

有问必答

Go相关的问题,技术相关的问题
文章分享

文章分享

技术文章分享,让知识传播给更多的人
招聘应聘

招聘应聘

为Gopher服务的招聘应聘平台

COSCon'19 | 如何设计新一代的图数据库 Nebula

NebulaGraph 发表了文章 • 0 个评论 • 154 次浏览 • 3 天前 • 来自相关话题

11 月 2 号 - 11 月 3 号,以“大爱无疆,开源无界”为主题的 2019 中国开源年会(COSCon'19)正式启动,大会以开源治理、国际接轨、社区发展和开源项目为切入点同全球开源爱好者们共同交流开源。作为图数据库技术的代表,Ne ...查看全部

11 月 2 号 - 11 月 3 号,以“大爱无疆,开源无界”为主题的 2019 中国开源年会(COSCon'19)正式启动,大会以开源治理、国际接轨、社区发展和开源项目为切入点同全球开源爱好者们共同交流开源。

作为图数据库技术的代表,Nebula Graph 总监——吴敏在本次大会上将会讲述了大规模分布式图数据库设计思考和实践。在信息爆发式增长和内容平台遍地开花的信息时代,图数据库在当中扮演了什么样的角色?同传统数据库相比,图数据库又有什么优势?图数据库开发需要哪些新技术?就此,开源社特访吴敏来分享下图数据库主题内容,从图数据 Nebula 的研发开始,就传统数据库面临的挑战,开源模式的优势,Nebula 的社区开展和产品规划等问题进行深入解析。

About Nebula 总监--吴敏

开源社:Hi,吴敏,先和大家介绍下自己。

大家好,我是吴敏,VEsoft 总监,博士毕业于浙江大学。 曾就职于阿里云、蚂蚁金服,从事分布式图数据库以及云存储相关工作。

开源社:谈谈您在 COSCon'19 上的分享话题。

随着抖音、小红书等社交内容平台的爆红诞生了一种基于社交关系网路的推荐需求,而以垂直领域作为切入点的知识图谱过去两年的“爆火”,传统数据库在处理社交推荐、风控、知识图谱方面的性能缺陷,图数据库的研发应运而生。

本演讲开篇将陈述图数据库行业现状,让你对图数据库存储的数据及对场景有所了解,再从开源的分布式图数据库 Nebula Graph 切入深度讲解大规模分布式图数据库应该如何设计存储、计算及架构,最后讲述开源对图数据库开发的影响。

内容大纲

  1. 图数据库概述及应用
  2. Nebula Graph 设计介绍
  3. 技术细节
  4. 开源社区及服务

开源社:哪些人可以应该了解这个内容?

对图数据库有兴趣,或是有推荐、风控、知识图谱等业务场景需求的人。

Nebula 研发之旅

开源社:为什么给图数据库取名 Nebula ?

Nebula 是星云的意思,很大嘛,也是漫威宇宙里面漂亮的星云小姐姐。对了,Nebula的发音是:[ˈnɛbjələ]

开源社:现在数据库领域百花齐放,国产的 OceanBase 和 TiDB 都发展得不错,为什么还要研发 Nebula 这样的图数据库?

OceanBase、TiDB 这类 NewSQL 最近发展势头很强劲,他们的出现更多的是对传统单机的关系型数据库在可用性的补充。
Nebula 聚焦在图数据库这一领域,也是近年来在数据库各分支中增长最为快速的领域。图数据库使用图(或者网)的方式很直接、自然的表达现实世界的关系: 用节点来表示实体,边来表示关联关系,everything is connected。能高效的提供图检索,提供专业的分析算法、工具,比如 ShortestPath、PageRank、标签传播等等。

开源社:图数据库应用场景有哪些?

典型的应用场景有社交网络,金融风控,推荐引擎,知识图谱等。

社交网络,比如,推荐一条最短路径让我结识迪纳热巴,还可以加上筛选条件,路径中的每个人都是单身女性。

金融风控场景,比如,去查一个信用卡反套现的网络。很典型的一个场景,A 转账到 B,B 转账到 C,C 又转回给 A 即是一个典型的闭环。对于这样的闭环,这类查询在图数据库大规模应用之前,大部分都是采用离线计算的方式去查找,但是离线场景很难去控制当前发生的这笔交易。一个信用卡交易或者在线贷款,整个作业流程很长,而在反套现这块的审核时间又限制在毫秒级,这就是图数据库非常大的一个应用场景。

在推荐算法中,为某个人推荐他的好友。现在的方案是去找好友的好友,判断好友的好友有没有可能成为某人的新好友,这当中涉及好友关系的亲密度,抵达好友的好友的最短路径等。业务方可能会用 MySQL 等传统数据库或是 HBase 来存各类好友关系,然后通过多个串行的 Key-Value 来做查询,但这在线上场景是很难满足性能要求的。

知识图谱这些年非常火,知识图谱结合自然语言的形式在金融,医疗,互联网等众多领域被广泛使用,常见的有语音助手、聊天机器人、智能问答等应用场景。而图数据库存储的数据结构完全适配知识图谱数据,图谱中的实体对应图数据库的点,实体与实体的关系对应图数据库的边,拿 Nebula 为例,Nebula Graph Schema 采用属性图,点边上的属性对应图谱实体和关系中的属性,边的方向表示了关系的方向,边上的标记表示了关系的类型。

再说到最近国内非常火的区块链场景,由于区块链上的所有行为都是公开被记录的又是不可篡改的,因此所有的交易行为,不管是历史数据,还是大概每几分钟产生的新 block,都可以对 DAT 文件解析后导入到图数据库和 GNN 中做分析。例如我们都听说在一些数字货币场景下,洗钱、盗窃、团伙、操纵市场的各类事情很多,通过图的手段包括可以帮助我们挖掘里面的非法行为。

开源社:作为图数据库,有参考借鉴了哪些数据库吗?哪些方面是 Nebula 有特点的设计?

Nebula 是完全自主研发的数据库,它主要有以下的技术特点

存储计算分离

对于 Nebula Graph 来讲,有这么几个技术特点:第一个就是采用了存储计算分离的架构,主要好处就是为了上云或者说弹性,方便单独扩容。业务水位总是很难预测的,一段时间存储不够了,有些时候计算不够了。在云上或者使用容器技术,计算存储分离的架构运维起来会比较方便,成本也更好控制。大家使用 HBase 那么久,这方面的感触肯定很多。

查询语言 nGQL

Nebula Graph 的第二个技术特点是它的查询语言,我们称为 nGQL,比较接近 SQL。唯一大一点的语法差异就是 不用嵌套 (embedding)。大家都知道嵌套的 SQL,读起来是非常痛苦的,要从里向外读。另外,由于图这块目前并没有统一的国际标准,这对整个行业的发展并不是好事,用户的学习成本很高。目前有个 ISO / IEC 组织在准备图语言的国际标准,我们也在积极兼容标准。

支持多种后端存储

第三个特点就是 Nebula Graph 支持多种后端存储,除了原生的引擎外,也支持 HBase。 因为很多用户,对 HBase 已经相当熟悉了,并不希望多一套存储架构。从架构上来说,Nebula Graph 是完全对等的分布式系统。

计算下推

和 HBase 的 CoProcessor 一样,Nebula Graph 支持数据计算下推。数据过滤,包括一些简单的聚合运算,能够在存储层就做掉,这样对于性能来讲能提升会非常大。

多租户

多租户,Nebula Graph是通过多 Space 来实现的。Space 是物理隔离。

索引

除了图查询外,还有很常见的一种场景是全局的属性查询。这个和 MySQL 一样,要提升性能的主要办法是为属性建立索引 ,这个也是 Nebula Graph 原生支持的功能。

图算法

最后的技术特点就是关于图算法方面。
这里的算法和全图计算不太一样,更多是一个子图的计算,比如最短路径。大家知道数据库通常有 OLTP 和 OLAP 两种差异很大的场景,当然现在有很多 HTAP 方面的努力。那对于图数据库来说也是类似,我们在设计 Nebula Graph 的时候,做了一些权衡。我们认为全图的计算,比如 Page Rank,LPA,它的技术挑战和 OLTP 的挑战和对应的设计相差很大。所以 Nebula 的查询引擎主要针对 OLTP 类的场景。
那么,对于 OLAP 类的计算需求,我们的考虑是通过支持和 Spark 的相互访问,来支持 Spark 上图计算,比如 graphX。这块工作正在开发中,应该在最近一两个月会发布。

开源社:为什么会考虑存储计算分离的架构呢?

存储计算分离是个很热的话题。我们将存储模块和 Query Engine 层分开主要有以下考虑。

  1. 成本的原因。存储和计算对计算机资源要求不一样,存储依赖 I/O,计算对 CPU 和内存的要求更高,业务在不同的应用或者发展时期,需要不同的存储空间和计算能力配比,存储和计算的耦合会使得机器的选型会比较复杂,存储计算分离的架构,使得 storage 的 scale out/in 更容易。
  2. 存储层抽象出来可以给计算带来新的选择,比如对接 Pregel, Spark GraphX 这些计算引擎。通常来说,图计算对于存储的要求是吞吐量优先的,而在线查询是时延优先的。通过把存储层分离出来,不管是开发的时候(做 QoS )还是运维的时候(单独集群部署),都会更容易一些。
  3. 在云计算场景下,能实现真正的弹性计算。

开源社:作为一个分布式数据库,是如何保障数据一致性的?

我们使用 Raft 协议,Raft 一致性协议使得 shared-nothing 的 kv 有一致性保障。为什么选择 Raft?相对于 Paxos,Raft 更加有利于工程化实现。Nebula 存储层 Raft 使用 Multi-Raft 的模型,多个 replica 上的同一个 partition 组成一个 Raft 组,同一个集群内存在互相独立 Raft 组,在一致性保障的同时,提高了系统的并发能力。

开源社:在数据库的优化方面,Nebula 做了哪些?

Nebula 在数据优化方面主要做了以下工作:

  1. 异步和并发执行:由于 IO 和网络均为长时延操作,Nebula Graph 采用异步及并发操作。此外,为避免一些大query 的长尾影响,为每个 query 设置单独的资源池以保证服务质量 QoS。
  2. 计算下沉:为避免存储层将过多数据回传到计算层,占用宝贵带宽,条件过滤等算子会随查询条件一同下发到存储层节点。
  3. 数据库系统的优化与数据的物理存储方式以及数据的分布息息相关。而且随着业务的发展,数据分布是会发生变化的,一开始设计的索引和数据存储或者分区会慢慢变得不是最优的,这就需要系统能够做一些动态的调整。我们 storage 支持 scale out/in, load balance。系统的调整会带来 overhead,这是需要权衡考虑的问题。

开源社:现在市面上已有一些图数据库,Nebula 考虑兼容部分数据库让已有的用户无缝切到 Nebula 吗?

Nebula 有 CSV、HDFS 批量 数据导入工具。用户可以将数仓的数据导入到 Nebula。也提供 C++,Java,Golang,Python 的客户端。另外对于市面上已有的一些产品,现在也正在开发将它的数据格式直接解析为 Nebula 的数据格式,这样就可以非常方便的迁移,包括查询语言层面的兼容。

开源社:水平伸缩能够支持多大的规模?

存储层 shared-nothing 的架构,理论上支持无限加机器。

开源社:Nebula 最新的版本 RC1 支持最短路径和全路径算法,可以具体讲下这块的实现,及以后的研发规划吗?

目前实现较为简单,基于双向搜索,返回点边组合的路径。未来规划是计划在执行计划与优化器都完成后,完善对路径的支持,包括实现 match,支持双向 bfs、双向 dijkstra、allpair(全路径),kshortest 等。当然我们欢迎社区的同学们都参与完善 Nebula 的路径算法。

开源社:使用 Nebula 之前,用户应该做哪些准备工作?

对于刚开始使用图数据库的用户,我们提供了详细的文档;

对于已经在使用其他图数据库,想要试试 Nebula 的用户,我们提供了数据导入等工具,有疑问或者任何问题,欢迎在 GitHub 上给我们提 issue,我们的工程师会在第一时间为您解答。

Nebula 和开源

开源社:作为一个企业级产品,为什么 Nebula 一开始就选择了走开源路线?

如果没 Linux,现在互联网的格局也不会是今天这样。我们想要建立图数据库的社区,做出更好的图数据库产品,也希望更多对 Nebula,对图数据库感兴趣的同学成为社区的贡献者,一起努力,共同建立一个互助互利的社区。

开源社:在开源的过程中,有遇到什么困难吗?

很多人都想为开源做一份力,但会被开源项目的门槛“劝退”,尤其是 Nebula 是一个即使耕耘在数据库领域多年的数据库专家,如果对图数据库的不够了解的话,都会感叹“高大上”的一个项目。但技术是为业务服务的,所以 Nebula 力求自己的文档让你即使你对图数据库一无所知,通过 Nebula 的文档也能够了解到图数据库及其应用场景。

开源社:在开源社区搭建这块,有什么可以和开源社小伙伴们分享的吗?

开源项目最重要的是生态的搭建,Nebula Graph 刚开源半年在社区搭建这块只能说略有心得,仅供大家参考 :) 开源社区运营主要从下面几个方面展开

  1. 简洁明了的文档:一个好的文档能让使用者快速同产品拉近距离,Nebula 的文档从“让非技术人做技术事”的出发,力求即使你是一个不懂技术的人也可以按照文档部署 Nebula,玩起来——用 Nebula 完成简单的 CRUD,如果开源社的小伙伴阅读过 Nebula 文档觉得哪里有更改意见,欢迎联系我们;
  2. 实时的反馈回复:用户的反馈,我们会第一时间进行回复,在 GitHub 的 issue 及用户交流群里进行回复;
  3. 同用户直接对话:在线上,Nebula 在各大技术平台同图数据库和 Nebula 爱好者们进行交流,包括 Nebula 架构设计、用户使用实操等系列文章;在线下,我们也开展了主题 Meetup 同各地爱好者交流图数据库技术及 Nebula 的开发心得;
  4. 社区用户体系:在 Nebula 的 GitHub 上,现阶段你可以看到 3 种用户,User、Contributor、Committer,User 通过向 Nebula 提 issue / pr 或者投稿等方式成为 Contributor,Contributor 再进阶成为 Committer。配合 Nebula 开展的各类社区活动,eg:捉虫活动,帮助社区用户完成角色“升级”;

最后,打个小广告:欢迎大家来参与到 Nebula 的建设中,为开源贡献一份力 :)

程序员寄语

开源社: 作为资深数据库从业人员,怎样让自己的眼界更加开阔,怎么获取这个领域的最前沿信息?


多看看论文,看看开源分布式系统的设计以及源代码;多关注数据库的的会议,比如,SIGMOD, VLDB,关注学术界的最新成果;多关注业界相关公司的发展和动态,比如 OsceanBase,TiDB。

Nebula 有话说

以上为开源社对图数据库 Nebula 总监——吴敏的采访,欢迎你关注 Nebula GitHub:github.com/vesoft-inc/nebula 了解 Nebula 最新动态或添加 Nebula 小助手为好友进图数据库技术交流群交流,小助手微信号:NebulaGraphbot

推荐阅读

Go netpoll I/O 多路复用构建原生网络模型之源码深度解析

panjf2000 发表了文章 • 0 个评论 • 311 次浏览 • 4 天前 • 来自相关话题

原文Go netpoll I/O 多路复用构建原生网络模型之源码深度解析 ...查看全部

原文

Go netpoll I/O 多路复用构建原生网络模型之源码深度解析

导言

Go 基于 I/O multiplexing 和 goroutine 构建了一个简洁而高性能的原生网络模型(基于 Go 的I/O 多路复用 netpoll),提供了 goroutine-per-connection 这样简单的网络编程模式。在这种模式下,开发者使用的是同步的模式去编写异步的逻辑,极大地降低了开发者编写网络应用时的心智负担,且借助于 Go runtime scheduler 对 goroutines 的高效调度,这个原生网络模型不论从适用性还是性能上都足以满足绝大部分的应用场景。

然而,在工程性上能做到如此高的普适性和兼容性,最终暴露给开发者提供接口/模式如此简洁,其底层必然是基于非常复杂的封装,做了很多取舍,也有可能放弃了一些『极致』的设计和理念。事实上netpoll底层就是基于 epoll/kqueue/iocp 这些系统调用来做封装的,最终暴露出 goroutine-per-connection 这样的极简的开发模式给使用者。

Go netpoll 在不同的操作系统,其底层使用的 I/O 多路复用技术也不一样,可以从 Go 源码目录结构和对应代码文件了解 Go 在不同平台下的网络 I/O 模式的实现。比如,在 Linux 系统下基于 epoll,freeBSD 系统下基于 kqueue,以及 Windows 系统下基于 iocp。

本文将基于 linux 平台来解析 Go netpoll 之 I/O 多路复用的底层是如何基于 epoll 封装实现的,从源码层层推进,全面而深度地解析 Go netpoll 的设计理念和实现原理,以及 Go 是如何利用netpoll来构建它的原生网络模型的。主要涉及到的一些概念:I/O 模式、用户/内核空间、epoll、linux 源码、goroutine scheduler 等等,我会尽量简单地讲解,如果有对相关概念不熟悉的同学,还是希望能提前熟悉一下。

用户空间与内核空间

现在操作系统都是采用虚拟存储器,那么对 32 位操作系统而言,它的寻址空间(虚拟存储空间)为 4G(2 的 32 次方)。操作系统的核心是内核,独立于普通的应用程序,可以访问受保护的内存空间,也有访问底层硬件设备的所有权限。为了保证用户进程不能直接操作内核(kernel),保证内核的安全,操心系统将虚拟空间划分为两部分,一部分为内核空间,一部分为用户空间。针对 linux 操作系统而言,将最高的 1G 字节(从虚拟地址 0xC0000000 到 0xFFFFFFFF),供内核使用,称为内核空间,而将较低的 3G 字节(从虚拟地址 0x00000000 到0xBFFFFFFF),供各个进程使用,称为用户空间。

I/O 多路复用

在神作《UNIX 网络编程》里,总结归纳了 5 种 I/O 模型,包括同步和异步 I/O:

  • 阻塞 I/O (Blocking I/O)
  • 非阻塞 I/O (Nonblocking I/O)
  • I/O 多路复用 (I/O multiplexing)
  • 信号驱动 I/O (Signal driven I/O)
  • 异步 I/O (Asynchronous I/O)

操作系统上的 I/O 是用户空间和内核空间的数据交互,因此 I/O 操作通常包含以下两个步骤:

  1. 等待网络数据到达网卡(读就绪)/等待网卡可写(写就绪) –> 读取/写入到内核缓冲区
  2. 从内核缓冲区复制数据 –> 用户空间(读)/从用户空间复制数据 -> 内核缓冲区(写)

而判定一个 I/O 模型是同步还是异步,主要看第二步:数据在用户和内核空间之间复制的时候是不是会阻塞当前进程,如果会,则是同步 I/O,否则,就是异步 I/O。基于这个原则,这 5 种 I/O 模型中只有一种异步 I/O 模型:Asynchronous I/O,其余都是同步 I/O 模型。

这 5 种 I/O 模型的对比如下:

所谓 I/O 多路复用指的就是 select/poll/epoll 这一系列的多路选择器:支持单一线程同时监听多个文件描述符(I/O事件),阻塞等待,并在其中某个文件描述符可读写时收到通知。 I/O 复用其实复用的不是 I/O 连接,而是复用线程,让一个 thread of control 能够处理多个连接(I/O 事件)。

select & poll

#include <sys/select.h>

/* According to earlier standards */
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>

int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);

// 和 select 紧密结合的四个宏:
void FD_CLR(int fd, fd_set *set);
int FD_ISSET(int fd, fd_set *set);
void FD_SET(int fd, fd_set *set);
void FD_ZERO(fd_set *set);

select 是 epoll 之前 linux 使用的 I/O 事件驱动技术。

理解 select 的关键在于理解 fd_set,为说明方便,取 fd_set 长度为 1 字节,fd_set 中的每一 bit 可以对应一个文件描述符 fd,则 1 字节长的 fd_set 最大可以对应 8 个 fd。select 的调用过程如下:

  1. 执行 FD_ZERO(&set), 则 set 用位表示是 0000,0000
  2. 若 fd=5, 执行 FD_SET(fd, &set); 后 set 变为 0001,0000(第5位置为1)
  3. 再加入 fd=2, fd=1,则 set 变为 0001,0011
  4. 执行 select(6, &set, 0, 0, 0) 阻塞等待
  5. 若 fd=1, fd=2 上都发生可读事件,则 select 返回,此时 set 变为 0000,0011 (注意:没有事件发生的 fd=5 被清空)

基于上面的调用过程,可以得出 select 的特点:

  • 可监控的文件描述符个数取决于 sizeof(fd_set) 的值。假设服务器上 sizeof(fd_set)=512,每 bit 表示一个文件描述符,则服务器上支持的最大文件描述符是 512*8=4096。fd_set的大小调整可参考 【原创】技术系列之 网络模型(二) 中的模型 2,可以有效突破 select 可监控的文件描述符上限
  • 将 fd 加入 select 监控集的同时,还要再使用一个数据结构 array 保存放到 select 监控集中的 fd,一是用于在 select 返回后,array 作为源数据和 fd_set 进行 FD_ISSET 判断。二是 select 返回后会把以前加入的但并无事件发生的 fd 清空,则每次开始 select 前都要重新从 array 取得 fd 逐一加入(FD_ZERO最先),扫描 array 的同时取得 fd 最大值 maxfd,用于 select 的第一个参数
  • 可见 select 模型必须在 select 前循环 array(加 fd,取 maxfd),select 返回后循环 array(FD_ISSET判断是否有事件发生)

所以,select 有如下的缺点:

  1. 最大并发数限制:使用 32 个整数的 32 位,即 32*32=1024 来标识 fd,虽然可修改,但是有以下第 2, 3 点的瓶颈
  2. 每次调用 select,都需要把 fd 集合从用户态拷贝到内核态,这个开销在 fd 很多时会很大
  3. 性能衰减严重:每次 kernel 都需要线性扫描整个 fd_set,所以随着监控的描述符 fd 数量增长,其 I/O 性能会线性下降

poll 的实现和 select 非常相似,只是描述 fd 集合的方式不同,poll 使用 pollfd 结构而不是 select 的 fd_set 结构,poll 解决了最大文件描述符数量限制的问题,但是同样需要从用户态拷贝所有的 fd 到内核态,也需要线性遍历所有的 fd 集合,所以它和 select 只是实现细节上的区分,并没有本质上的区别。

epoll

epoll 是 linux kernel 2.6 之后引入的新 I/O 事件驱动技术,I/O 多路复用的核心设计是 1 个线程处理所有连接的等待消息准备好I/O 事件,这一点上 epoll 和 select&poll 是大同小异的。但 select&poll 预估错误了一件事,当数十万并发连接存在时,可能每一毫秒只有数百个活跃的连接,同时其余数十万连接在这一毫秒是非活跃的。select&poll 的使用方法是这样的:返回的活跃连接 == select(全部待监控的连接)

什么时候会调用 select&poll 呢?在你认为需要找出有报文到达的活跃连接时,就应该调用。所以,select&poll 在高并发时是会被频繁调用的。这样,这个频繁调用的方法就很有必要看看它是否有效率,因为,它的轻微效率损失都会被高频二字所放大。它有效率损失吗?显而易见,全部待监控连接是数以十万计的,返回的只是数百个活跃连接,这本身就是无效率的表现。被放大后就会发现,处理并发上万个连接时,select&poll 就完全力不从心了。这个时候就该 epoll 上场了,epoll 通过一些新的设计和优化,基本上解决了 select&poll 的问题。

epoll 的 API 非常简洁,涉及到的只有 3 个系统调用:

#include <sys/epoll.h>  
int epoll_create(int size); // int epoll_create1(int flags);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

其中,epoll_create 创建一个 epoll 实例并返回 epollfd;epoll_ctl 注册 file descriptor 等待的 I/O 事件(比如 EPOLLIN、EPOLLOUT 等) 到 epoll 实例上;epoll_wait 则是阻塞监听 epoll 实例上所有的 file descriptor 的 I/O 事件,它接收一个用户空间上的一块内存地址 (events 数组),kernel 会在有 I/O 事件发生的时候把文件描述符列表复制到这块内存地址上,然后 epoll_wait 解除阻塞并返回,最后用户空间上的程序就可以对相应的 fd 进行读写了:

#include <unistd.h>
ssize_t read(int fd, void *buf, size_t count);
ssize_t write(int fd, const void *buf, size_t count);

epoll 的工作原理如下:

与 select&poll 相比,epoll 分清了高频调用和低频调用。例如,epoll_ctl 相对来说就是不太频繁被调用的,而 epoll_wait 则是非常频繁被调用的。所以 epoll 利用 epoll_ctl 来插入或者删除一个 fd,实现用户态到内核态的数据拷贝,这确保了每一个 fd 在其生命周期只需要被拷贝一次,而不是每次调用 epoll_wait 的时候都拷贝一次。 epoll_wait 则被设计成几乎没有入参的调用,相比 select&poll 需要把全部监听的 fd 集合从用户态拷贝至内核态的做法,epoll 的效率就高出了一大截。

在实现上 epoll 采用红黑树来存储所有监听的 fd,而红黑树本身插入和删除性能比较稳定,时间复杂度 O(logN)。通过 epoll_ctl 函数添加进来的 fd 都会被放在红黑树的某个节点内,所以,重复添加是没有用的。当把 fd 添加进来的时候时候会完成关键的一步:该 fd 都会与相应的设备(网卡)驱动程序建立回调关系,也就是在内核中断处理程序为它注册一个回调函数,在 fd 相应的事件触发(中断)之后(设备就绪了),内核就会调用这个回调函数,该回调函数在内核中被称为:ep_poll_callback这个回调函数其实就是把这个 fd 添加到 rdllist 这个双向链表(就绪链表)中。epoll_wait 实际上就是去检查 rdlist 双向链表中是否有就绪的 fd,当 rdlist 为空(无就绪fd)时挂起当前进程,直到 rdlist 非空时进程才被唤醒并返回。

相比于 select&poll 调用时会将全部监听的 fd 从用户态空间拷贝至内核态空间并线性扫描一遍找出就绪的 fd 再返回到用户态,epoll_wait 则是直接返回已就绪 fd,因此 epoll 的 I/O 性能不会像 select&poll 那样随着监听的 fd 数量增加而出现线性衰减,是一个非常高效的 I/O 事件驱动技术。

由于使用 epoll 的 I/O 多路复用需要用户进程自己负责 I/O 读写,从用户进程的角度看,读写过程是阻塞的,所以 select&poll&epoll 本质上都是同步 I/O 模型,而像 Windows 的 IOCP 这一类的异步 I/O,只需要在调用 WSARecv 或 WSASend 方法读写数据的时候把用户空间的内存 buffer 提交给 kernel,kernel 负责数据在用户空间和内核空间拷贝,完成之后就会通知用户进程,整个过程不需要用户进程参与,所以是真正的异步 I/O。

延伸

另外,我看到有些文章说 epoll 之所以性能高是因为利用了 linux 的 mmap 内存映射让内核和用户进程共享了一片物理内存,用来存放就绪 fd 列表和它们的数据 buffer,所以用户进程在 epoll_wait返回之后用户进程就可以直接从共享内存那里读取/写入数据了,这让我很疑惑,因为首先看epoll_wait的函数声明:

int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

第二个参数:就绪事件列表,是需要在用户空间分配内存然后再传给epoll_wait的,如果内核会用 mmap 设置共享内存,直接传递一个指针进去就行了,根本不需要在用户态分配内存,多此一举。其次,内核和用户进程通过 mmap 共享内存是一件极度危险的事情,内核无法确定这块共享内存什么时候会被回收,而且这样也会赋予用户进程直接操作内核数据的权限和入口,非常容易出现大的系统漏洞,因此一般极少会这么做。所以我很怀疑 epoll 是不是真的在 linux kernel 里用了 mmap,我就去看了下最新版本(5.3.9)的 linux kernel 源码:

/*
* Implement the event wait interface for the eventpoll file. It is the kernel
* part of the user space epoll_wait(2).
*/

static int do_epoll_wait(int epfd, struct epoll_event __user *events,
int maxevents, int timeout)
{
// ...

/* Time to fish for events ... */
error = ep_poll(ep, events, maxevents, timeout);
}

// 如果 epoll_wait 入参时设定 timeout == 0, 那么直接通过 ep_events_available 判断当前是否有用户感兴趣的事件发生,如果有则通过 ep_send_events 进行处理
// 如果设置 timeout > 0,并且当前没有用户关注的事件发生,则进行休眠,并添加到 ep->wq 等待队列的头部;对等待事件描述符设置 WQ_FLAG_EXCLUSIVE 标志
// ep_poll 被事件唤醒后会重新检查是否有关注事件,如果对应的事件已经被抢走,那么 ep_poll 会继续休眠等待
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events, int maxevents, long timeout)
{
// ...

send_events:
/*
* Try to transfer events to user space. In case we get 0 events and
* there's still timeout left over, we go trying again in search of
* more luck.
*/


// 如果一切正常, 有 event 发生, 就开始准备数据 copy 给用户空间了
// 如果有就绪的事件发生,那么就调用 ep_send_events 将就绪的事件 copy 到用户态内存中,
// 然后返回到用户态,否则判断是否超时,如果没有超时就继续等待就绪事件发生,如果超时就返回用户态。
// 从 ep_poll 函数的实现可以看到,如果有就绪事件发生,则调用 ep_send_events 函数做进一步处理
if (!res && eavail &&
!(res = ep_send_events(ep, events, maxevents)) && !timed_out)
goto fetch_events;

// ...
}

// ep_send_events 函数是用来向用户空间拷贝就绪 fd 列表的,它将用户传入的就绪 fd 列表内存简单封装到
// ep_send_events_data 结构中,然后调用 ep_scan_ready_list 将就绪队列中的事件写入用户空间的内存;
// 用户进程就可以访问到这些数据进行处理
static int ep_send_events(struct eventpoll *ep,
struct epoll_event __user *events, int maxevents)
{
struct ep_send_events_data esed;

esed.maxevents = maxevents;
esed.events = events;
// 调用 ep_scan_ready_list 函数检查 epoll 实例 eventpoll 中的 rdllist 就绪链表,
// 并注册一个回调函数 ep_send_events_proc,如果有就绪 fd,则调用 ep_send_events_proc 进行处理
ep_scan_ready_list(ep, ep_send_events_proc, &esed, 0, false);
return esed.res;
}

// 调用 ep_scan_ready_list 的时候会传递指向 ep_send_events_proc 函数的函数指针作为回调函数,
// 一旦有就绪 fd,就会调用 ep_send_events_proc 函数
static __poll_t ep_send_events_proc(struct eventpoll *ep, struct list_head *head, void *priv)
{
// ...

/*
* If the event mask intersect the caller-requested one,
* deliver the event to userspace. Again, ep_scan_ready_list()
* is holding ep->mtx, so no operations coming from userspace
* can change the item.
*/

revents = ep_item_poll(epi, &pt, 1);
// 如果 revents 为 0,说明没有就绪的事件,跳过,否则就将就绪事件拷贝到用户态内存中
if (!revents)
continue;
// 将当前就绪的事件和用户进程传入的数据都通过 __put_user 拷贝回用户空间,
// 也就是调用 epoll_wait 之时用户进程传入的 fd 列表的内存
if (__put_user(revents, &uevent->events) || __put_user(epi->event.data, &uevent->data)) {
list_add(&epi->rdllink, head);
ep_pm_stay_awake(epi);
if (!esed->res)
esed->res = -EFAULT;
return 0;
}

// ...
}

do_epoll_wait开始层层跳转,我们可以很清楚地看到最后内核是通过__put_user函数把就绪 fd 列表和事件返回到用户空间,而__put_user正是内核用来拷贝数据到用户空间的标准函数。此外,我并没有在 linux kernel 的源码中和 epoll 相关的代码里找到 mmap 系统调用做内存映射的逻辑,所以基本可以得出结论:epoll 在 linux kernel 里并没有使用 mmap 来做用户空间和内核空间的内存共享,所以那些说 epoll 使用了 mmap 的文章都是误解。

Non-blocking I/O

什么叫非阻塞 I/O,顾名思义就是:所有 I/O 操作都是立刻返回而不会阻塞当前用户进程。I/O 多路复用通常情况下需要和非阻塞 I/O 搭配使用,否则可能会产生意想不到的问题。比如,epoll 的 ET(边缘触发) 模式下,如果不使用非阻塞 I/O,有极大的概率会导致阻塞 event-loop 线程,从而降低吞吐量,甚至导致 bug。

Linux 下,我们可以通过 fcntl 系统调用来设置 O_NONBLOCK标志位,从而把 socket 设置成 non-blocking。当对一个 non-blocking socket 执行读操作时,流程是这个样子:

当用户进程发出 read 操作时,如果 kernel 中的数据还没有准备好,那么它并不会 block 用户进程,而是立刻返回一个 EAGAIN error。从用户进程角度讲 ,它发起一个 read 操作后,并不需要等待,而是马上就得到了一个结果。用户进程判断结果是一个 error 时,它就知道数据还没有准备好,于是它可以再次发送 read 操作。一旦 kernel 中的数据准备好了,并且又再次收到了用户进程的 system call,那么它马上就将数据拷贝到了用户内存,然后返回。

所以,non-blocking I/O 的特点是用户进程需要不断的主动询问 kernel 数据好了没有。

Go netpoll

一个典型的 Go TCP server:

package main

import (
"fmt"
"net"
)

func main() {
listen, err := net.Listen("tcp", ":8888")
if err != nil {
fmt.Println("listen error: ", err)
return
}

for {
conn, err := listen.Accept()
if err != nil {
fmt.Println("accept error: ", err)
break
}

// start a new goroutine to handle the new connection
go HandleConn(conn)
}
}
func HandleConn(conn net.Conn) {
defer conn.Close()
packet := make([]byte, 1024)
for {
// 如果没有可读数据,也就是读 buffer 为空,则阻塞
_, _ = conn.Read(packet)
// 同理,不可写则阻塞
_, _ = conn.Write(packet)
}
}

上面是一个基于 Go 原生网络模型(基于 netpoll)编写的一个 TCP server,模式是 goroutine-per-connection,在这种模式下,开发者使用的是同步的模式去编写异步的逻辑而且对于开发者来说 I/O 是否阻塞是无感知的,也就是说开发者无需考虑 goroutines 甚至更底层的线程、进程的调度和上下文切换。而 Go netpoll 最底层的事件驱动技术肯定是基于 epoll/kqueue/iocp 这一类的 I/O 事件驱动技术,只不过是把这些调度和上下文切换的工作转移到了 runtime 的 Go scheduler,让它来负责调度 goroutines,从而极大地降低了程序员的心智负担!

Go netpoll 核心

Go netpoll 通过在底层对 epoll/kqueue/iocp 的封装,从而实现了使用同步编程模式达到异步执行的效果。总结来说,所有的网络操作都以网络描述符 netFD 为中心实现。netFD 与底层 PollDesc 结构绑定,当在一个 netFD 上读写遇到 EAGAIN 错误时,就将当前 goroutine 存储到这个 netFD 对应的 PollDesc 中,同时调用 gopark 把当前 goroutine 给 park 住,直到这个 netFD 上再次发生读写事件,才将此 goroutine 给 ready 激活重新运行。显然,在底层通知 goroutine 再次发生读写等事件的方式就是 epoll/kqueue/iocp 等事件驱动机制。

接下来我们通过分析最新的 Go 源码(v1.13.4),解读一下整个 netpoll 的运行流程。

上面的示例代码中相关的在源码里的几个数据结构和方法:

// TCPListener is a TCP network listener. Clients should typically
// use variables of type Listener instead of assuming TCP.
type TCPListener struct {
fd *netFD
lc ListenConfig
}

// Accept implements the Accept method in the Listener interface; it
// waits for the next call and returns a generic Conn.
func (l *TCPListener) Accept() (Conn, error) {
if !l.ok() {
return nil, syscall.EINVAL
}
c, err := l.accept()
if err != nil {
return nil, &OpError{Op: "accept", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err}
}
return c, nil
}

func (ln *TCPListener) accept() (*TCPConn, error) {
fd, err := ln.fd.accept()
if err != nil {
return nil, err
}
tc := newTCPConn(fd)
if ln.lc.KeepAlive >= 0 {
setKeepAlive(fd, true)
ka := ln.lc.KeepAlive
if ln.lc.KeepAlive == 0 {
ka = defaultTCPKeepAlive
}
setKeepAlivePeriod(fd, ka)
}
return tc, nil
}

// TCPConn is an implementation of the Conn interface for TCP network
// connections.
type TCPConn struct {
conn
}

// Conn
type conn struct {
fd *netFD
}

type conn struct {
fd *netFD
}

func (c *conn) ok() bool { return c != nil && c.fd != nil }

// Implementation of the Conn interface.

// Read implements the Conn Read method.
func (c *conn) Read(b []byte) (int, error) {
if !c.ok() {
return 0, syscall.EINVAL
}
n, err := c.fd.Read(b)
if err != nil && err != io.EOF {
err = &OpError{Op: "read", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err}
}
return n, err
}

// Write implements the Conn Write method.
func (c *conn) Write(b []byte) (int, error) {
if !c.ok() {
return 0, syscall.EINVAL
}
n, err := c.fd.Write(b)
if err != nil {
err = &OpError{Op: "write", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err}
}
return n, err
}

netFD

net.Listen("tcp", ":8888") 方法返回了一个 TCPListener,它是一个实现了 net.Listener 接口的 struct,而通过 listen.Accept() 接收的新连接 TCPConn 则是一个实现了 net.Conn 接口的 struct,它内嵌了 net.conn struct。仔细阅读上面的源码可以发现,不管是 Listener 的 Accept 还是 Conn 的 Read/Write 方法,都是基于一个 netFD 的数据结构的操作,netFD 是一个网络描述符,类似于 Linux 的文件描述符的概念,netFD 中包含一个 poll.FD 数据结构,而 poll.FD 中包含两个重要的数据结构 Sysfd 和 pollDesc,前者是真正的系统文件描述符,后者对是底层事件驱动的封装,所有的读写超时等操作都是通过调用后者的对应方法实现的。

netFDpoll.FD的源码:

// Network file descriptor.
type netFD struct {
pfd poll.FD

// immutable until Close
family int
sotype int
isConnected bool // handshake completed or use of association with peer
net string
laddr Addr
raddr Addr
}

// FD is a file descriptor. The net and os packages use this type as a
// field of a larger type representing a network connection or OS file.
type FD struct {
// Lock sysfd and serialize access to Read and Write methods.
fdmu fdMutex

// System file descriptor. Immutable until Close.
Sysfd int

// I/O poller.
pd pollDesc

// Writev cache.
iovecs *[]syscall.Iovec

// Semaphore signaled when file is closed.
csema uint32

// Non-zero if this file has been set to blocking mode.
isBlocking uint32

// Whether this is a streaming descriptor, as opposed to a
// packet-based descriptor like a UDP socket. Immutable.
IsStream bool

// Whether a zero byte read indicates EOF. This is false for a
// message based socket connection.
ZeroReadIsEOF bool

// Whether this is a file rather than a network socket.
isFile bool
}

pollDesc

前面提到了 pollDesc 是底层事件驱动的封装,netFD 通过它来完成各种 I/O 相关的操作,它的定义如下:

type pollDesc struct {
runtimeCtx uintptr
}

这里的 struct 只包含了一个指针,而通过 pollDesc 的 init 方法,我们可以找到它具体的定义是在runtime.pollDesc这里:

func (pd *pollDesc) init(fd *FD) error {
serverInit.Do(runtime_pollServerInit)
ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
if errno != 0 {
if ctx != 0 {
runtime_pollUnblock(ctx)
runtime_pollClose(ctx)
}
return syscall.Errno(errno)
}
pd.runtimeCtx = ctx
return nil
}

// Network poller descriptor.
//
// No heap pointers.
//
//go:notinheap
type pollDesc struct {
link *pollDesc // in pollcache, protected by pollcache.lock

// The lock protects pollOpen, pollSetDeadline, pollUnblock and deadlineimpl operations.
// This fully covers seq, rt and wt variables. fd is constant throughout the PollDesc lifetime.
// pollReset, pollWait, pollWaitCanceled and runtime·netpollready (IO readiness notification)
// proceed w/o taking the lock. So closing, everr, rg, rd, wg and wd are manipulated
// in a lock-free way by all operations.
// NOTE(dvyukov): the following code uses uintptr to store *g (rg/wg),
// that will blow up when GC starts moving objects.
lock mutex // protects the following fields
fd uintptr
closing bool
everr bool // marks event scanning error happened
user uint32 // user settable cookie
rseq uintptr // protects from stale read timers
rg uintptr // pdReady, pdWait, G waiting for read or nil
rt timer // read deadline timer (set if rt.f != nil)
rd int64 // read deadline
wseq uintptr // protects from stale write timers
wg uintptr // pdReady, pdWait, G waiting for write or nil
wt timer // write deadline timer
wd int64 // write deadline
}

runtime.pollDesc包含自身类型的一个指针,用来保存下一个runtime.pollDesc的地址,以此来实现链表,可以减少数据结构的大小,所有的runtime.pollDesc保存在runtime.pollCache结构中,定义如下:

type pollCache struct {
lock mutex
first *pollDesc
// PollDesc objects must be type-stable,
// because we can get ready notification from epoll/kqueue
// after the descriptor is closed/reused.
// Stale notifications are detected using seq variable,
// seq is incremented when deadlines are changed or descriptor is reused.
}

net.Listen

调用 net.Listen之后,底层会通过 Linux 的系统调用socket 方法创建一个 fd 分配给 listener,并用以来初始化 listener 的 netFD,接着调用 netFD 的listenStream方法完成对 socket 的 bind&listen 操作以及对 netFD 的初始化(主要是对 netFD 里的 pollDesc 的初始化),相关源码如下:

// 调用 linux 系统调用 socket 创建 listener fd 并设置为为阻塞 I/O    
s, err := socketFunc(family, sotype|syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC, proto)
// On Linux the SOCK_NONBLOCK and SOCK_CLOEXEC flags were
// introduced in 2.6.27 kernel and on FreeBSD both flags were
// introduced in 10 kernel. If we get an EINVAL error on Linux
// or EPROTONOSUPPORT error on FreeBSD, fall back to using
// socket without them.

socketFunc func(int, int, int) (int, error) = syscall.Socket

// 用上面创建的 listener fd 初始化 listener netFD
if fd, err = newFD(s, family, sotype, net); err != nil {
poll.CloseFunc(s)
return nil, err
}

// 对 listener fd 进行 bind&listen 操作,并且调用 init 方法完成初始化
func (fd *netFD) listenStream(laddr sockaddr, backlog int, ctrlFn func(string, string, syscall.RawConn) error) error {
// ...

// 完成绑定操作
if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil {
return os.NewSyscallError("bind", err)
}

// 完成监听操作
if err = listenFunc(fd.pfd.Sysfd, backlog); err != nil {
return os.NewSyscallError("listen", err)
}

// 调用 init,内部会调用 poll.FD.Init,最后调用 pollDesc.init
if err = fd.init(); err != nil {
return err
}
lsa, _ = syscall.Getsockname(fd.pfd.Sysfd)
fd.setAddr(fd.addrFunc()(lsa), nil)
return nil
}

// 使用 sync.Once 来确保一个 listener 只持有一个 epoll 实例
var serverInit sync.Once

// netFD.init 会调用 poll.FD.Init 并最终调用到 pollDesc.init,
// 它会创建 epoll 实例并把 listener fd 加入监听队列
func (pd *pollDesc) init(fd *FD) error {
// runtime_pollServerInit 内部调用了 netpollinit 来创建 epoll 实例
serverInit.Do(runtime_pollServerInit)

// runtime_pollOpen 内部调用了 netpollopen 来将 listener fd 注册到
// epoll 实例中,另外,它会初始化一个 pollDesc 并返回
ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
if errno != 0 {
if ctx != 0 {
runtime_pollUnblock(ctx)
runtime_pollClose(ctx)
}
return syscall.Errno(errno)
}
// 把真正初始化完成的 pollDesc 实例赋值给当前的 pollDesc 代表自身的指针,
// 后续使用直接通过该指针操作
pd.runtimeCtx = ctx
return nil
}

// netpollopen 会被 runtime_pollOpen,注册 fd 到 epoll 实例,
// 同时会利用万能指针把 pollDesc 保存到 epollevent 的一个 8 位的字节数组 data 里
func netpollopen(fd uintptr, pd *pollDesc) int32 {
var ev epollevent
ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
*(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}

我们前面提到的 epoll 的三个基本调用,Go 在源码里实现了对那三个调用的封装:

#include <sys/epoll.h>  
int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

// Go 对上面三个调用的封装
func netpollinit()
func netpollopen(fd uintptr, pd *pollDesc) int32
func netpoll(block bool) gList

netFD 就是通过这三个封装来对 epoll 进行创建实例、注册 fd 和等待事件操作的。

Listener.Accept()

netpoll accept socket 的工作流程如下:

  1. 服务端的 netFD 在listen时会创建 epoll 的实例,并将 listenerFD 加入 epoll 的事件队列
  2. netFD 在accept时将返回的 connFD 也加入 epoll 的事件队列
  3. netFD 在读写时出现syscall.EAGAIN错误,通过 pollDesc 的 waitRead 方法将当前的 goroutine park 住,直到 ready,从 pollDesc 的waitRead中返回

Listener.Accept()接收来自客户端的新连接,具体还是调用netFD.accept方法来完成这个功能:

// Accept implements the Accept method in the Listener interface; it
// waits for the next call and returns a generic Conn.
func (l *TCPListener) Accept() (Conn, error) {
if !l.ok() {
return nil, syscall.EINVAL
}
c, err := l.accept()
if err != nil {
return nil, &OpError{Op: "accept", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err}
}
return c, nil
}

func (ln *TCPListener) accept() (*TCPConn, error) {
fd, err := ln.fd.accept()
if err != nil {
return nil, err
}
tc := newTCPConn(fd)
if ln.lc.KeepAlive >= 0 {
setKeepAlive(fd, true)
ka := ln.lc.KeepAlive
if ln.lc.KeepAlive == 0 {
ka = defaultTCPKeepAlive
}
setKeepAlivePeriod(fd, ka)
}
return tc, nil
}

netFD.accept方法里再调用poll.FD.Accept,最后会使用 linux 的系统调用accept来完成新连接的接收,并且会把 accept 的 socket 设置成非阻塞 I/O 模式:

// Accept wraps the accept network call.
func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
if err := fd.readLock(); err != nil {
return -1, nil, "", err
}
defer fd.readUnlock()

if err := fd.pd.prepareRead(fd.isFile); err != nil {
return -1, nil, "", err
}
for {
// 使用 linux 系统调用 accept 接收新连接,创建对应的 socket
s, rsa, errcall, err := accept(fd.Sysfd)
// 因为 listener fd 在创建的时候已经设置成非阻塞的了,
// 所以 accept 方法会直接返回,不管有没有新连接到来;如果 err == nil 则表示正常建立新连接,直接返回
if err == nil {
return s, rsa, "", err
}
// 如果 err != nil,则判断 err == syscall.EAGAIN,符合条件则进入 pollDesc.waitRead 方法
switch err {
case syscall.EAGAIN:
if fd.pd.pollable() {
// 如果当前没有发生期待的 I/O 事件,那么 waitRead 会通过 park goroutine 让逻辑 block 在这里
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
case syscall.ECONNABORTED:
// This means that a socket on the listen
// queue was closed before we Accept()ed it;
// it's a silly error, so try again.
continue
}
return -1, nil, errcall, err
}
}

// 使用 linux 的 accept 系统调用接收新连接并把这个 socket fd 设置成非阻塞 I/O
ns, sa, err := Accept4Func(s, syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC)
// On Linux the accept4 system call was introduced in 2.6.28
// kernel and on FreeBSD it was introduced in 10 kernel. If we
// get an ENOSYS error on both Linux and FreeBSD, or EINVAL
// error on Linux, fall back to using accept.

// Accept4Func is used to hook the accept4 call.
var Accept4Func func(int, int) (int, syscall.Sockaddr, error) = syscall.Accept4

pollDesc.waitRead方法主要负责检测当前这个 pollDesc 的上层 netFD 对应的 fd 是否有『期待的』I/O 事件发生,如果有就直接返回,否则就 park 住当前的 goroutine 并持续等待直至对应的 fd 上发生可读/可写或者其他『期待的』I/O 事件为止,然后它就会返回到外层的 for 循环,让 goroutine 继续执行逻辑。

Conn.Read/Conn.Write

我们先来看看Conn.Read方法是如何实现的,原理其实和 Listener.Accept 是一样的,具体调用链还是首先调用 conn 的netFD.Read,然后内部再调用 poll.FD.Read,最后使用 linux 的系统调用 read: syscall.Read完成数据读取:

// Implementation of the Conn interface.

// Read implements the Conn Read method.
func (c *conn) Read(b []byte) (int, error) {
if !c.ok() {
return 0, syscall.EINVAL
}
n, err := c.fd.Read(b)
if err != nil && err != io.EOF {
err = &OpError{Op: "read", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err}
}
return n, err
}

func (fd *netFD) Read(p []byte) (n int, err error) {
n, err = fd.pfd.Read(p)
runtime.KeepAlive(fd)
return n, wrapSyscallError("read", err)
}

// Read implements io.Reader.
func (fd *FD) Read(p []byte) (int, error) {
if err := fd.readLock(); err != nil {
return 0, err
}
defer fd.readUnlock()
if len(p) == 0 {
// If the caller wanted a zero byte read, return immediately
// without trying (but after acquiring the readLock).
// Otherwise syscall.Read returns 0, nil which looks like
// io.EOF.
// TODO(bradfitz): make it wait for readability? (Issue 15735)
return 0, nil
}
if err := fd.pd.prepareRead(fd.isFile); err != nil {
return 0, err
}
if fd.IsStream && len(p) > maxRW {
p = p[:maxRW]
}
for {
// 尝试从该 socket 读取数据,因为 socket 在被 listener accept 的时候设置成
// 了非阻塞 I/O,所以这里同样也是直接返回,不管有没有可读的数据
n, err := syscall.Read(fd.Sysfd, p)
if err != nil {
n = 0
// err == syscall.EAGAIN 表示当前没有期待的 I/O 事件发生,也就是 socket 不可读
if err == syscall.EAGAIN && fd.pd.pollable() {
// 如果当前没有发生期待的 I/O 事件,那么 waitRead
// 会通过 park goroutine 让逻辑 block 在这里
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}

// On MacOS we can see EINTR here if the user
// pressed ^Z. See issue #22838.
if runtime.GOOS == "darwin" && err == syscall.EINTR {
continue
}
}
err = fd.eofError(n, err)
return n, err
}
}

conn.Writeconn.Read的原理是一致的,它也是通过类似 pollDesc.waitReadpollDesc.waitWrite来 park 住 goroutine 直至期待的 I/O 事件发生才返回,而 pollDesc.waitWrite的内部实现原理和pollDesc.waitRead是一样的,都是基于runtime_pollWait,这里就不再赘述。

pollDesc.waitRead

pollDesc.waitRead内部调用了 runtime_pollWait来达成无 I/O 事件时 park 住 goroutine 的目的:

//go:linkname poll_runtime_pollWait internal/poll.runtime_pollWait
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
err := netpollcheckerr(pd, int32(mode))
if err != 0 {
return err
}
// As for now only Solaris, illumos, and AIX use level-triggered IO.
if GOOS == "solaris" || GOOS == "illumos" || GOOS == "aix" {
netpollarm(pd, mode)
}
// 进入 netpollblock 并且判断是否有期待的 I/O 事件发生,
// 这里的 for 循环是为了一直等到 io ready
for !netpollblock(pd, int32(mode), false) {
err = netpollcheckerr(pd, int32(mode))
if err != 0 {
return err
}
// Can happen if timeout has fired and unblocked us,
// but before we had a chance to run, timeout has been reset.
// Pretend it has not happened and retry.
}
return 0
}

// returns true if IO is ready, or false if timedout or closed
// waitio - wait only for completed IO, ignore errors
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
// gpp 保存的是 goroutine 的数据结构 g,这里会根据 mode 的值决定是 rg 还是 wg
// 后面调用 gopark 之后,会把当前的 goroutine 的抽象数据结构 g 存入 gpp 这个指针
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}

// set the gpp semaphore to WAIT
// 这个 for 循环是为了等待 io ready 或者 io wait
for {
old := *gpp
// gpp == pdReady 表示此时已有期待的 I/O 事件发生,
// 可以直接返回 unblock 当前 goroutine 并执行响应的 I/O 操作
if old == pdReady {
*gpp = 0
return true
}
if old != 0 {
throw("runtime: double wait")
}
// 如果没有期待的 I/O 事件发生,则通过原子操作把 gpp 的值置为 pdWait 并退出 for 循环
if atomic.Casuintptr(gpp, 0, pdWait) {
break
}
}

// need to recheck error states after setting gpp to WAIT
// this is necessary because runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl
// do the opposite: store to closing/rd/wd, membarrier, load of rg/wg

// waitio 此时是 false,netpollcheckerr 方法会检查当前 pollDesc 对应的 fd 是否是正常的,
// 通常来说 netpollcheckerr(pd, mode) == 0 是成立的,所以这里会执行 gopark
// 把当前 goroutine 给 park 住,直至对应的 fd 上发生可读/可写或者其他『期待的』I/O 事件为止,
// 然后 unpark 返回,在 gopark 内部会把当前 goroutine 的抽象数据结构 g 存入
// gpp(pollDesc.rg/pollDesc.wg) 指针里,以便在后面的 netpoll 函数取出 pollDesc 之后,
// 把 g 添加到链表里返回,然后重新调度运行该 goroutine
if waitio || netpollcheckerr(pd, mode) == 0 {
// 注册 netpollblockcommit 回调给 gopark,在 gopark 内部会执行它,保存当前 goroutine 到 gpp
gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
}
// be careful to not lose concurrent READY notification
old := atomic.Xchguintptr(gpp, 0)
if old > pdWait {
throw("runtime: corrupted polldesc")
}
return old == pdReady
}

// gopark 会停住当前的 goroutine 并且调用传递进来的回调函数 unlockf,从上面的源码我们可以知道这个函数是
// netpollblockcommit
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
if reason != waitReasonSleep {
checkTimeouts() // timeouts may expire while two goroutines keep the scheduler busy
}
mp := acquirem()
gp := mp.curg
status := readgstatus(gp)
if status != _Grunning && status != _Gscanrunning {
throw("gopark: bad g status")
}
mp.waitlock = lock
mp.waitunlockf = unlockf
gp.waitreason = reason
mp.waittraceev = traceEv
mp.waittraceskip = traceskip
releasem(mp)
// can't do anything that might move the G between Ms here.
// gopark 最终会调用 park_m,在这个函数内部会调用 unlockf,也就是 netpollblockcommit,
// 然后会把当前的 goroutine,也就是 g 数据结构保存到 pollDesc 的 rg 或者 wg 指针里
mcall(park_m)
}

// park continuation on g0.
func park_m(gp *g) {
_g_ := getg()

if trace.enabled {
traceGoPark(_g_.m.waittraceev, _g_.m.waittraceskip)
}

casgstatus(gp, _Grunning, _Gwaiting)
dropg()

if fn := _g_.m.waitunlockf; fn != nil {
// 调用 netpollblockcommit,把当前的 goroutine,
// 也就是 g 数据结构保存到 pollDesc 的 rg 或者 wg 指针里
ok := fn(gp, _g_.m.waitlock)
_g_.m.waitunlockf = nil
_g_.m.waitlock = nil
if !ok {
if trace.enabled {
traceGoUnpark(gp, 2)
}
casgstatus(gp, _Gwaiting, _Grunnable)
execute(gp, true) // Schedule it back, never returns.
}
}
schedule()
}

// netpollblockcommit 在 gopark 函数里被调用
func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {
// 通过原子操作把当前 goroutine 抽象的数据结构 g,也就是这里的参数 gp 存入 gpp 指针,
// 此时 gpp 的值是 pollDesc 的 rg 或者 wg 指针
r := atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))
if r {
// Bump the count of goroutines waiting for the poller.
// The scheduler uses this to decide whether to block
// waiting for the poller if there is nothing else to do.
atomic.Xadd(&netpollWaiters, 1)
}
return r
}

netpoll

前面已经从源码的角度分析完了 netpoll 是如何通过 park goroutine 从而达到阻塞 Accept/Read/Write 的效果,而通过调用 gopark,goroutine 会被放置在某个等待队列中(如 channel 的 waitq ,此时 G 的状态由_Grunning_Gwaitting),因此G必须被手动唤醒(通过 goready ),否则会丢失任务,应用层阻塞通常使用这种方式。

所以,最后还有一个非常关键的问题是:当 I/O 事件发生之后,netpoll 是通过什么方式唤醒那些在 I/O wait 的 goroutine 的?答案是通过 epoll_wait,在 Go 源码中的 src/runtime/netpoll_epoll.go文件中有一个 func netpoll(block bool) gList 方法,它会内部调用epoll_wait获取就绪的 fd 列表,并将每个 fd 对应的 goroutine 添加到链表返回

// polls for ready network connections
// returns list of goroutines that become runnable
func netpoll(block bool) gList {
if epfd == -1 {
return gList{}
}
waitms := int32(-1)
// 是否以阻塞模式调用 epoll_wait
if !block {
waitms = 0
}
var events [128]epollevent
retry:
// 获取就绪的 fd 列表
n := epollwait(epfd, &events[0], int32(len(events)), waitms)
if n < 0 {
if n != -_EINTR {
println("runtime: epollwait on fd", epfd, "failed with", -n)
throw("runtime: netpoll failed")
}
goto retry
}
// toRun 是一个 g 的链表,存储要恢复的 goroutines,最后返回给调用方
var toRun gList
for i := int32(0); i < n; i++ {
ev := &events[i]
if ev.events == 0 {
continue
}
var mode int32
// 判断发生的事件类型,读类型或者写类型
if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'r'
}
if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'w'
}
if mode != 0 {
// 取出保存在 epollevent 里的 pollDesc
pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
pd.everr = false
if ev.events == _EPOLLERR {
pd.everr = true
}
// 调用 netpollready,传入就绪 fd 的 pollDesc,把 fd 对应的 goroutine 添加到链表 toRun 中
netpollready(&toRun, pd, mode)
}
}
if block && toRun.empty() {
goto retry
}
return toRun
}

// netpollready 调用 netpollunblock 返回就绪 fd 对应的 goroutine 的抽象数据结构 g
func netpollready(toRun *gList, pd *pollDesc, mode int32) {
var rg, wg *g
if mode == 'r' || mode == 'r'+'w' {
rg = netpollunblock(pd, 'r', true)
}
if mode == 'w' || mode == 'r'+'w' {
wg = netpollunblock(pd, 'w', true)
}
if rg != nil {
toRun.push(rg)
}
if wg != nil {
toRun.push(wg)
}
}

// netpollunblock 会依据传入的 mode 决定从 pollDesc 的 rg 或者 wg 取出当时 gopark 之时存入的
// goroutine 抽象数据结构 g 并返回
func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
// mode == 'r' 代表当时 gopark 是为了等待读事件,而 mode == 'w' 则代表是等待写事件
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}

for {
// 取出 gpp 存储的 g
old := *gpp
if old == pdReady {
return nil
}
if old == 0 && !ioready {
// Only set READY for ioready. runtime_pollWait
// will check for timeout/cancel before waiting.
return nil
}
var new uintptr
if ioready {
new = pdReady
}
// 重置 pollDesc 的 rg 或者 wg
if atomic.Casuintpt

[ gev ] Go 语言优雅处理 TCP “粘包”

惜朝 发表了文章 • 0 个评论 • 620 次浏览 • 2019-11-01 10:48 • 来自相关话题

https://github.com/Allenxuxu/gev ...查看全部

https://github.com/Allenxuxu/gev

gev 是一个轻量、快速的基于 Reactor 模式的非阻塞 TCP 网络库,支持自定义协议,轻松快速搭建高性能服务器。

TCP 为什么会粘包


TCP 本身就是面向流的协议,就是一串没有界限的数据。所以本质上来说 TCP 粘包是一个伪命题。

TCP 底层并不关心上层业务数据,会套接字缓冲区的实际情况进行包的划分,一个完整的业务数据可能会被拆分成多次进行发送,也可能会将多个小的业务数据封装成一个大的数据包发送(Nagle算法)。

gev 如何优雅处理


gev 通过回调函数 OnMessage 通知用户数据到来,回调函数中会将用户数据缓冲区(ringbuffer)通过参数传递过来。

用户通过对 ringbuffer 操作,来进行数据解包,获取到完整用户数据后再进行业务操作。这样又一个明显的缺点,就是会让业务操作和自定义协议解析代码堆在一起。

所以,最近对 gev 进行了一次较大改动,主要是为了能够以插件的形式支持各种自定义的数据协议,让使用者可以便捷处理 TCP 粘包问题,专注于业务逻辑。



做法如下,定义一个接口 Protocol

```go
// Protocol 自定义协议编解码接口
type Protocol interface {
UnPacket(c *Connection, buffer *ringbuffer.RingBuffer) (interface{}, []byte)
Packet(c *Connection, data []byte) []byte
}
```


用户只需实现这个接口,并注册到 server 中,当客户端数据到来时,gev 会首先调用 UnPacket 方法,如果缓冲区中的数据足够组成一帧,则将数据解包,并返回真正的用户数据,然后在回调 OnMessage 函数并将数据通过参数传递。

下面,我们实现一个简单的自定义协议插件,来启动一个 Server :

```text
| 数据长度 n | payload |
| 4字节 | n 字节 |
```

```go
// protocol.go
package main

import (
"encoding/binary"
"github.com/Allenxuxu/gev/connection"
"github.com/Allenxuxu/ringbuffer"
"github.com/gobwas/pool/pbytes"
)

const exampleHeaderLen = 4

type ExampleProtocol struct{}

func (d *ExampleProtocol) UnPacket(c *connection.Connection, buffer *ringbuffer.RingBuffer) (interface{}, []byte) {
if buffer.VirtualLength() > exampleHeaderLen {
buf := pbytes.GetLen(exampleHeaderLen)
defer pbytes.Put(buf)
_, _ = buffer.VirtualRead(buf)
dataLen := binary.BigEndian.Uint32(buf)

if buffer.VirtualLength() >= int(dataLen) {
ret := make([]byte, dataLen)
_, _ = buffer.VirtualRead(ret)

buffer.VirtualFlush()
return nil, ret
} else {
buffer.VirtualRevert()
}
}
return nil, nil
}

func (d *ExampleProtocol) Packet(c *connection.Connection, data []byte) []byte {
dataLen := len(data)
ret := make([]byte, exampleHeaderLen+dataLen)
binary.BigEndian.PutUint32(ret, uint32(dataLen))
copy(ret[4:], data)
return ret
}
```

```go
// server.go
package main

import (
"flag"
"log"
"strconv"

"github.com/Allenxuxu/gev"
"github.com/Allenxuxu/gev/connection"
)

type example struct{}

func (s *example) OnConnect(c *connection.Connection) {
log.Println(" OnConnect : ", c.PeerAddr())
}
func (s *example) OnMessage(c *connection.Connection, ctx interface{}, data []byte) (out []byte) {
log.Println("OnMessage:", data)
out = data
return
}

func (s *example) OnClose(c *connection.Connection) {
log.Println("OnClose")
}

func main() {
handler := new(example)
var port int
var loops int

flag.IntVar(&port, "port", 1833, "server port")
flag.IntVar(&loops, "loops", -1, "num loops")
flag.Parse()

s, err := gev.NewServer(handler,
gev.Address(":"+strconv.Itoa(port)),
gev.NumLoops(loops),
gev.Protocol(&ExampleProtocol{}))
if err != nil {
panic(err)
}

log.Println("server start")
s.Start()
}
```
完整代码地址

当回调 `OnMessage` 函数的时候,会通过参数传递已经拆好包的用户数据。

当我们需要使用其他协议时,仅仅需要实现一个 Protocol 插件,然后只要 `gev.NewServer` 时指定即可:

```go
gev.NewServer(handler, gev.NumLoops(2), gev.Protocol(&XXXProtocol{}))
```

## 基于 Protocol Plugins 模式为 gev 实现 WebSocket 插件

得益于 Protocol Plugins 模式的引进,我可以将 WebSocket 的实现做成一个插件(WebSocket 协议构建在 TCP 之上),独立于 gev 之外。

```go
package websocket

import (
"log"

"github.com/Allenxuxu/gev/connection"
"github.com/Allenxuxu/gev/plugins/websocket/ws"
"github.com/Allenxuxu/ringbuffer"
)

// Protocol websocket
type Protocol struct {
upgrade *ws.Upgrader
}

// New 创建 websocket Protocol
func New(u *ws.Upgrader) *Protocol {
return &Protocol{upgrade: u}
}

// UnPacket 解析 websocket 协议,返回 header ,payload
func (p *Protocol) UnPacket(c *connection.Connection, buffer *ringbuffer.RingBuffer) (ctx interface{}, out []byte) {
upgraded := c.Context()
if upgraded == nil {
var err error
out, _, err = p.upgrade.Upgrade(buffer)
if err != nil {
log.Println("Websocket Upgrade :", err)
return
}
c.SetContext(true)
} else {
header, err := ws.VirtualReadHeader(buffer)
if err != nil {
log.Println(err)
return
}
if buffer.VirtualLength() >= int(header.Length) {
buffer.VirtualFlush()

payload := make([]byte, int(header.Length))
_, _ = buffer.Read(payload)

if header.Masked {
ws.Cipher(payload, header.Mask, 0)
}

ctx = &header
out = payload
} else {
buffer.VirtualRevert()
}
}
return
}

// Packet 直接返回
func (p *Protocol) Packet(c *connection.Connection, data []byte) []byte {
return data
}
```

具体的实现,可以到仓库的 [plugins/websocket](https://github.com/Allenxuxu/gev/tree/master/plugins/websocket) 查看。

## 相关文章

- [开源 gev: Go 实现基于 Reactor 模式的非阻塞 TCP 网络库](https://note.mogutou.xyz/articles/2019/09/19/1568896693634.html)
- [Go 网络库并发吞吐量测试](https://note.mogutou.xyz/articles/2019/09/22/1569146969662.html)

## 项目地址

https://github.com/Allenxuxu/gev

【go 源码】sync.Once 详解

xmgee 发表了文章 • 0 个评论 • 513 次浏览 • 2019-10-31 22:50 • 来自相关话题

# sync.Once 源码阅读## 1.Demo```package mainimport ( "fmt" "sync" "time")func m ...查看全部

# sync.Once 源码阅读

## 1.Demo

```
package main

import (
"fmt"
"sync"
"time"
)

func main() {
var once sync.Once

for i:=0;i<=10;i++{
go once.Do(func() {
fmt.Println("hello world")
})
}

time.Sleep(time.Second * 2)
}
```

## 2.介绍

sync.Once是sync包中的一个对象,它只有一个方法Do,这个方法很特殊,在程序运行过程中,无论被多少次调用,只会执行一次,就与结构体的名称一样,once(一次)。那它是如何做的呢?

## 3.使用场景

当程序运行过程中,在会被多次调用的地方却只想执行一次某代码块。就可以全局声明一个once,然后用once.Do()来之行此代码块。

## 4.源码

```
// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package sync

import (
"sync/atomic"
)

// Once is an object that will perform exactly one action.
type Once struct {
m Mutex
done uint32
}

// Do calls the function f if and only if Do is being called for the
// first time for this instance of Once. In other words, given
// var once Once
// if once.Do(f) is called multiple times, only the first call will invoke f,
// even if f has a different value in each invocation. A new instance of
// Once is required for each function to execute.
//
// Do is intended for initialization that must be run exactly once. Since f
// is niladic, it may be necessary to use a function literal to capture the
// arguments to a function to be invoked by Do:
// config.once.Do(func() { config.init(filename) })
//
// Because no call to Do returns until the one call to f returns, if f causes
// Do to be called, it will deadlock.
//
// If f panics, Do considers it to have returned; future calls of Do return
// without calling f.
//
func (o *Once) Do(f func()) {
if atomic.LoadUint32(&o.done) == 1 {
return
}
// Slow-path.
o.m.Lock()
defer o.m.Unlock()
if o.done == 0 {
defer atomic.StoreUint32(&o.done, 1)
f()
}
}

```

## 5.源码解析

可以看到once结构体中,有两个字段,m是了保证并发安全性的,done是标志是否已经执行过此方法,如果done是1则表示执行过,0表示未执行。

Do方法中,首先通过atomic.LoadUint32(&o.done),来取得done的值,看是否为1,如果为1就表示已经执行过了,直接返回,未执行则继续执行。

代码很简单,就不啰嗦了,值得注意的是 `defer atomic.StoreUint32(&o.done, 1)`很精髓,为了防止f()方法中panic,无法为done赋值,作者特地使用defer。值得学习。

----

项目地址:github.com/xmge,更多go源码阅读文章将在公众号发布:

![img](https://gosc.oss-cn-beijing.aliyuncs.com/gosc.jpg)

【Go】高效截取字符串的一些思考

qiyin 发表了文章 • 0 个评论 • 556 次浏览 • 2019-10-31 08:33 • 来自相关话题

原文链接: ...查看全部

原文链接:https://blog.thinkeridea.com/201910/go/efficient_string_truncation.html

最近我在 Go Forum 中发现了 String size of 20 character 的问题,“hollowaykeanho” 给出了相关的答案,而我从中发现了截取字符串的方案并非最理想的方法,因此做了一系列实验并获得高效截取字符串的方法,这篇文章将逐步讲解我实践的过程。

字节切片截取

这正是 “hollowaykeanho” 给出的第一个方案,我想也是很多人想到的第一个方案,利用 go 的内置切片语法截取字符串:

s := "abcdef"
fmt.Println(s[1:4])

我们很快就了解到这是按字节截取,在处理 ASCII 单字节字符串截取,没有什么比这更完美的方案了,中文往往占多个字节,在 utf8 编码中是3个字节,如下程序我们将获得乱码数据:

s := "Go 语言"
fmt.Println(s[1:4])

杀手锏 - 类型转换 []rune

hollowaykeanho” 给出的第二个方案就是将字符串转换为 []rune,然后按切片语法截取,再把结果转成字符串。

s := "Go 语言"
rs := []rune(s)
fmt.Println(strings(rs[1:4]))

首先我们得到了正确的结果,这是最大的进步。不过我对类型转换一直比较谨慎,我担心它的性能问题,因此我尝试在搜索引擎和各大论坛查找答案,但是我得到最多的还是这个方案,似乎这已经是唯一的解。

我尝试写个性能测试评测它的性能:

package benchmark

import (
"testing"
)

var benchmarkSubString = "Go语言是Google开发的一种静态强类型、编译型、并发型,并具有垃圾回收功能的编程语言。为了方便搜索和识别,有时会将其称为Golang。"
var benchmarkSubStringLength = 20

func SubStrRunes(s string, length int) string {
if utf8.RuneCountInString(s) > length {
rs := []rune(s)
return string(rs[:length])
}

return s
}

func BenchmarkSubStrRunes(b *testing.B) {
for i := 0; i < b.N; i++ {
SubStrRunes(benchmarkSubString, benchmarkSubStringLength)
}
}

我得到了让我有些吃惊的结果:

goos: darwin
goarch: amd64
pkg: github.com/thinkeridea/go-extend/exunicode/exutf8/benchmark
BenchmarkSubStrRunes-8 872253 1363 ns/op 336 B/op 2 allocs/op
PASS
ok github.com/thinkeridea/go-extend/exunicode/exutf8/benchmark 2.120s

对 69 个的字符串截取前 20 个字符需要大概 1.3 微秒,这极大的超出了我的心里预期,我发现因为类型转换带来了内存分配,这产生了一个新的字符串,并且类型转换需要大量的计算。

救命稻草 - utf8.DecodeRuneInString

我想改善类型转换带来的额外运算和内存分配,我仔细的梳理了一遍 strings 包,发现并没有相关的工具,这时我想到了 utf8 包,它提供了多字节计算相关的工具,实话说我对它并不熟悉,或者说没有主动(直接)使用过它,我查看了它所有的文档发现 utf8.DecodeRuneInString 函数可以转换单个字符,并给出字符占用字节的数量,我尝试了如此下的实验:

package benchmark

import (
"testing"
"unicode/utf8"
)

var benchmarkSubString = "Go语言是Google开发的一种静态强类型、编译型、并发型,并具有垃圾回收功能的编程语言。为了方便搜索和识别,有时会将其称为Golang。"
var benchmarkSubStringLength = 20

func SubStrDecodeRuneInString(s string, length int) string {
var size, n int
for i := 0; i < length && n < len(s); i++ {
_, size = utf8.DecodeRuneInString(s[n:])
n += size
}

return s[:n]
}

func BenchmarkSubStrDecodeRuneInString(b *testing.B) {
for i := 0; i < b.N; i++ {
SubStrDecodeRuneInString(benchmarkSubString, benchmarkSubStringLength)
}
}

运行它之后我得到了令我惊喜的结果:

goos: darwin
goarch: amd64
pkg: github.com/thinkeridea/go-extend/exunicode/exutf8/benchmark
BenchmarkSubStrDecodeRuneInString-8 10774401 105 ns/op 0 B/op 0 allocs/op
PASS
ok github.com/thinkeridea/go-extend/exunicode/exutf8/benchmark 1.250s

较 []rune 类型转换效率提升了 13倍,消除了内存分配,它的确令人激动和兴奋,我迫不及待的回复了 “hollowaykeanho” 告诉他我发现了一个更好的方法,并提供了相关的性能测试。

我有些小激动,兴奋的浏览着论坛里各种有趣的问题,在查看一个问题的帮助时 (忘记是哪个问题了-_-||) ,我惊奇的发现了另一个思路。

良药不一定苦 - range 字符串迭代

许多人似乎遗忘了 range 是按字符迭代的,并非字节。使用 range 迭代字符串时返回字符起始索引和对应的字符,我立刻尝试利用这个特性编写了如下用例:

package benchmark

import (
"testing"
)

var benchmarkSubString = "Go语言是Google开发的一种静态强类型、编译型、并发型,并具有垃圾回收功能的编程语言。为了方便搜索和识别,有时会将其称为Golang。"
var benchmarkSubStringLength = 20

func SubStrRange(s string, length int) string {
var n, i int
for i = range s {
if n == length {
break
}

n++
}

return s[:i]
}

func BenchmarkSubStrRange(b *testing.B) {
for i := 0; i < b.N; i++ {
SubStrRange(benchmarkSubString, benchmarkSubStringLength)
}
}

我尝试运行它,这似乎有着无穷的魔力,结果并没有令我失望。

goos: darwin
goarch: amd64
pkg: github.com/thinkeridea/go-extend/exunicode/exutf8/benchmark
BenchmarkSubStrRange-8 12354991 91.3 ns/op 0 B/op 0 allocs/op
PASS
ok github.com/thinkeridea/go-extend/exunicode/exutf8/benchmark 1.233s

它仅仅提升了13%,但它足够的简单和易于理解,这似乎就是我苦苦寻找的那味良药。

如果你以为这就结束了,不、这对我来只是探索的开始。

终极时刻 - 自己造轮子

喝了 range 那碗甜的腻人的良药,我似乎冷静下来了,我需要造一个轮子,它需要更易用,更高效。

于是乎我仔细观察了两个优化方案,它们似乎都是为了查找截取指定长度字符的索引位置,如果我可以提供一个这样的方法,是否就可以提供用户一个简单的截取实现 s[:strIndex(20)] ,这个想法萌芽之后我就无法再度摆脱,我苦苦思索两天来如何来提供易于使用的接口。

之后我创造了 exutf8.RuneIndexInString 和 exutf8.RuneIndex 方法,分别用来计算字符串和字节切片中指定字符数量结束的索引位置。

我用 exutf8.RuneIndexInString 实现了一个字符串截取测试:

package benchmark

import (
"testing"
"unicode/utf8"

"github.com/thinkeridea/go-extend/exunicode/exutf8"
)

var benchmarkSubString = "Go语言是Google开发的一种静态强类型、编译型、并发型,并具有垃圾回收功能的编程语言。为了方便搜索和识别,有时会将其称为Golang。"
var benchmarkSubStringLength = 20

func SubStrRuneIndexInString(s string, length int) string {
n, _ := exutf8.RuneIndexInString(s, length)
return s[:n]
}

func BenchmarkSubStrRuneIndexInString(b *testing.B) {
for i := 0; i < b.N; i++ {
SubStrRuneIndexInString(benchmarkSubString, benchmarkSubStringLength)
}
}

尝试运行它,我对结果感到十分欣慰:

goos: darwin
goarch: amd64
pkg: github.com/thinkeridea/go-extend/exunicode/exutf8/benchmark
BenchmarkSubStrRuneIndexInString-8 13546849 82.4 ns/op 0 B/op 0 allocs/op
PASS
ok github.com/thinkeridea/go-extend/exunicode/exutf8/benchmark 1.213s

性能较 range 提升了 10%,让我很欣慰可以再次获得新的提升,这证明它是有效的。

它足够的高效,但是却不够易用,我截取字符串需要两行代码,如果我想截取 10~20之间的字符就需要4行代码,这并不是用户易于使用的接口,我参考了其它语言的 sub_string 方法,我想我应该也设计一个这个样的接口给用户。

exutf8.RuneSubString 和 exutf8.RuneSub 是我认真思索后编写的方法:

func RuneSubString(s string, start, length int) string

它有三个参数:

  • s : 输入的字符串
  • start : 开始截取的位置,如果 start 是非负数,返回的字符串将从 string 的 start 位置开始,从 0 开始计算。例如,在字符串 “abcdef” 中,在位置 0 的字符是 “a”,位置 2 的字符串是 “c” 等等。 如果 start 是负数,返回的字符串将从 string 结尾处向前数第 start 个字符开始。 如果 string 的长度小于 start,将返回空字符串。
  • length:截取的长度,如果提供了正数的 length,返回的字符串将从 start 处开始最多包括 length 个字符(取决于 string 的长度)。 如果提供了负数的 length,那么 string 末尾处的 length 个字符将会被省略(若 start 是负数则从字符串尾部算起)。如果 start 不在这段文本中,那么将返回空字符串。 如果提供了值为 0 的 length,返回的子字符串将从 start 位置开始直到字符串结尾。

我为他们提供了别名,根据使用习惯大家更倾向去 strings 包寻找这类问题的解决方法,我创建了exstrings.SubString 和 exbytes.Sub 作为更易检索到的别名方法。

最后我需要再做一个性能测试,确保它的性能:

package benchmark

import (
"testing"

"github.com/thinkeridea/go-extend/exunicode/exutf8"
)

var benchmarkSubString = "Go语言是Google开发的一种静态强类型、编译型、并发型,并具有垃圾回收功能的编程语言。为了方便搜索和识别,有时会将其称为Golang。"
var benchmarkSubStringLength = 20

func SubStrRuneSubString(s string, length int) string {
return exutf8.RuneSubString(s, 0, length)
}

func BenchmarkSubStrRuneSubString(b *testing.B) {
for i := 0; i < b.N; i++ {
SubStrRuneSubString(benchmarkSubString, benchmarkSubStringLength)
}
}

运行它,不会让我失望:

goos: darwin
goarch: amd64
pkg: github.com/thinkeridea/go-extend/exunicode/exutf8/benchmark
BenchmarkSubStrRuneSubString-8 13309082 83.9 ns/op 0 B/op 0 allocs/op
PASS
ok github.com/thinkeridea/go-extend/exunicode/exutf8/benchmark 1.215s

虽然相较 exutf8.RuneIndexInString 有所下降,但它提供了易于交互和使用的接口,我认为这应该是最实用的方案,如果你追求极致仍然可以使用 exutf8.RuneIndexInString,它依然是最快的方案。

总结

当看到有疑问的代码,即使它十分的简单,依然值得深究,并不停的探索它,这并不枯燥和乏味,反而会有极多收获。

从起初 []rune 类型转换到最后自己造轮子,不仅得到了16倍的性能提升,我还学习了utf8包、加深了range 遍历字符串的特性 以及为 go-extend 仓库收录了多个实用高效的解决方案,让更多 go-extend 的用户得到成果。

go-extend 是一个收录实用、高效方法的仓库,读者们如果好的函数和通用高效的解决方案,期待你们不吝啬给我发送 Pull request,你也可以使用这个仓库加快功能实现及提升性能。

转载:

本文作者: 戚银(thinkeridea

本文链接: https://blog.thinkeridea.com/201910/go/efficient_string_truncation.html

版权声明: 本博客所有文章除特别声明外,均采用 CC BY 4.0 CN协议 许可协议。转载请注明出处!

DevOps 从理论到实践指南

CORNERSTONE 发表了文章 • 0 个评论 • 514 次浏览 • 2019-10-30 15:40 • 来自相关话题

什么是 DevOps ...查看全部
什么是 DevOps


如今 DevOps 已经成为一个流行词,很多公司都在说自己在做 DevOps,但是每个人、每家公司理解的 DevOps 又不尽相同,从 DevOps 诞生的第一天起,如何定义 DevOps 就是一个争论不休的话题。
这篇文章,CORNERSTONE认为基本诠释了 DevOps 的定义:DevOps 是什么不是什么
如果你没有耐心把这篇文章看完,维基百科还给出了一个太长不读版:
DevOps (a clipped compound of “development” and “operations”) is a software development and delivery process that emphasizes communication and collaboration between product management, software development, and operations professionals.It seeks to automate the process of software integration, testing, deployment, and infrastructure changes by establishing a culture and environment where building, testing, and releasing software can happen rapidly, frequently, and more reliably.
归纳成三点:
  • DevOps 是一种强调沟通与协作的软件交付过程。它包括产品管理,软件开发及运营等各个方面。
  • DevOps 自动化软件集成,测试,部署以及基础设施的变更。
  • 它的目标是建立一种文化和环境,使得软件的构建、测试、交付更快,更频繁,更可靠。

DevOps 的由来
为什么要实践 DevOps
  • 更短的交付周期,生产环境部署频率越来越快,简化生产部署流程,且自动化不停机部署
  • 更高的价值,形成特性提出到运营数据、用户反馈验证的实验性交付闭环,基于实际用户反馈调整计划和需求
  • 更好的质量保障,在代码检查,功能和非功能验证,以及部署各方面建立较完善的质量保障体系,尤其是自动化测试集
  • 更高绩效的团队,包含业务,开发测试,和运维职能在内的一体化团队,以产品交付为共同目标紧密协作,共同承担责任

DevOps 在技术领域的实践
DevOps运作包括文化(全功能,自运维)和技术(自动化,度量反馈)两方面,而技术能力的改进主要关注以下六个领域:
内建质量体系
通过持续代码评审,静态分析,自动化测试,自动部署验证等手段构成一套有效的质量保障体系。
主要实践包括:
  • TDD:测试驱动开发的思想,保证代码质量和不偏离业务需求的技术实现
  • 结对编程和代码审查,依靠团队的自治性让团队成员互相监督和审查代码质量
  • 自动化测试,高自动化,且高频率运行的测试,保证测试用例质量的同时保证了交付软件的质量

持续部署


CORNERSTONE通过自动化的构建,部署过程快速频繁地将软件交付给用户,提高吞吐量;同时保障过程的安全,平滑,可视。
主要实践包括:
  • 在已经做到持续集成的情况下,引入持续部署,每次提交均会出发构建并执行部署
  • 蓝绿部署,用于实现零宕机发布新版本
  • 金丝雀发布,用于使应用发布流程具备快速试错的能力

持续监控


CORNERSTONE持续对运行环境在系统,应用层面进行监控,及时发现风险或问题,保障系统运行的稳定性。
主要实践包括:
  • 监控预警,在项目开始初期就引入监控,让整个团队实时能够收到关于产品各个维度数据的反馈
  • 日志聚合,便于错误追踪和展示
  • 分析,利用搜集到的数据实时分析,利用分析结果指导开发进度

度量与反馈


CORNERSTONE通过对用户行为或业务指标的度量或反馈收集,为产品的决策提供依据。
主要实践包括:
  • 持续集成反馈,对代码构建质量,代码质量审查的反馈
  • 测试反馈,对软件质量,功能性的测试,给到业务的反馈
  • 运营数据反馈,新功能上线后对业务影响的反馈,用于指导业务人员提新的需求

环境管理


CORNERSTONE通过对服务器环境的定义,自动化建立和配置、更新等提高基础设施管理的效率,一致性,并更有效利用资源,可伸缩的架构,保证服务的健壮性。
主要实践包括:
  • 弹性架构,保证服务的吞吐量和具备灵活变更的能力
  • 自动化部署脚本,想胶水一样,用于解决一些工程实践不够完善的流程之间的衔接
  • 基础设施即代码,用代码定义基础设施,便于环境管理,追踪变更,以及保证环境一致性

松耦合架构
对传统应用架构进行领域组件化,服务化,提升可测试性和可部署性。
主要实践包括:
  • 采用弹性基础设施,比如公有云服务或是 PaaS(Platform as a Service) 平台
  • 构建为服务应用
  • 引入契约测试


典型DevOps的持续交付流水线全景图
软件开发全生命周期的持续优化

抽空搭建了个国内版—The Go Playground

yeyuqiu 发表了文章 • 2 个评论 • 705 次浏览 • 2019-10-26 14:56 • 来自相关话题

1. 基于 go 最新版本 1.13.32. tab 缩进改为 4 个字符宽度访问地址为 https://play.yeyuqiu.com ...查看全部

1. 基于 go 最新版本 1.13.3
2. tab 缩进改为 4 个字符宽度

访问地址为 https://play.yeyuqiu.com

一秒让你读懂 DevOps 的本质及行业现状与趋势

CORNERSTONE 发表了文章 • 0 个评论 • 766 次浏览 • 2019-10-25 16:57 • 来自相关话题

手工编译,上传服务器文件,执行各种命令,启动和停止服务器,发现一个 BUG,改完再重复之前说的步骤...... 如今看起来啼笑皆非的重复劳动,在没有 DevOps 概念之前,全靠 IT 人员手工完成。不仅如此,DevOps 概念的提出,最初 ...查看全部

手工编译,上传服务器文件,执行各种命令,启动和停止服务器,发现一个 BUG,改完再重复之前说的步骤...... 如今看起来啼笑皆非的重复劳动,在没有 DevOps 概念之前,全靠 IT 人员手工完成。

不仅如此,DevOps 概念的提出,最初因传统模式运维之痛而生。众所周知,DevOps 一词的来自于 Development 和 Operations 的组合。字如其意,DevOps 就是让软件开发人员和运维人员更好的沟通合作,通过自动化流程来使得软件构建、测试、发布更加快捷、频繁和可靠。

image.png

《RightScale2018 年度云计算调查》报告数据显示,DevOps 的整体采用率从 74% 上升到了 78%,同时企业的 DevOps 采用率达到了 84%。30% 的企业正在全公司范围内采用 DevOps,同比增长 9%。

这些数字的提升不仅代表了 DevOps 可以带来实际效果的共识,也印证了 DevOps 这两年开始受到越来越多的企业重视。DevOps 概念的深入人心,离不开云计算、容器 /Docker、微服务、敏捷等相关概念和实施的成熟发展。其背后本质是企业 IT 的精益运营,以面对更快的业务试错与业务创新。


概念虽好,落地却难?

DevOps 的一个巨大好处就是可以高效交付。DevOps 理念指向“高度的自动化”,试图制定一条从开发到运行自动运行的流水线,最大程度地摆脱人工的束缚,达到企业生产力的升级。

比如,持续的集成与开发,实现从开发测试、上线运维的一体化自动流程;智能预警帮助用户自动监控集群运行状态,快速定位到问题具体发生的位置,及时通知用户以快速解决问题等。

DevOps 另外一个好处就是会改善公司组织文化、提高员工的参与感。员工们变得更高效,也更有满足和成就感。

由此可见,如果能采用 DevOps,公司就能够做更多的创新,缩短开发周期,将产品更加快速地推向市场;同时创造差别化的公司业务和价值,提高组织效率,而不是不停地修补旧问题。


技术分享

虽说 DevOps 优势多多,前途大好,企业对 DevOps 的评价也很高,但实际情况却是说的人多,做到的很少。

究其原因,在于 DevOps 并不是简单地将开发部门和运维部门合并,更是企业文化、组织结构的变革,是通过自动化的基础设施、合理的流程规范以及智能的自动运行系统测试来加强开发部门和运维部门之间的协作和沟通。

首先,这涉及到观念问题。DevOps 的最终目的是加强开发部门和运维部门之间的协作和沟通,如何把现在的文化朝 DevOps 思维模式扭转,并且在开发和运维之间找到共识?

对于传统企业,特别是大型分布式组织,在整体意义上的 DevOps 成功往往是不可能实现的。因为 DevOps 要求深层次的文化和组织变革,要改变的太多太多。这意味着大家要扔掉奉行了几十年的显规则和潜规则。你不得不告诉老部下们,大部分他们知道的和每天做的事物都已经过时了。

其次,想要为 DevOps 和应用灵活性而重塑团队,就要有打破 IT 分组壁垒的勇气,并且需要在团队成员筛选上做出艰难的决定。


困难重重,如何实现?

毫无疑问,DevOps 对组织是非常有价值的,但是需要注意的是,整个公司都需要参与到 DevOps 里才能成功。DevOps 需要高级领导层的支持,也需要和最终产品相关的所有人的参与,而不仅仅是开发和运维部门。

第一,在开发和运维之间找到共识,这需要强大的领导力来实现变革。当然,它也需要花费时间和金钱,并且需要在团队成员筛选上做出艰难的决定。

可以从小处开始,不要期望一开始就能让所有人都信服 DevOps。实际中,在特定项目的小型组织内赢得大家的支持,就赢得了会在公司其他地方帮助宣传 DevOps 的大使们,这会带来乘数效应。

第二,围绕业务系统而不是职责来组织工作,这就是 DevOps 打破 IT 分组壁垒的寓意。一个团队应该有开发人员创建代码,从用户界面到业务逻辑和数据结构,也应该有运维人员负责操作自动化和部署。团队待在一起,共同为他们的应用和系统负责。

当然,为了促进 DevOps 战略,调整考核和激励机制是必要的。应该奖励系统创建和运维的整体团队,并且根据团队工作的全部要素来确定奖励。

第三,团队需要选择最合适的工具。虽然 DevOps 是一个概念,但工具是实现 DevOps 的重要组成部分。近几年来如日中天的 CORNERSTONE 就是实现 DevOps 最合适的工具之一。

在不了解DevOps生命周期的情况下,对DevOps的理解也会片面化。我们以CORNERSTOENE一站式云端 DevOps平台为例让我们一起探讨DevOps的生命周期。


CORNERSTONE | DevOps全流程解决方案


一、持续开发:


这是DevOps生命周期中软件不断开发的阶段。与瀑布模型不同的是,软件可交付成果被分解为短开发周期的多个任务节点,在很短的时间内开发并交付。


image.png


CORNERSTONE任务模块里,任务分配给谁,就会是谁的责任。而且任务支持多责任人与子任务关联,所有的任务和状态都会体现在任务模块里,这个任务过程中的参与人员随时知道任务的状态和目前碰到的问题,可有效推进工作的解决。有了CORNERSTONE能够帮助我们追溯和监控,促进对于接任务的人员有效负起责任,并能够及时同步到信息。


二、持续测试:


在这个阶段,开发的软件将被持续地测试bug。CORNERSTONE平台覆盖完整的测试流程,可进行测试用例的编写,建立用例库,减少重复性操作,让研发团队的协作更高效,产品交付更快速。常用的两个功能为:


1)测试用例管理


通过编写测试⽤例,制定测试计划并执⾏,测试结果可直接关联到缺陷,方便对问题进行跟踪处理,实现对迭代质量的全程把控。


 Clipboard Image.png


2)缺陷管理


强大的缺陷管理与统计功能,通过分组、解决状态、优先级等列表对缺陷进行全方位记录与跟踪,同时明确缺陷责任人,及时跟进解决缺陷;同时支持导入导出功能,导出时支持任意格式,不受模板限制。


Clipboard Image.png


三、持续集成:


这是支持新功能的代码与现有代码集成的阶段。由于软件在不断地开发,更新后的代码需要不断地集成,并顺利地与系统集成,以反映对最终用户的需求更改。更改后的代码,还应该确保运行时环境中没有错误,允许我们测试更改并检查它如何与其他更改发生反应。


image.png


CORNERSTONE⽀持将持续集成的结果部署到对应的测试环境,所有部署版本在测试环境中可随时访 问,⽀持灰度发布到⽣产环境中。


四、持续部署:


它是将代码部署到生产环境的阶段。在这里,我们确保在所有服务器上正确部署代码。如果添加了任何功能或引入了新功能,那么应该准备好迎接更多的网站流量。因此,系统运维人员还有责任扩展服务器以容纳更多用户。


Clipboard Image.png


CORNERSTONE支持依赖脚本pipeline实现的DevOps,支持持续集成与自动化部署,可直接在可视化的服务器上进行操作,同时满足多种开发语言,彻底解决敏捷开发在运维层面的瓶颈,方便开发人员对项目开发生命周期进行全盘管理。


五、持续监控:


这是DevOps生命周期中非常关键的阶段,旨在通过监控软件的性能来提高软件的质量。这种做法涉及运营团队的参与,他们将监视用户活动中的错误/系统的任何不正当行为。这也可以通过使用专用监控工具来实现,该工具将持续监控应用程序性能并突出问题。


Clipboard Image.png


  CORNERSTONE嵌⼊一体化监控运维平台,实现IT环境的数字化、标准化,直接运维分析的基础,减少    人工干预,降低⼈工成本。

最后,好工具得有能人掌控才能发挥其威力。即使找到了好用的工具,也需要有熟悉这个工具链,拥有相应技能的 IT 人员来提供技术支持,才能完成实现自动化的使命。



写在最后

总体而言,DevOps 作为一种理念,推动开发和运维之间的合作,有效回应了当前的商业需求。它的实现是新观念、新工具、新技能的三者叠加。虽然实现起来还有种种问题,但 DevOps 是大势所趋,作为企业的高层和开发运维人员,对这样的变革不能视而不见。

CORNERSTONE | DevOps平台是如何实现开发效率的双倍提升?

CORNERSTONE 发表了文章 • 0 个评论 • 700 次浏览 • 2019-10-23 17:03 • 来自相关话题

随着企业业务对软件系统日益依赖,IT管理与研发模式也随之对“敏捷”模式产生了需求,也就是今天人们时常提起的DevOps。提升效率,是DevOps实践的核心内容之一。就让我们来一起从软件生命周期的业务流与工作流,探讨DevOps实践效率提升的方向与方法吧。 ...查看全部

随着企业业务对软件系统日益依赖,IT管理与研发模式也随之对“敏捷”模式产生了需求,也就是今天人们时常提起的DevOps。提升效率,是DevOps实践的核心内容之一。就让我们来一起从软件生命周期的业务流与工作流,探讨DevOps实践效率提升的方向与方法吧。


一、CORNERSTONE | DevOps之“流”分析

软件工程将软件的生命周期定义为问题定义、需求分析、软件设计、程序编码、软件测试、运行维护等过程,无论是对于传统模式、敏捷模式还是DevOps模式,软件生命周期过程基本一致,如下图所示。
image.png


软件生命周期各个过程也组成了软件工程的“业务流”,而在不同团队采用相应地开发模式中,具体执行的开发及相关的活动,我们则成为工作流”。


CORNERSTONE,DevOps实践中,最主要改进的内容,就是对于这些 “工作流”的活动进行“关停并转”,从而实现整体与局部上对于效率的提升。


这些工作,也就是需要开展的活动,可以分为以下几类:


人与人的互动

这类活动交互的双方均为自然人,如业务需求收集,活动的特点是具备高度的不规则与不规律性。


人与机的互动

这类活动交互的一方为自然人,一方为依托于计算机的程序,如编码活动、人工审核/审批等,活动的特点是人的活动必须依循计算机相关主题的规则,部分活动可以抽取为规范化的过程。


机与机的互动

这类活动的特点是交互的双方都是依托于计算机的程序,如编译构建、自动化测试,活动的过程高度规范化。不同的作业类型,在效率提升的优化中,需要采用的方法各有不同。


二、CORNERSTONE | DevOps效率提升之协作

协作的本质是在不同的主体之间进行快速、有效的信息共享,从而进一步协调各主体进行步调一致、有序的工作执行,实现整体上的一致性与顺畅性,协作是DevOps实践中效率提升的重要方向和内容之一。

DevOps实践中的协作更多需要是从软件生命周期整体系统化考虑与设计,协作设计上面主要包括以下两个方面。

01、信息共享

传统的模式中,相关业务信息仅共享于各阶段内部,而在CORNERSTONE中,则更强调信息的跨阶段共享,面向产品的全生命周期,共享信息包括:

业务类信息
即业务目标、业务背景、业务需求、业务限制等信息。

执行类信息
即软件开发、编译、测试、部署等执行的相关信息,如开始时间、结束时间、执行时长、执行操作记录等。

反馈类信息
即各步骤、阶段执行的信息反馈,如需求拆分反馈、任务执行反馈、代码编译结果、测试结果、发布验证结果等。

CORNERSTONE为以上信息提供统一的信息管理与分析平台。对于代码编写之前的阶段提供如敏捷协同的工作协同管理模块,以记录需求、任务分配、需求完成进展等信息,对于代码编写之后的阶段,则提供相对完整的执行记录信息以及必要的通知信息,以构建及时的反馈。

02、协作调度

协作调度是DevOps协作实践中另外一项关键内容。通过CORNERSTONE平台,可实现对于“机与机的活动”全自动协作调度,对于“人与机的活动”简化协作调度,对于“人与人的活动”事件驱动协作调度,进而实现优化协作调度的效率,提升协作效果。

全自动协作调度
全自动的协作调度主要是通过CORNERSTONE平台的流水线引擎实现,通过流水线编排的实现指定作业流自动执行,执行过程中自动完成不同阶段的信息交互,过程无需人工参与。

简化的协作调度
简化的协作调度也是通过CORNERSTONE平台的流水线引擎实现,在流水线作业流中编排需要人工干预的节点,但仅需要人工给出通过/终止等简单的指令型信息即可。

基于事件的协作调度
基于事件驱动的协作调度,主要是用于“人与人的活动”,也可以用于“人与机的活动”,其通过通知、待办等事件方式,实现精准的信息共享与推送,驱动协作的下游方快速接受和推进事务工作。

CORNERSTONE中的协作调度的效果可以通过研发效能来进行初步的评估与衡量,通过衡量,我们可以较为清晰的获知哪个阶段的协调调度是关键阻碍点或可以进一步优化。

三、CORNERSTONE | DevOps效率提升之自动化

自动化是DevOps的核心理念,也是效率提升的最重要手段。通过CORNERSTONE一站式云端DevOps平台,实现软件过程自动化以及软件过程的支撑工作自动化。

CORNERSTONE | DevOps全流程解决方案

01、软件过程自动化

软件过程自动化是指在软件的开发、测试、部署等过程中,引入自动化的手段,从而实现快速的软件质量检查,以及软件应用发布。


开发过程自动化


CORNERSTONE的代码助手可帮助编程人员以最快的速度完成编程工作,比如当需要对外部的某个窗口进行操作时,CORNERSTONE的代码助手可进行探测,获取相关的窗口信息,再对其它进行操作等。

image.png


测试过程自动化

CORNERSTONE平台覆盖完整的测试流程,可进行测试用例的编写,建立用例库,减少重复性操作,让研发团队的协作更高效,产品交付更快速。常用的两个功能为:


1)测试用例管理


通过编写测试⽤例,制定测试计划并执⾏,测试结果可直接关联到缺陷,方便对问题进行跟踪处理,实现对迭代质量的全程把控。


 Clipboard Image.png


2)缺陷管理


强大的缺陷管理与统计功能,通过分组、解决状态、优先级等列表对缺陷进行全方位记录与跟踪,同时明确缺陷责任人,及时跟进解决缺陷;同时支持导入导出功能,导出时支持任意格式,不受模板限制。


Clipboard Image.png


部署过程自动化


CORNERSTONE支持依赖脚本pipeline实现的DevOps,支持持续集成与自动化部署,可直接在可视化的服务器上进行操作,同时满足多种开发语言,彻底解决敏捷开发在运维层面的瓶颈,方便开发人员对项目开发生命周期进行全盘管理。


Clipboard Image.png


通过流水线引擎,实现以上内容的自由、可视化编排,以及按需执行。


02、过程支撑自动化

软件过程支撑主要是指面向软件工程过程的支撑,实现自动化包括:


编译构建环境自动化

编译构建环境包括基于DevOps平台的自管理编译构建环境,按需生成编译构建环境,编译构建完成后自动销毁,以及特定编译构建环境的快速接入等。


测试环境自动化

测试环境自动化是指自动化测试执行所需的能力环境,如接口/UI测试脚本所需的执行环境,可以根据测试任务的需要,实现测试环境的弹性伸缩自管理。


环境部署自动化

环境部署自动化是指对于开发、测试、生产等所需要的基础环境,可以根据流水线自动完成环境的使用前的生成、使用后的回收等,实现资源即代码,无需人工参与。


CORNERSTONE中,通过大量的过程及支撑自动化,可以极大的减少开发、测试、运维等工作的人工参与时间,降低人工成本,并能实现人工无法完成的工作,例如快速对10000台服务器上的应用进行更新。但前期的建设需要涉及的技术点较多,成本也较为巨大,如何建设落地自动化,除了考虑效率之外,还需着重考虑业务平台的自主可控与可持续发展等方面。


四、CORNERSTONE | DevOps效率提升之持续优化

持续优化,是CORNERSTONE效率提升的第三个主要方面,也是践行DevOps理念的重要实践。持续优化需要解决优化什么、如何优化等问题。这些问题的解决,需要应用DevOps精益分析的理念实践。精益分析,本质就是对数据的统计、分析与挖掘。


01、数据获取

精益分析所涉及的数据应从需求提出到用户访问形成一个端到端闭环。数据的获取需要从业务系统本身以及支撑业务系统的CORNERSTONE平台两个方向获取。早期可以以CORNERSTONE平台相关数据的获取为主要来源,后续可持续集成来自业务系统埋点获取的数据。在整个过程中,需要做到数据的及时性、准确性与完整性。

02、数据分析

数据分析需要有明确的目标和针对性,如针对业务需求提出到上线的平均周期、开发返工趋势等,通过数据分析,可以快速找到当前影响效率的关键点,从而实现针对性的改善。

image.png

03、数据呈现

数据呈现即为数据应用,数据呈现可以采用两种方式进行。

协同管理
将数据获取/分析的结果,在CORNERSTONE的协同管理平台实时的反馈和呈现,从而推动PO/开发团队/干系人等根据反馈信息快速推进效率优化,通过量变引发质变,通过团队内自我优化的方式实现效率的提升。

度量分析
针对于与效率相关的重点指标,通过可视化图表等方式,进行专项的度量分析,并在管理与项目团队共享指标信息以及指标的变化趋势,通过全局监督的方式推进效率的提升。

五、结论

文化上的协同打破了流程与部门的屏障,共享了信息,协作了调度;过程中的自动化消除了重复性的工作,降低人为风险;业务系统与CORNERSTONE平台的数据支持精准提供优化的方向。DevOps之所以能为企业提升效率在于DevOps的实践实现软件生命周期的业务流与作业流的一致与顺畅。

写了一个100%Go语言的Web-Term-SSH 堡垒机项目欢迎大家拍砖

回复

jjjjerk 发起了问题 • 1 人关注 • 0 个回复 • 288 次浏览 • 2019-10-22 15:53 • 来自相关话题

Go语言自定义自己的SSH-Server

jjjjerk 发表了文章 • 0 个评论 • 816 次浏览 • 2019-10-22 15:31 • 来自相关话题

原文地址  ...查看全部

Golang-Docker ChromeDP浏览器模拟和截图微服务

jjjjerk 发表了文章 • 0 个评论 • 885 次浏览 • 2019-10-21 15:58 • 来自相关话题

原文地址  ...查看全部

用 Go 語言實作 Job Queue 機制

appleboy 发表了文章 • 1 个评论 • 940 次浏览 • 2019-10-20 09:03 • 来自相关话题

很高興可以在  ...查看全部

很高興可以在 Mopcon 分享『用 Go 語言實現 Job Queue 機制』,透過簡單的 goroutine  channel 就可以實現簡單 Queue 機制,並且限制同時可以執行多少個 Job,才不會讓系統超載。最後透過編譯放進 Docker 容器內,就可以跑在各種環境上,加速客戶安裝及部署。

議程大綱

本次大致上整理底下幾個重點:

  1. What is the different unbuffered and buffered channel?
  2. How to implement a job queue in golang?
  3. How to stop the worker in a container?
  4. Shutdown with Sigterm Handling.
  5. Canceling Workers without Context.
  6. Graceful shutdown with worker.
  7. How to auto-scaling build agent?
  8. How to cancel the current Job?

由於在投影片內也許寫得不夠詳細,所以我打算錄製一份影片放在 Udemy 教學影片上,如果有興趣可以參考底下影片連結:

之前的教學影片也可以直接參考底下連結:

投影片

https://www.slideshare.net/appleboy/job-queue-in-golang-184064840

袋鼠存储 v1.3 正式支持移动端

fabsnail 发表了文章 • 0 个评论 • 900 次浏览 • 2019-10-19 17:39 • 来自相关话题

自袋鼠存储(roostore.com)发布以来,感谢各位的认可,下载使用和积极反馈袋鼠存储致力于为用户提供一种新的存储解决方案,使得用户对 ...查看全部

自袋鼠存储(roostore.com)发布以来,感谢各位的认可,下载使用和积极反馈

袋鼠存储致力于为用户提供一种新的存储解决方案,使得用户对服务可控,对数据可控还有对成本可控

现正式推出 v1.3 版本,该版本包含移动端安卓版,更进一步方便用户使用:

kstore

1. 支持嵌入式提供服务,为移动端提供集成使用
2. 支持跨节点在线查看文件,为移动端提供在线查看图片,播放视频等
3. 提供注册登陆口令控制项,确保公网中的私人节点只允许有口令的用户使用
4. 修复浏览文件夹时滚动条可能不显示问题
5. 修复某些浏览器中点击下载文件无响应问题
6. 修复协程上下文资源未及时释放问题
7. 修复自动登陆或浏览文件夹时可能出现的阻塞问题
8. 修复长时间运行过程中周期性查找更多可用连接节点时可能出现的并发崩溃问题


移动端(安卓版首发)
1. 支持在指定的服务器上进行注册,登陆
2. 支持使用集成服务,满足无自部署 kstore 服务时使用
3. 提供断点上传手机中的文件,支持暂停、恢复,取消和清空(已完成)任务
4. 提供断点下载文件到手机,支持暂停,恢复,取消和清空(已完成)任务
5. 提供将下载的文件分享发送给其他应用
6. 提供文件浏览,创建删除文件(夹)
7. 提供文件剪切,复制,粘贴,重命名以及查看文件详细信息等
8. 支持下拉刷新文件列表
9. 支持查看图片(jpg, jpeg, png, gif, svg等格式),包括查看其他 kstore 节点中的图片(不存储文件到本地)
10. 支持播放视频(mp4, mov),包括查看其他 kstore 节点中的视频(不存储文件到本地)
11. 支持 admin 后台管理(目前仅支持网络管理部分)
12. 支持是否显示目录大小选项设置
13. 提供帮助与反馈渠道
14. 提供将 kstore-app 推荐给好友


感兴趣的朋友,欢迎通过官网 roostore.com 下载使用

注:iPhone 版本正在努力攻关一些关键技术问题,相信很快也会与大家见面

go 学习笔记之解读什么是defer延迟函数

snowdreams1006 发表了文章 • 0 个评论 • 835 次浏览 • 2019-10-18 19:23 • 来自相关话题

Go 语言中有个 defer 关键字,常用于实现延迟函数来保证关键代码的最终执行,常言道: "未雨绸缪方可有备无患".


延迟函数就是这么一种机制,无论程序是正常返回还是异常报错,只要存在延迟函数都能保证这部分关键逻辑最终执行,所以用来做些资源清理等操作再合适不过了.


go-error-about-defer.jpg
go-error-about-defer.jpg

出入成双有始有终


日常开发编程中,有些操作总是成双成对出现的,有开始就有结束,有打开就要关闭,还有一些连续依赖关系等等.


一般来说,我们需要控制结束语句,在合适的位置和时机控制结束语句,手动保证整个程序有始有终,不遗漏清理收尾操作.


最常见的拷贝文件操作大致流程如下:



  1. 打开源文件


srcFile, err := os.Open("fib.txt")
if err != nil {
t.Error(err)
return
}


  1. 创建目标文件


dstFile, err := os.Create("fib.txt.bak")
if err != nil {
t.Error(err)
return
}


  1. 拷贝源文件到目标文件


io.Copy(dstFile, srcFile)


  1. 关闭目标文件


dstFile.Close()
srcFile.Close()


  1. 关闭源文件


srcFile.Close()

值得注意的是: 这种拷贝文件的操作需要特别注意操作顺序而且也不要忘记释放资源,比如先打开再关闭等等!


func TestCopyFileWithoutDefer(t *testing.T) {
srcFile, err := os.Open("fib.txt")
if err != nil {
t.Error(err)
return
}

dstFile, err := os.Create("fib.txt.bak")
if err != nil {
t.Error(err)
return
}

io.Copy(dstFile, srcFile)

dstFile.Close()
srcFile.Close()
}


「雪之梦技术驿站」: 上述代码逻辑还是清晰简单的,可能不会忘记释放资源也能保证操作顺序,但是如果逻辑代码比较复杂的情况,这时候就有一定的实现难度了!



可能是为了简化类似代码的逻辑,Go 语言引入了 defer 关键字,创造了"延迟函数"的概念.



  • defer 的文件拷贝


func TestCopyFileWithoutDefer(t *testing.T) {
if srcFile, err := os.Open("fib.txt"); err != nil {
t.Error(err)
return
} else {
if dstFile,err := os.Create("fib.txt.bak");err != nil{
t.Error(err)
return
}else{
io.Copy(dstFile,srcFile)

dstFile.Close()
srcFile.Close()
}
}
}


  • defer 的文件拷贝


func TestCopyFileWithDefer(t *testing.T) {
if srcFile, err := os.Open("fib.txt"); err != nil {
t.Error(err)
return
} else {
defer srcFile.Close()

if dstFile, err := os.Create("fib.txt.bak"); err != nil {
t.Error(err)
return
} else {
defer dstFile.Close()

io.Copy(dstFile, srcFile)
}
}
}

上述示例代码简单展示了 defer 关键字的基本使用方式,显著的好处在于 Open/Close 是一对操作,不会因为写到最后而忘记 Close 操作,而且连续依赖时也能正常保证延迟时机.


简而言之,如果函数内部存在连续依赖关系,也就是说创建顺序是 A->B->C 而销毁顺序是 C->B->A.这时候使用 defer 关键字最合适不过.


懒人福音延迟函数



官方文档相关表述见 Defer statements[1]



如果没有 defer 延迟函数前,普通函数正常运行:


func TestFuncWithoutDefer(t *testing.T) {
// 「雪之梦技术驿站」: 正常顺序
t.Log("「雪之梦技术驿站」: 正常顺序")

// 1 2
t.Log(1)
t.Log(2)
}

当添加 defer 关键字实现延迟后,原来的 1 被推迟到 2 后面而不是之前的 1 2 顺序.


func TestFuncWithDefer(t *testing.T) {
// 「雪之梦技术驿站」: 正常顺序执行完毕后才执行 defer 代码
t.Log(" 「雪之梦技术驿站」: 正常顺序执行完毕后才执行 defer 代码")

// 2 1
defer t.Log(1)
t.Log(2)
}

如果存在多个 defer 关键字,执行顺序可想而知,越往后的越先执行,这样才能保证按照依赖顺序依次释放资源.


func TestFuncWithMultipleDefer(t *testing.T) {
// 「雪之梦技术驿站」: 猜测 defer 底层实现数据结构可能是栈,先进后出.
t.Log(" 「雪之梦技术驿站」: 猜测 defer 底层实现数据结构可能是栈,先进后出.")

// 3 2 1
defer t.Log(1)
defer t.Log(2)
t.Log(3)
}

相信你已经明白了多个 defer 语句的执行顺序,那就测试一下吧!


func TestFuncWithMultipleDeferOrder(t *testing.T) {
// 「雪之梦技术驿站」: defer 底层实现数据结构类似于栈结构,依次倒叙执行多个 defer 语句
t.Log(" 「雪之梦技术驿站」: defer 底层实现数据结构类似于栈结构,依次倒叙执行多个 defer 语句")

// 2 3 1
defer t.Log(1)
t.Log(2)
defer t.Log(3)
}

初步认识了 defer 延迟函数的使用情况后,我们再结合文档详细解读一下相关定义.



  • 英文原版文档



A "defer" statement invokes a function whose execution is deferred to the moment the surrounding function returns,either because the surrounding function executed a return statement,reached the end of its function body,or because the corresponding goroutine is panicking.




  • 中文翻译文档



"defer"语句调用一个函数,该函数的执行被推迟到周围函数返回的那一刻,这是因为周围函数执行了一个return语句,到达了函数体的末尾,或者是因为相应的协程正在惊慌.



具体来说,延迟函数的执行时机大概分为三种情况:


周围函数执行 return



because the surrounding function executed a return statement



return 后面的 t.Log(4) 语句自然是不会运行的,程序最终输出结果为 3 2 1 说明了 defer 语句会在周围函数执行 return 前依次逆序执行.


func funcWithMultipleDeferAndReturn() {
defer fmt.Println(1)
defer fmt.Println(2)
fmt.Println(3)
return
fmt.Println(4)
}

func TestFuncWithMultipleDeferAndReturn(t *testing.T) {
// 「雪之梦技术驿站」: defer 延迟函数会在包围函数正常return之前逆序执行.
t.Log(" 「雪之梦技术驿站」: defer 延迟函数会在包围函数正常return之前逆序执行.")

// 3 2 1
funcWithMultipleDeferAndReturn()
}

周围函数到达函数体



reached the end of its function body



周围函数的函数体运行到结尾前逆序执行多个 defer 语句,即先输出 3 后依次输出 2 1.
最终函数的输出结果是 3 2 1 ,也就说是没有 return 声明也能保证结束前执行完 defer 延迟函数.


func funcWithMultipleDeferAndEnd() {
defer fmt.Println(1)
defer fmt.Println(2)
fmt.Println(3)
}

func TestFuncWithMultipleDeferAndEnd(t *testing.T) {
// 「雪之梦技术驿站」: defer 延迟函数会在包围函数到达函数体结尾之前逆序执行.
t.Log(" 「雪之梦技术驿站」: defer 延迟函数会在包围函数到达函数体结尾之前逆序执行.")

// 3 2 1
funcWithMultipleDeferAndEnd()
}

当前协程正惊慌失措



because the corresponding goroutine is panicking



周围函数万一发生 panic 时也会先运行前面已经定义好的 defer 语句,而 panic 后续代码因为没有特殊处理,所以程序崩溃了也就无法运行.


函数的最终输出结果是 3 2 1 panic ,如此看来 defer 延迟函数还是非常尽忠职守的,虽然心里很慌但还是能保证老弱病残先行撤退!


func funcWithMultipleDeferAndPanic() {
defer fmt.Println(1)
defer fmt.Println(2)
fmt.Println(3)
panic("panic")
fmt.Println(4)
}

func TestFuncWithMultipleDeferAndPanic(t *testing.T) {
// 「雪之梦技术驿站」: defer 延迟函数会在包围函数panic惊慌失措之前逆序执行.
t.Log(" 「雪之梦技术驿站」: defer 延迟函数会在包围函数panic惊慌失措之前逆序执行.")

// 3 2 1
funcWithMultipleDeferAndPanic()
}

通过解读 defer 延迟函数的定义以及相关示例,相信已经讲清楚什么是 defer 延迟函数了吧?


简单地说,延迟函数就是一种未雨绸缪的规划机制,帮助开发者编程程序时及时做好收尾善后工作,提前做好预案以准备随时应对各种情况.



  • 当周围函数正常执行到到达函数体结尾时,如果发现存在延迟函数自然会逆序执行延迟函数.

  • 当周围函数正常执行遇到 return 语句准备返回给调用者时,存在延迟函数时也会执行,同样满足善后清理的需求.

  • 当周围函数异常运行不小心 panic 惊慌失措时,程序存在延迟函数也不会忘记执行,提前做好预案发挥了作用.


所以不论是正常运行还是异常运行,提前做好预案总是没错的,基本上可以保证万无一失,所以不妨考虑考虑 defer 延迟函数?


go-error-about-lovely.png
go-error-about-lovely.png

延迟函数应用场景


基本上成双成对的操作都可以使用延迟函数,尤其是申请的资源前后存在依赖关系时更应该使用 defer 关键字来简化处理逻辑.


下面举两个常见例子来说明延迟函数的应用场景.



  • Open/Close


文件操作一般会涉及到打开和开闭操作,尤其是文件之间拷贝操作更是有着严格的顺序,只需要按照申请资源的顺序紧跟着defer 就可以满足资源释放操作.


func readFileWithDefer(filename string) ([]byte, error) {
f, err := os.Open(filename)
if err != nil {
return nil, err
}
defer f.Close()
return ioutil.ReadAll(f)
}


  • Lock/Unlock


锁的申请和释放是保证同步的一种重要机制,需要申请多个锁资源时可能存在依赖关系,不妨尝试一下延迟函数!


var mu sync.Mutex
var m = make(map[string]int)
func lookupWithDefer(key string) int {
mu.Lock()
defer mu.Unlock()
return m[key]
}

总结以及下节预告


defer 延迟函数是保障关键逻辑正常运行的一种机制,如果存在多个延迟函数的话,一般会按照逆序的顺序运行,类似于栈结构.


延迟函数的运行时机一般有三种情况:



  • 周围函数遇到返回时


func funcWithMultipleDeferAndReturn() {
defer fmt.Println(1)
defer fmt.Println(2)
fmt.Println(3)
return
fmt.Println(4)
}


  • 周围函数函数体结尾处


func funcWithMultipleDeferAndEnd() {
defer fmt.Println(1)
defer fmt.Println(2)
fmt.Println(3)
}


  • 当前协程惊慌失措中


func funcWithMultipleDeferAndPanic() {
defer fmt.Println(1)
defer fmt.Println(2)
fmt.Println(3)
panic("panic")
fmt.Println(4)
}

本文主要介绍了什么是 defer 延迟函数,通过解读官方文档并配套相关代码认识了延迟函数,但是延迟函数中存在一些可能令人比较迷惑的地方.


go-error-about-question.png
go-error-about-question.png

读者不妨看一下下面的代码,将心里的猜想和实际运行结果比较一下,我们下次再接着分享,感谢你的阅读.


func deferFuncWithAnonymousReturnValue() int {
var retVal int
defer func() {
retVal++
}()
return 0
}

func deferFuncWithNamedReturnValue() (retVal int) {
defer func() {
retVal++
}()
return 0
}

延伸阅读参考文档



  • Defer_statements[2]

  • go 语言的 defer 语句[3]

  • Go defer 实现原理剖析[4]

  • go 语言 defer 你不知道的秘密![5]

  • Go 语言中 defer 的一些坑[6]

  • go defer (go 延迟函数)[7]



如果本文对你有所帮助,不用赞赏,点赞鼓励一下就是最大的认可,顺便也可以关注下微信公众号「 雪之梦技术驿站 」哟!



雪之梦技术驿站.png
雪之梦技术驿站.png

参考资料



[1]

Defer statements: https://golang.google.cn/ref/spec#Defer_statements



[2]

Defer_statements: https://golang.google.cn/ref/spec#Defer_statements



[3]

go语言的defer语句: https://www.jianshu.com/p/5b0b36f398a2



[4]

Go defer实现原理剖析: https://studygolang.com/articles/16067



[5]

go语言 defer 你不知道的秘密!: https://www.cnblogs.com/baizx/p/5024547.html



[6]

Go语言中defer的一些坑: https://www.jianshu.com/p/79c029c0bd58



[7]

go defer (go延迟函数): https://www.cnblogs.com/ysherlock/p/8150726.html