每日新闻

每日新闻

GoCN每日新闻资讯
有问必答

有问必答

Go相关的问题,技术相关的问题
文章分享

文章分享

技术文章分享,让知识传播给更多的人
招聘应聘

招聘应聘

为Gopher服务的招聘应聘平台

gin 注解路由,自动参数绑定工具

xie1xiao1jun 发表了文章 • 0 个评论 • 201 次浏览 • 2 天前 • 来自相关话题

ginprcgolang ...查看全部

ginprc

golang gin 参数自动绑定工具

  • 支持 rpc 自动映射
  • 支持对象注册
  • 支持注解路由
  • 基于 go-gin 的 json restful 风格的 golang 基础库
  • 自带请求参数过滤及绑定实现 binding:"required" validator
  • 代码注册简单且支持多种注册方式

api 接口说明

支持 3 种接口模式

  • func(gin.Context) //go-gin 原始接口
    func(
     api.Context) // 自定义的 context 类型
  • func(api.Context,req) // 自定义的 context 类型,带 request 请求参数
    func(
     api.Context,*req)
  • func(gin.Context,req) //go-gin context 类型,带 request 请求参数
    func(*gin.Context,req)

示例代码

初始化项目 (本项目以 ginweb 为名字)

``` go mod init ginweb ```

代码 (详细地址:https://github.com/xxjwxc/ginrpc/tree/master/sample/ginweb)

  package main

import (
"fmt"
"net/http"

_ "ginweb/routers" // debug模式需要添加[mod]/routers 注册注解路由

"github.com/gin-gonic/gin"
"github.com/xxjwxc/ginrpc"
"github.com/xxjwxc/ginrpc/api"
)

type ReqTest struct {
Access_token string `json:"access_token"`
UserName string `json:"user_name" binding:"required"` // 带校验方式
Password string `json:"password"`
}

// Hello ...
type Hello struct {
Index int
}

// Hello 带注解路由(参考beego形式)
// @router /block [post,get]
func (s *Hello) Hello(c *api.Context, req *ReqTest) {
fmt.Println(req)
fmt.Println(s.Index)
c.JSON(http.StatusOK, "ok")
}

// Hello2 不带注解路由(参数为2默认post)
func (s *Hello) Hello2(c *gin.Context, req ReqTest) {
fmt.Println(req)
fmt.Println(s.Index)
c.JSON(http.StatusOK, "ok")
}

//TestFun1 gin 默认的函数回调地址
func TestFun1(c *gin.Context) {
fmt.Println(c.Params)
c.String(200, "ok")
}

//TestFun2 自定义context的函数回调地址
func TestFun2(c *api.Context) {
fmt.Println(c.Params)
c.JSON(http.StatusOK, "ok")
}

//TestFun3 带自定义context跟已解析的req参数回调方式
func TestFun3(c *api.Context, req *ReqTest) {
fmt.Println(c.Params)
fmt.Println(req)
c.JSON(http.StatusOK, "ok")
}

//TestFun4 带自定义context跟已解析的req参数回调方式
func TestFun4(c *gin.Context, req ReqTest) {
fmt.Println(c.Params)
fmt.Println(req)

c.JSON(http.StatusOK, req)
}

func main() {
base := ginrpc.New(ginrpc.WithCtx(func(c *gin.Context) interface{} {
return api.NewCtx(c)
}), ginrpc.WithDebug(true), ginrpc.WithGroup("xxjwxc"))

router := gin.Default()
h := new(Hello)
h.Index = 123
base.Register(router, h) // 对象注册
router.POST("/test1", base.HandlerFunc(TestFun1)) // 函数注册
router.POST("/test2", base.HandlerFunc(TestFun2))
router.POST("/test3", base.HandlerFunc(TestFun3))
router.POST("/test4", base.HandlerFunc(TestFun4))
base.RegisterHandlerFunc(router, []string{"post", "get"}, "/test", TestFun1) // 多种请求方式注册

router.Run(":8080")
}
  • curl
    curl 'http://127.0.0.1:8080/test4' -H 'Content-Type: application/json' -d '{"access_token":"111", "user_name":"222", "password":"333"}'

注解路由

  • 1. 注解路由会自动创建 [mod]/routers/gen_router.go 文件 需要在调用时加:

    _ "[mod]/routers" // debug模式需要添加[mod]/routers 注册注解路由

    默认也会在项目根目录生成 [gen_router.data] 文件 (保留次文件,可以不用添加上面代码嵌入)

  • 2. 注解路由调用方式:

    base := ginrpc.New(ginrpc.WithCtx(func(c *gin.Context) interface{} {
    return api.NewCtx(c)
    }), ginrpc.WithDebug(true), ginrpc.WithGroup("xxjwxc"))
    base.Register(router, new(Hello)) // 对象注册
    router.Run(":8080")

    详细请看 demo ginweb

  • 3. 执行 curl,可以自动参数绑定。直接看结果

    curl 'http://127.0.0.1:8080/xxjwxc/block' -H 'Content-Type: application/json' -d '{"access_token":"111", "user_name":"222", "password":"333"}'
    curl 'http://127.0.0.1:8080/xxjwxc/hello.hello2' -H 'Content-Type: application/json' -d '{"access_token":"111", "user_name":"222", "password":"333"}'
  • 4 参数说明
    ginrpc.WithCtx : 设置自定义 context
    ginrpc.WithDebug (true) : 设置 debug 模式
    ginrpc.WithGroup ("xxjwxc") : 添加路由前缀 (也可以使用 gin.Group 分组)
    ginrpc.WithBigCamel (true) : 设置大驼峰标准 (false 为 web 模式,_, 小写)

    更多

下一步

1.导出api文档
2.导出postman测试配置

代码地址: ginprc 如果喜欢请给星支持

传送门

如果你喜欢,请'star'

分布式图数据库 Nebula Graph 中的集群快照实践

NebulaGraph 发表了文章 • 0 个评论 • 188 次浏览 • 4 天前 • 来自相关话题


1 概述


1.1 需求背景


图数据库 Nebula Graph 在生产环境中将拥有庞大的数据量和高频率的业务处理,在实际的运行中将不可避免的发生人为的、硬件或业务处理错误的问题,某些严重错误将导致集群无法正常运行或集群中的数据失效。当集群处于无法启动或数据失效的状态时,重新搭建集群并重新倒入数据都将是一个繁琐并耗时的工程。针对此问题,Nebula Graph 提供了集群 snapshot 的创建功能。


Snapshot 功能需要预先提供集群在某个时间点 snapshot 的创建功能,以备发生灾难性问题时用历史 snapshot 便捷地将集群恢复到一个可用状态。


1.2 术语


本文主要会用到以下术语:



  • **StorageEngine:**Nebula Graph 的最小物理存储单元,目前支持 RocksDB 和 HBase,在本文中只针对 RocksDB。

  • Partition:Nebula Graph 的最小逻辑存储单元,一个 StorageEngine 可包含多个 Partition。Partition 分为 leader 和 follower 的角色,Raftex 保证了 leader 和 follower 之间的数据一致性。

  • GraphSpace:每个 GraphSpace 是一个独立的业务 Graph  单元,每个 GraphSpace 有其独立的 tag 和 edge 集合。一个 Nebula Graph 集群中可包含多个 GraphShpace。

  • checkpoint:针对 StorageEngine 的一个时间点上的快照,checkpoint 可以作为全量备份的一个 backup 使用。checkpoint files是 sst files 的一个硬连接。

  • snapshot:本文中的 snapshot 是指 Nebula Graph 集群的某个时间点的快照,即集群中所有 StorageEngine 的 checkpoint 的集合。通过 snapshot 可以将集群恢复到某个 snapshot 创建时的状态。

  • wal:Write-Ahead Logging ,用 raftex 保证 leader 和 follower 的一致性。


2 系统构架


2.1 系统整体架构


image


2.2 存储系统结构关系


image


2.3 存储系统物理文件结构


[bright2star@hp-server storage]$ tree
.
└── nebula
└── 1
├── checkpoints
│ ├── SNAPSHOT_2019_12_04_10_54_42
│ │ ├── data
│ │ │ ├── 000006.sst
│ │ │ ├── 000008.sst
│ │ │ ├── CURRENT
│ │ │ ├── MANIFEST-000007
│ │ │ └── OPTIONS-000005
│ │ └── wal
│ │ ├── 1
│ │ │ └── 0000000000000000233.wal
│ │ ├── 2
│ │ │ └── 0000000000000000233.wal
│ │ ├── 3
│ │ │ └── 0000000000000000233.wal
│ │ ├── 4
│ │ │ └── 0000000000000000233.wal
│ │ ├── 5
│ │ │ └── 0000000000000000233.wal
│ │ ├── 6
│ │ │ └── 0000000000000000233.wal
│ │ ├── 7
│ │ │ └── 0000000000000000233.wal
│ │ ├── 8
│ │ │ └── 0000000000000000233.wal
│ │ └── 9
│ │ └── 0000000000000000233.wal
│ └── SNAPSHOT_2019_12_04_10_54_44
│ ├── data
│ │ ├── 000006.sst
│ │ ├── 000008.sst
│ │ ├── 000009.sst
│ │ ├── CURRENT
│ │ ├── MANIFEST-000007
│ │ └── OPTIONS-000005
│ └── wal
│ ├── 1
│ │ └── 0000000000000000236.wal
│ ├── 2
│ │ └── 0000000000000000236.wal
│ ├── 3
│ │ └── 0000000000000000236.wal
│ ├── 4
│ │ └── 0000000000000000236.wal
│ ├── 5
│ │ └── 0000000000000000236.wal
│ ├── 6
│ │ └── 0000000000000000236.wal
│ ├── 7
│ │ └── 0000000000000000236.wal
│ ├── 8
│ │ └── 0000000000000000236.wal
│ └── 9
│ └── 0000000000000000236.wal
├── data
复制代码

3 处理逻辑分析


3.1 逻辑分析


image


Create snapshot  由 client api  或 console  触发, graph server  对 create snapshot  的 AST 进行解析,然后通过 meta client  将创建请求发送到 meta server 。 meta server  接到请求后,首先会获取所有的 active host ,并创建 adminClient  所需的 request 。通过 adminClient  将创建请求发送到每个 StorageEngine ,StorageEngine 收到 create 请求后,会遍历指定 space 的全部 StorageEngine,并创建 checkpoint ,随后对 StorageEngine 中的全部 partition  的 wal 做 hardlink。在创建 checkpoint 和 wal hardlink 时,因为已经提前向所有 leader partition 发送了 write blocking 请求,所以此时数据库是只读状态的。


因为 snapshot 的名称是由系统的 timestamp 自动生成,所以不必担心 snapshot 的重名问题。如果创建了不必要的 snapshot,可以通过 drop snapshot 命令删除已创建的 snapshot。


3.2 Create Snapshot


image


3.3 Create Checkpoint


image


4 关键代码实现


4.1 Create Snapshot


folly::Future<Status> AdminClient::createSnapshot(GraphSpaceID spaceId, const std::string& name) {
// 获取所有storage engine的host
auto allHosts = ActiveHostsMan::getActiveHosts(kv_);
storage::cpp2::CreateCPRequest req;

// 指定spaceId,目前是对所有space做checkpoint,list spaces 工作已在调用函数中执行。
req.set_space_id(spaceId);

// 指定 snapshot name,已有meta server根据时间戳产生。
// 例如:SNAPSHOT_2019_12_04_10_54_44
req.set_name(name);
folly::Promise<Status> pro;
auto f = pro.getFuture();

// 通过getResponse接口发送请求到所有的storage engine.
getResponse(allHosts, 0, std::move(req), [] (auto client, auto request) {
return client->future_createCheckpoint(request);
}, 0, std::move(pro), 1 /*The snapshot operation only needs to be retried twice*/);
return f;
}
复制代码

4.2 Create Checkpoint


ResultCode NebulaStore::createCheckpoint(GraphSpaceID spaceId, const std::string& name) {
auto spaceRet = space(spaceId);
if (!ok(spaceRet)) {
return error(spaceRet);
}
auto space = nebula::value(spaceRet);

// 遍历属于本space中的所有StorageEngine
for (auto& engine : space->engines_) {

// 首先对StorageEngine做checkpoint
auto code = engine->createCheckpoint(name);
if (code != ResultCode::SUCCEEDED) {
return code;
}

// 然后对本StorageEngine中的所有partition的last wal做hardlink
auto parts = engine->allParts();
for (auto& part : parts) {
auto ret = this->part(spaceId, part);
if (!ok(ret)) {
LOG(ERROR) << "Part not found. space : " << spaceId << " Part : " << part;
return error(ret);
}
auto walPath = folly::stringPrintf("%s/checkpoints/%s/wal/%d",
engine->getDataRoot(), name.c_str(), part);
auto p = nebula::value(ret);
if (!p->linkCurrentWAL(walPath.data())) {
return ResultCode::ERR_CHECKPOINT_ERROR;
}
}
}
return ResultCode::SUCCEEDED;
}
复制代码

5 用户使用帮助


5.1 CREATE SNAPSHOT


CREATE SNAPSHOT  即对整个集群创建当前时间点的快照,snapshot 名称由 meta server 的 timestamp 组成



在创建过程中可能会创建失败,当前版本不支持创建失败的垃圾回收的自动功能,后续将计划在 metaServer 中开发 cluster checker 的功能,将通过异步线程检查集群状态,并自动回收 snapshot 创建失败的垃圾文件。



当前版本如果 snapshot 创建失败,必须通过 DROP SNAPSHOT 命令清除无效的 snapshot。


当前版本不支持对指定的 space 做 snapshot,当执行 CREATE SNAPSHOT 后,将对集群中的所有 space 创建快照。
CREATE SNAPSHOT 语法:


CREATE SNAPSHOT
复制代码

以下为笔者创建 3 个 snapshot 的例子:


(user@127.0.0.1) [default_space]> create snapshot;
Execution succeeded (Time spent: 28211/28838 us)

(user@127.0.0.1) [default_space]> create snapshot;
Execution succeeded (Time spent: 22892/23923 us)

(user@127.0.0.1) [default_space]> create snapshot;
Execution succeeded (Time spent: 18575/19168 us)
复制代码

我们用 5.3 提及的 SHOW SNAPSHOTS 命令看下现在有的快照


(user@127.0.0.1) [default_space]> show snapshots;
===========================================================
| Name | Status | Hosts |
===========================================================
| SNAPSHOT_2019_12_04_10_54_36 | VALID | 127.0.0.1:77833 |
-----------------------------------------------------------
| SNAPSHOT_2019_12_04_10_54_42 | VALID | 127.0.0.1:77833 |
-----------------------------------------------------------
| SNAPSHOT_2019_12_04_10_54_44 | VALID | 127.0.0.1:77833 |
-----------------------------------------------------------
Got 3 rows (Time spent: 907/1495 us)
复制代码

从上 SNAPSHOT_2019_12_04_10_54_36  可见 snapshot 名同 timestamp 有关。


5.2 DROP SNAPSHOT


DROP SNAPSHOT 即删除指定名称的 snapshot,可以通过 SHOW SNAPSHOTS 命令获取 snapshot 的名称,DROP SNAPSHOT 既可以删除有效的 snapshot,也可以删除创建失败的 snapshot。


语法:


DROP SNAPSHOT name
复制代码

笔者删除了 5.1 成功创建的 snapshot SNAPSHOT_2019_12_04_10_54_36 ,并用SHOW SNAPSHOTS 命令查看现有的 snapshot。


(user@127.0.0.1) [default_space]> drop snapshot SNAPSHOT_2019_12_04_10_54_36;
Execution succeeded (Time spent: 6188/7348 us)

(user@127.0.0.1) [default_space]> show snapshots;
===========================================================
| Name | Status | Hosts |
===========================================================
| SNAPSHOT_2019_12_04_10_54_42 | VALID | 127.0.0.1:77833 |
-----------------------------------------------------------
| SNAPSHOT_2019_12_04_10_54_44 | VALID | 127.0.0.1:77833 |
-----------------------------------------------------------
Got 2 rows (Time spent: 1097/1721 us)
复制代码

5.3 SHOW SNAPSHOTS


SHOW SNAPSHOTS 可查看集群中所有的 snapshot,可以通过 SHOW SNAPSHOTS 命令查看其状态(VALID 或 INVALID)、名称、和创建 snapshot 时所有 storage Server 的 ip 地址。
语法:


SHOW SNAPSHOTS
复制代码

以下为一个小示例:


(user@127.0.0.1) [default_space]> show snapshots;
===========================================================
| Name | Status | Hosts |
===========================================================
| SNAPSHOT_2019_12_04_10_54_36 | VALID | 127.0.0.1:77833 |
-----------------------------------------------------------
| SNAPSHOT_2019_12_04_10_54_42 | VALID | 127.0.0.1:77833 |
-----------------------------------------------------------
| SNAPSHOT_2019_12_04_10_54_44 | VALID | 127.0.0.1:77833 |
-----------------------------------------------------------
Got 3 rows (Time spent: 907/1495 us)
复制代码

6 注意事项



  • 当系统结构发生变化后,最好立刻 create snapshot,例如 add host、drop host、create space、drop space、balance 等。

  • 当前版本暂未提供用户指定 snapshot 路径的功能,snapshot 将默认创建在 data_path/nebula 目录下

  • 当前版本暂未提供 snapshot 的恢复功能,需要用户根据实际的生产环境编写 shell 脚本实现。实现逻辑也比较简单,拷贝各 engineServer 的 snapshot 到指定的文件夹下,并将此文件夹设置为 data_path,启动集群即可。


7 附录


最后,附上 Nebula Graph GitHub 地址:github.com/vesoft-inc/… 如果你在使用 Nebula Graph 过程中遇到任何问题,欢迎 GitHub 联系我们或者加入微信交流群,请联系微信号:NebulaGraphbot


关注公众号

【Golang+mysql】记一次mysql数据库迁移(一)

stayfoo 发表了文章 • 0 个评论 • 253 次浏览 • 5 天前 • 来自相关话题

# 记一次mysql数据库迁移(一)

原文: https://github.com/stayfoo/stayfoo-hub/blob/master/docs/mysql/blog/%E8%AE%B0%E4%B8%80%E6%AC%A1mysql%E6%95%B0%E6%8D%AE%E5%BA%93%E8%BF%81%E7%A7%BB%EF%BC%88%E4%B8%80%EF%BC%89.md


## 一、准备

- 目标:

腾讯云 `CVM` 自建 `mysql` 数据迁移到腾讯云数据库 `mysql` 。

- 腾讯云 `CVM` 自建 `mysql` 现状:
- 1、mysql 版本:`Ver 8.0.15 for Linux on x86_64 (MySQL Community Server - GPL)`
- 2、mysql 文件目录数据大小: 2.4 G (`/var/lib/mysql`)
> - 查看 `msyql` 数据目录位置:`mysql> show variables like '%datadir%';`
> - 查看 `/var/lib/mysql` 目录的大小: `du -sh`

```
mysql> show variables like '%datadir%';
+---------------+-----------------+
| Variable_name | Value |
+---------------+-----------------+
| datadir | /var/lib/mysql/ |
+---------------+-----------------+
1 row in set (0.04 sec)
```

- 3、只有一个 `root` 账户,只允许 `localhost` 访问
- 4、数据库字符集:`charset=utf8`

- 起因:

`api` 服务、报表计划任务、以及其他计划任务、自建 `mysql` 都在同一台腾讯 `CVM` 上面。计划任务执行的时候,`mysql` 占用 `CPU` 过高,会导致 `api` 服务无法正常使用。所以考虑腾讯云数据库 `mysql`,使用主从架构(先尝试使用一主一从),`master mysql` 主要服务于 `api` 服务,`slave mysql` 主要用于计划任务。

- 腾讯云数据库 `mysql` :
- 1、腾讯云数据库 `mysql` 与腾讯云服务器 `CVM`,同一账号下,同一个地域支持使用内网 `ip` 访问。(比如 `CVM` 是重庆,云数据库必须也是重庆,并且是在同一账号下)

- 2、购买腾讯云 `mysql 5.7`(支持的最大版本,没有`8.0`):先购买一台高可用版 `mysql`,作为 `master mysql`,之后可以扩展多台 `slave mysql`

- 3、迁移工具:腾讯云`DTS`,`mysqldump`(备用)


## 二、迁移

- 1、使用 DTS 迁移

购买完云 `mysql`,初始化,开始使用 `DTS` 进行迁移。 自建 `mysql` 和 云 `mysql` 是属于同一个账号下,同一区域下,可以使用内网直接访问。

![image](https://note.youdao.com/yws/api/personal/file/WEBdd29c8faff09c4b8a0edf6ace1913e40?method=download&shareKey=36844f0af6f14b7822a5909e962c7f8c)


- 2、查看 `CVM` 自建 `mysql` 用户

```
mysql> select host,user,plugin from user;
```

发现只有一个 `root` 账号,只能 `localhost` 访问,需要创建新账号,指定购买的云 `mysql` 的内网 `ip` 授权访问的新账号。在 `CVM` 自建 `mysql` 创建账号:

```
mysql> GRANT ALL PRIVILEGES ON *.* TO "stay"@"1xx.xx.0.0" IDENTIFIED BY "111";
```

创建账号,并授权,发现报错。提示不能用 `grant` 创建用户。 原来 `mysql8.0` 以前的版本可以使用 `grant` 在授权的时候隐式的创建用户。`mysql8.0` 以后已经不支持。`mysql8.0` 必须先创建用户,然后再授权,命令如下:

```
mysql> CREATE USER 'stay'@'172.30.0.0' IDENTIFIED BY '密码';
Query OK, 0 rows affected (0.48 sec)

mysql> grant all privileges on *.* TO 'stay'@'172.30.0.0';
Query OK, 0 rows affected (0.48 sec)
```

> 注意:密码不能和 root 账户相同。

> 参考 mysql8.0 文档:https://dev.mysql.com/doc/refman/8.0/en/privileges-provided.html


- 3、在腾讯云 `mysql` 操作 `DTS`

输入源库设置,接入类型选择云主机自建,选择自建 `mysql` 所在的主机实例`ID`,所属区域,自建 `mysql` 的端口号,输入新建的账号,密码。

配置完成之后,测试连通性,`Telnet` 通过,`MySQL Connect` 失败。提示:`MySQL Connect无法连接源实例。请检查实例账号、密码,并确认源实例是否取消对[172.30.0.0/20]的访问限制` 如图:

![image](https://note.youdao.com/yws/api/personal/file/WEB35ab8e244772d87b39aa852e9649e39b?method=download&shareKey=68a0f3bca7898ca4ee6e17a837b45241)


创建账号的时候,对`[172.30.0.0/20]`的访问是授权的。

尝试设置 `CVM` 服务器安全组,把 `172.30.0.0`(即云`mysql`内网 `ip`)设置为信任访问 `3306` 端口。测试连接依然不行。

尝试新创建了一个对所有`ip`都能访问授权的账号,并测试在本地电脑远程连接自建 `mysql` 是没有问题的。然后把这个账号密码配置到 `DTS` 源库上,测试连通性,依然不行。

给腾讯云 mysql 提了工单,让腾讯云工程师协助解决。工单:

![image](https://note.youdao.com/yws/api/personal/file/WEBe3bc59fddbf77defc52f1e2ea4101c96?method=download&shareKey=73600f556d6c42f24a061ac38d761f5c)


> 源库连接失败原因:
> - 数据库账号或密码不正确
> - 源数据库对来源IP进行了限制
> - 源数据库服务器设置了防火墙
> - 网络互通问题
> 可以参考:https://help.aliyun.com/document_detail/52099.html?spm=a2c4g.11186623.2.27.2d2e37admhRAPG


知道是因为版本问题导致的,自建 `mysql` 是 `8.0.15`,而腾讯云 `mysql` 最高支持 `5.7`,`DTS` 不支持从 `8.0.15` 迁移到 `5.7`,所以连通性测试会一直失败。


- 4、使用 `mysqldump` 手动迁移数据

> mysqldump 工作原理:查出需要备份的表结构 -> 生成一个 create sql语句(sql后缀名的文本文件);表中所有记录 -> 转成一条 insert 语句。

先从源数据库导出数据库表结构信息:
```
# 导出数据库表结构(导出的是创建表结构的sql语句)
mysqldump --opt -d[库名] -u[数据库用户名] -p[密码] > /导出的文件名字存储路径.sql
```

从源数据库导出数据信息:
```
mysqldump -t[数据库名] -u[数据库用户名] -p[数据库密码] > xxx.sql
```

`CVM` 主机上使用内网 `ip` 链接云 `mysql`,先创建同名数据库:

```
mysql> create DATABASE sf_factory charset=utf8;
```

然后,依次导入数据库表结构信息,数据库数据:

```
mysql> source [导出的数据库表结构.sql];
mysql> source [导出的数据库数据.sql];
```

> 参考 msyql 8.0官方文档:https://dev.mysql.com/doc/refman/8.0/en/mysqldump-copying-to-other-server.html


- 5、验证数据完整性

- 6、删除自建 `mysql` 多余账号信息

```
mysql> drop user stay@"172.30.0.0/20";
```

方式一:`drop` 不仅会将 `user` 表中的数据删除,还会删除对应权限表内容。

```
#drop user xxxx; 默认删除的是 'xxxx'@'%'
drop user xxxx;

drop user 'xxxx'@'localhost';
drop user 'xxxx'@'172.xx.x.x';
```

方式二:`delete` 只会删除 `user` 中的内容。所以使用 `delete` 删除用户后需要执行 `FLUSH PRIVILEGES;`刷新权限,否则下次使用 `create` 创建用户会报错。





分布式图数据库 Nebula RC2 发布:增强了 CSV Importer 功能

NebulaGraph 发表了文章 • 0 个评论 • 226 次浏览 • 5 天前 • 来自相关话题

Nebula Graph 是开源的分布式图数据库,可应用于知识图谱、社交推荐、风控、IoT 等场景。 ...查看全部

Nebula Graph 是开源的分布式图数据库,可应用于知识图谱、社交推荐、风控、IoT 等场景。

1575512574148518.png

本次 RC2 主要新增 GO FROM ... REVERSELY 和 GROUP BY 等语句,Storage Engine 也更灵活,用户可以通过 Console 或 Http 获取配置信息,触发 compaction;客户端目前支持 Python,Java,Go;在工具方面,增强了 CSV Importer 功能

Query Engine

  • 支持 GROUP BY 语句用于聚合函数,根据一个或多个属性对结果集进行分组。 #749

  • UPDATE CONFIGS 增加对表达式的支持 #1273

  • 支持通过 Console 更改日志级别 #1273

  • 新增 “IF...RETURN...”,条件成立时返回对应结果,提供一定的 if else 逻辑能力 #1233 、#1246

  • String  数据类型支持无限长度 #1103

  • 插入点边时,和 SQL一样,允许按用户输入的属性名顺序来插入(而不是 Schema 的属性名顺序)#1219

  • Tag/Edgetype 属性支持设置默认值 #860

  • 新增 GO FROM ... REVERSELY ,查询反向关系 #1349

Storage

  • 新增 SHOW PARTS 获取当前 SPACE 的 partition 详情 #1086

  • 新增 BALANCE STOP 暂停 load balance #1238

  • 支持通过手动触发 RocksDB compaction 和 flush #677 #1240

  • RocksDB 默认 BlockBasedTable 大小从 4 MB 改成 1024 MB #1248

  • gflags 中新增 max_edge_returned_per_vertex 选项,限制超级顶点返回边数量 #1221

  • Storage 增加对点的缓存 #1294 #1268

  • 升级依赖库 folly、fbthrift #1161

Tools

Client

Changed/Removed

  • 命令 ADD/REMOVE HOSTS 不再支持,hosts 会被自动发现,用户不再需要手动添加

RC3 Roadmap

  • Query Engine

    • 子图

  • Storage

    • Snapshot

    • 索引

    • Remove / Scan Interface

    • Jepsen

  • Tools

    • DumpTool

  • Test Framework

    • 回归测试

    • 混沌测试

  • UI

    • 查询

    • CSV 数据导入

    • Hash 工具

    • Graph Exploration

最后是 Nebula 的 GitHub 地址,欢迎大家试用,有什么问题可以向我们提 issue。GitHub 地址:https://github.com/vesoft-inc/nebula

[分布式限流] 滑动窗口算法的 Golang 实现

RussellLuo 发表了文章 • 0 个评论 • 692 次浏览 • 2019-11-27 21:57 • 来自相关话题

https://github.com/RussellLuo/slidingwindow ...查看全部

gRPC-Go系列文章

回复

阿伟 发起了问题 • 1 人关注 • 0 个回复 • 240 次浏览 • 2019-11-27 09:34 • 来自相关话题

比原链CTO James | Go语言成为区块链主流开发语言的四点理由

Bytom 发表了文章 • 0 个评论 • 592 次浏览 • 2019-11-27 07:31 • 来自相关话题

11月24日,比原链CTO James参加了Go中国举办的Gopher Meetup杭州站活动,与来自阿里、 ...查看全部


11月24日,比原链CTO James参加了Go中国举办的Gopher Meetup杭州站活动,与来自阿里、网易的技术专家带来Kubernetes、区块链、日志采集、云原生等话题的分享。James向大家介绍了Go语言特性在区块链中的应用还分析了Go语言成为区块链主流开发语言的原因。

比原链的系统架构




在区块链系统中内核层是最核心的,他承接了区块验证、交易验证、节点维护、打包挖矿等重多职责。通信层掌管了区块链系统的网络服务,区块链的网络更像P2P的网络形式,他呈网状扩散,负责区块同步、交易同步、节点发现等重要的功能。钱包层是直接和用户发生交互的一层,他的职责是资产管理、私钥管理,并与内核层通信验证区块交易。



以Bytom为例,他的内核层分为了五个的模块。我们举例描述几个主要的模块。

孤儿块管理:孤儿块就是由矿工挖出但未成为主链区块的区块(在相同高度产生2个甚至更多的合法区块,一个区块成为主链,剩下的则称为孤儿块),孤儿块管理就是将未成为主链区块的孤儿块存储起来。举个例子,我现在挖到的区块是100,那么下一个区块理论上是101,但是在网络层发生延时的时候,可能会发生先出102再出101的情况,那么孤儿块管理会将这些父块未到的子块先缓存起来。

共识层:确认一个块是否合法。分为区块头验证和交易验证。区块头验证需要验证它的父块和时间戳,同是需要算力来保证记账权利。交易验证比原特别的设计了一层BC层,这层在交易验证时会获得更好的性能,交易验证还和智能合约相关,交易被验证时参数会参入虚拟机验证该交易是否合法。

区块树管理:又称为Block Index,作用是记录全网所有的块,保存了全网所有块的一张镜像图。因为有孤儿块,所有它并不是链式结构的,会有分叉的情况,所以称为区块树。举个例子,区块树管理好比有一个分布式系统,但无法保证每个节点的状态一致,可能会出现同一个高度产生同时参生区块的情况。区块树管理就是具备了在节点状态不一致的情况下让系统回滚到正确的区块这个功能。

数据存储:将区块数据做持久化存储。包含两种数据,第一种是区块数据,会在网络上进行广播的原生区块信息;第二种是UTXO数据,存储UTXO数据是为了更快的验证一笔UTXO是否可以花费,而不需要去遍历所有区块信息。

交易池:维护了所有全网发出的但是还未被确认的交易。跟它关联最大的是挖矿模块,挖矿模块每次要产生一个新区块的时候,它会从交易池拿一些交易打包成块,然后用Tensority共识算法进行工作量验算。举个例子,在节点打包交易之前,交易处在一个未确认的状态之下,交易池会将这些未确认的交易保存起来,分配给后面的矿工用于打包。

WHY GOLANG?




第一点,区块链是多模块异步协同工作的,分成了P2P层、钱包层、内核层。其中内核层、情报层里面都有不同的很多子模块,在负责不同的事情。Go语言做得很出色非常适合做这方面的开发。

第二点,区块链项目有核心团队但是主要还是依赖社区参与,Go语言编译上相对于其他语言稍微严格一点,在制定规范后可以使不同开发者代码风格相近,有利于协同工作的展开。

第三点,Go语言社区完善,有很多非常好的开源库支持,使用起来非常方便用户体验良好。

第四点,主流语言的头部效应,在区块链的项目里面超过85%的项目都是基于Go语言开发的,大部分区块链工程师都擅长Go语言,所以当新的项目开始的时候,会首选Go语言。

对区块链感兴趣的程序员可以投递简历至:james@bytom.io

使用图数据库 Nebula Graph 数据导入快速体验知识图谱 OwnThink

NebulaGraph 发表了文章 • 0 个评论 • 555 次浏览 • 2019-11-26 11:49 • 来自相关话题

前言本文由 Nebula Graph 实习生@王杰贡献。最近 @Yener 开源了史上最大规模的中文知识图谱—— ...查看全部

前言

本文由 Nebula Graph 实习生@王杰贡献。

最近 @Yener 开源了史上最大规模的中文知识图谱——OwnThink(链接:https://github.com/ownthink/KnowledgeGraphData ),数据量为 1.4 亿条。

本文介绍如何将这份数据快速导入图数据库 Nebula Graph,全过程大约需要 30 分钟。

中文知识图谱 OwnThink 简介

思知(OwnThink)

知识图谱是由 Google 在 2012 年提出来的一个概念。主要是用来描述真实世界中存在的各种实体和概念,以及他们之间的关系。在搜索引擎、问答机器人、知识抽取等多个领域有着诸多应用。

最近 Yener 开源了史上最大规模的中文知识图谱—— OwnThink(链接:https://github.com/ownthink/KnowledgeGraphData),数据量为 1.4 亿条。数据以 (实体, 属性, 值) 和 (实体, 关系, 实体) 混合的三元组形式存储,数据格式为 csv。

可以点击这里下载:https://nebula-graph.oss-accelerate.aliyuncs.com/ownthink/kg_v2.tar.gz

查看原始文件

由于 ownthink_v2.csv 数据过多,摘录部分数据为例:

红色食品,描述,红色食品是指食品为红色、橙红色或棕红色的食品。
红色食品,是否含防腐剂,否
红色食品,主要食用功效,预防感冒,缓解疲劳
红色食品,用途,增强表皮细胞再生和防止皮肤衰老
大龙湫,描述,雁荡山景区分散,东起羊角洞,西至锯板岭;南起筋竹溪,北至六坪山。
大龙湫,中文名称,大龙湫
大龙湫,外文名称,big dragon autrum
大龙湫,门票价格,50元
大龙湫,著名景点,芙蓉峰
姚明[中国篮球协会主席、中职联公司董事长],妻子,叶莉

这里的 (红色食品,是否含防腐剂,否) 就是典型的 (实体, 属性, 值) 形式的三元组数据; 而 (姚明[中国篮球协会主席、中职联公司董事长],妻子,叶莉) 是典型的 (实体, 关系, 实体) 形式的三元组数据。

Step 1. 数据建模与清洗准备

建模

Nebula Graph 是一个开源的分布式图数据库(链接:https://github.com/vesoft-inc/nebula),相比 Neo4j 来说,它的主要特点是完全的分布式,因此图数据库 Nebula Graph 适合处理数据量超过单机的场景

图数据库通常支持的数据模型为有向属性图(directed property graph)。图中的每个顶点(vertex)可以用标签(tag)来表示类型(Neo4j 叫做 Label),顶点和顶点之间的关系用边(edge)连接起来。每种 tag 和 edge 还可以带有属性。——然而,这些功能对于知识图谱的三元组数据没什么意义:

分析上图的三元组数据,发现无论是 (实体, 属性, 值) 形式的三元组数据,还是 (实体, 关系, 实体) 形式的三元组数据,每条三元组数据均可以建模成两个点一条边的形式。前者三元组中的“实体”和“”建模为两个点(起点、终点),“属性”建模为一条边,后者三元组中的两个“实体”也建模为两个点(起点、终点),“关系”建模为一条边.

而且,所有的点都是相同类型(取名叫entity ),只需要一个属性(叫 name ),所有的边也都是同一类型(取名叫 relation ),边上也只有一个属性(叫 name )。

比如 (大龙湫,著名景点,芙蓉峰) 可以表示成下图这个样子:

数据清洗和预处理

按照前一节的分析,原始的每条三元组数据,还需要清洗转换为两个点和一条边才能变成属性图的模型。

下载清洗工具

本文测试的时候,使用的操作系统是 CentOS 7.5,工具由 Golang 语言编写而成。

你可以在这里 (链接:https://github.com/jievince/rdf-converter) 下载这个简单的清洗工具源代码并编译使用。

该工具会把转换后的顶点的数据写入到 vertex.csv 文件、边数据写入到 edge.csv 文件。

说明:在测试过程中,发现有大量的重复点数据,所以工具里面也做了去重。完全去重后的点的数据大概是 4600 万条,完全去重后的边的数据大概是 1 亿 4000 万条。

清洗完的 vertex.csv 文件长这样:

-2469395383949115281,过度包装
-5567206714840433083,Over Package
3836323934884101628,有的商品故意增加包装层数
1185893106173039861,很多采用实木、金属制品
3455734391170888430,非科学
9183164258636124946,教育
5258679239570815125,成熟市场
-8062106589304861485,"成熟市场是指低增长率,高占有率的市场。"

说明:每一行是一个顶点,第一列整型 -2469395383949115281 是顶点的 ID(叫做 VID),它是由第二列文字通过 hash 计算出来的,例如 -2469395383949115281 就是由 std::hash("过度包装") 计算出来的值。

清洗完的 edge.csv 文件:

3413383836870836248,-948987595135324087,含义
3413383836870836248,8037179844375033188,定义
3413383836870836248,-2559124418148243756,标签
3413383836870836248,8108596883039039864,标签
2587975790775251569,-4666568475926279810,描述
2587975790775251569,2587975790775251569,中文名称
2587975790775251569,3771551033890875715,外文名称
2587975790775251569,2900555761857775043,地理位置
2587975790775251569,-1913521037799946160,占地面积
2587975790775251569,-1374607753051283066,开放时间

说明:第一列是起点的 VID,第二列是终点的 VID,第三列是这条边的"属性"或者"描述"。

在本机完全去重的清洗程序运行时间大约是 6 分钟。

Step 2. Nebula Graph 启动准备

下载和安装

登陆 GitHub 后,在这里 (链接:https://github.com/vesoft-inc/nebula/actions) 找到 Nebula 的安装包。

找到你所用系统对应的下载链接:

笔者系统是 CentOS 7.5,下载 CentOS 7.5 最新的压缩包,解压后能找到 rpm 安装包 nebula-5ace754.el7-5.x86_64.rpm,注意 5ace754 是 git commit 号,使用时可能会有所不同。下载好后解压,输入下面命令进行安装,记得替换成新的 git commit:

$ rpm -ivh nebula-5ace754.el7-5.x86_64.rpm

启动 Nebula Graph 服务

在 命令行 CLI 输入下面命令启动服务

$ /usr/local/nebula/scripts/nebula.service start all

命令执行结果如下:

可以执行以下命令检查服务是否成功启动

$ /usr/local/nebula/scripts/nebula.service status all

命令执行结果如下:

连接 Nebula Graph 服务

输入下面命令连接 Nebula Graph:

$ /usr/local/nebula/bin/nebula -u user -p password

命令执行结果如下:

准备 schema 等元数据

Nebula Graph 的使用风格有点接近 MySQL,需要先准备各种元信息。

新建图空间 space

create space 的概念接近 MySQL 里面 create database。在 nebula console 里面输入下面这个命令。

nebula> CREATE SPACE test;

进入 test space

nebula> USE test;

创建点类型(entity)

nebula> CREATE TAG entity(name string);

创建边类型 (relation)

nebula> CREATE EDGE relation(name string);

最后简单确认下元数据是不是正确。

查看 entity 标签的属性

nebula> DESCRIBE TAG entity;

结果如下:

查看 relation 边类型的属性

nebula> DESCRIBE EDGE relation;

结果如下:

Step 3. 使用 nebula-importer 导入数据

登陆 GitHub 进入 https://github.com/vesoft-inc/nebula-importer ,nebula-importer 这个工具也是 Golang 语言写的,在这里下载并编译源代码。

另外,准备一个 YAML 配置文件,告诉这个 importer 工具去哪里找 csv 文件。(可直接复制下面这段)

version: v1rc1
description: example
clientSettings:
concurrency: 10 # number of graph clients
channelBufferSize: 128
space: test
connection:
user: user
password: password
address: 127.0.0.1:3699
logPath: ./err/test.log
files:
- path: ./vertex.csv
failDataPath: ./err/vertex.csv
batchSize: 100
type: csv
csv:
withHeader: false
withLabel: false
schema:
type: vertex
vertex:
tags:
- name: entity
props:
- name: name
type: string
- path: ./edge.csv
failDataPath: ./err/edge.csv
batchSize: 100
type: csv
csv:
withHeader: false
withLabel: false
schema:
type: edge
edge:
name: relation
withRanking: false
props:
- name: name
type: string

说明:测试时候发现 csv 数据文件中有大量转义字符 (\) 和换行字符 (\r),nebula-importer 也做了处理。

最后:开始导入数据

go run importer.go --config ./config.yaml

执行过程如下:

可以看到, 本次导入 QPS 大约在 40 w/s。全部导入总耗时大约 15 min。

Step 4. 随便读点什么试试

导入完毕后,我们可以使用 Nebula Graph 服务做一些简单的查询。回到 Nebula Graph 的命令行 CLI :

$ /usr/local/nebula/bin/nebula -u user -p password

进入刚才导入的三元组数据的 test 空间:

nebula> USE test;

现在,我们可以做一些简单查询

  • 例 1:与姚明有直接关联的边的类型和点的属性
(user@127.0.0.1) [test]> GO FROM hash("姚明[中国篮球协会主席、中职联公司董事长]") OVER relation  YIELD relation.name AS Name, $$.entity.name AS Value;

执行结果如下:

可以看到:本次查询返回 51 条数据,耗时 3 ms 左右;

  • 例2:查询姚明和其妻子叶莉在三跳之内的所有路径
(user@127.0.0.1) [test]> FIND ALL PATH FROM hash("姚明[中国篮球协会主席、中职联公司董事长]")  TO hash("叶莉") OVER relation UPTO 3 STEPS;

执行结果如下:

当数据量较大时,查找全路径/最短经之类的操作会比较耗时。可以看到:本次查询返回 8 条数据,说明姚明和其妻子叶莉在三跳之内共有 8 条直接或间接的关系。

总结

本篇文章涉及到的一些概念和链接:

后面的工作

  • 调整 Nebula 的参数。似乎默认的日志级别和内存都不是很好,可以用下面这个命令关闭日志,这样导入性能可以好很多。
curl "http://127.0.0.1:12000/set_flags?flag=minloglevel&value=4"
  • 写个对应的 Python 版本示例

附录

Nebula Graph GitHub 地址:https://github.com/vesoft-inc/nebula ,加入 Nebula Graph 交流群,请联系 Nebula Graph 官方小助手微信号:NebulaGraphbot

Nebula Graph:一个开源的分布式图数据库。
GitHub:https://github.com/vesoft-inc/nebula
知乎:https://www.zhihu.com/org/nebulagraph/posts
微博:https://weibo.com/nebulagraph

ShowMeBug 核心技术内幕

rina_gmail 发表了文章 • 0 个评论 • 736 次浏览 • 2019-11-21 11:46 • 来自相关话题

ShowMeBug 是一款远程面试工具,双方可通过在线面试板进行实时沟通技术。所以关键技术要点在于 “实时同步”。关于实时同步,ShowMeBug 采用了以下技术。 OT 转换算法 ...查看全部

ShowMeBug 是一款远程面试工具,双方可通过在线面试板进行实时沟通技术。所以关键技术要点在于 “实时同步”。关于实时同步,ShowMeBug 采用了以下技术。



OT 转换算法


本质上,ShowMeBug 核心就是多人同时在线实时编辑,难点即在这里。因为网络原因,操作可能是异步到达,丢失,与他人操作冲突。想想这就是个复杂的问题。



经过研究,最好用户体验的方式是 OT 转换算法。此算法由 1989 年 C. Ellis 和 S. Gibbs 首次提出,目前像 quip,google docs 均用的此法。



OT 算法允许用户自由编辑任意行,包括冲突的操作也可以很好支持,不用锁定。它的核心算法如下:



文档的操作统一为以下三种类型的操作( Operation ):



  1. retain(n): 保持 n 个字符
  2. insert(s): 插入字符串 s
  3. delete(s): 删除字符串 s


然后客户端与服务端各记录历史版本,每次操作都经过一定的转换后,推送给另一端。



转换的核心是



S(o_1, o_2) = S(o_2, o_1)



换言之,把正在并发的操作进行转换合并,形成新的操作,然后应用在历史版本上,就可以实现无锁化同步编辑。



下图演示了对应的操作转换过程。



https://daotestimg.dao42.com/ipic/070918.jpg



这个算法的难点在于分布式的实现。客户端服务端均需要记录历史,并且保持一定的序列。还要进行转换算法处理。



OT Rails 侧的处理


本质上,这是一个基于 websocket 的算法应用。所以我们没有怀疑就选用 ActionCable 作为它的基础。想着应该可以为我们节省大量的时间。实际上,我们错了。



ActionCable 实际上与 NodeJS 版本的 socket.io 一样,不具备任何可靠性的保障,做一些玩意性的聊天工具还可以,或者做消息通知允许丢失甚至重复推送的弱场景是可以的。但像 OT 算法这种强要求的就不可行了。



因为网络传输的不可靠性,我们必须按次序处理每一个操作。所以首先,我们实现了一个互斥锁,也就是针对某一个面试板,准备一个锁,同时只有一个操作可以进行操作。锁采用了 Redis 锁。实现如下:



  def unlock_pad_history(lock_key)
logger.debug "[padable] unlock( lock_key: #{lock_key} )..."
old_lock_key = REDIS.get(_pad_lock_history_key)
if old_lock_key == lock_key
REDIS.del(_pad_lock_history_key)
else
log = "[FIXME] unlock_pad_history expired: lock_key=#{lock_key}, old_lock_key=#{old_lock_key}"
logger.error(log)
e = RuntimeError.new(log)
ExceptionNotifier.notify_exception(e, lock_key: lock_key, old_lock_key: old_lock_key)
end
end

# 为防止死锁,锁的时间为5分钟,超时自动解锁,但在 unlock 时会发出异常
def lock_pad_history(lock_key)
return REDIS.set(_pad_lock_history_key, lock_key, nx: true, ex: 5*60)
end

def wait_and_lock_pad_history(lock_key, retry_times = 200)
total_retry_times = retry_times
while !lock_pad_history(lock_key)
sleep(0.05)
logger.debug '[padable] locked, waiting 50ms...'
retry_times-=1
raise "wait_and_lock_pad_history(in #{total_retry_times*0.1}s) #{lock_key} failed" if retry_times == 0
end
logger.debug "[padable] locking it(lock_key: #{lock_key})..."
end




服务端的并发控制完毕后,客户端通过 “状态队列” 技术一个个排队发布操作记录,核心如下:



class PadChannelSynchronized {
sendHistory(channel, history){
channel._sendHistory(history)
return new PadChannelAwaitingConfirm(history)
}
}

class PadChannelAwaitingConfirm {
constructor(outstanding_history) {
this.outstanding_history = outstanding_history
}

sendHistory(channel, history){
return new PadChannelAwaitingWithHistory(this.outstanding_history, history)
}

receiveHistory(channel, history){
return new PadChannelAwaitingConfirm(pair_history[0])
}

confirmHistory(channel, history) {
if(this.outstanding_history.client_id !== history.client_id){
throw new Error('confirmHistory error: client_id not equal')
}
return padChannelSynchronized
}
}

class PadChannelAwaitingWithHistory {
sendHistory(channel, history){
let newHistory = composeHistory(this.buffer_history, history)
return new PadChannelAwaitingWithHistory(this.outstanding_history, newHistory)
}
}

let padChannelSynchronized = new PadChannelSynchronized()

export default padChannelSynchronized




以上,便实现了一个排队发送的场景。



除此之外,我们设计了一个 PadChannel 用来专门管理与服务器通信的事件,维护历史的状态,处理断线重传,操作转换与校验。



定义自己的历史(history) 协议


解决了编辑器协同的问题,才是真正的问题的开始。每次的 ”代码运行”,“编辑”,“清空终端”,“首次同步” 都是需要记录的历史操作。于是,ShowMeBug 定义了以下协议:



  # 包含以下: edit( 更新编辑器内容 ), run( 执行命令 ), clear( 清空终端 ), sync( 同步数据 )
# select( 光标 ), locate( 定位 )
# history 格式如下:
#
# {
# op: 'run' | 'edit' | 'select' | 'locate' | 'clear'
# id: id // 全局唯一操作自增id, 首次前端传入时为 null, 服务端进行填充, 如果返回时为空, 则说明此 history 被拒绝写入
# version: 'v1' // 数据格式版本
# prev_id: prev_id // JS端生成 history 时上一次收到服务端的 id, 用于识别操作序列
# client_id: client_id // 客户端生成的 history 的唯一标识
# creator_id: creator_id // 操作人的用户id, 为了安全首次前端传入时为 null,由中台填充
# event: { // op 为 edit 时, 记录编辑器 OT 转化后的数据, see here: https://github.com/Aaaaash/blog/issues/10
# [length, "string", length]
# // op 为 select 时, 记录编辑器选择区域(包括光标)
# }
# snapshot: {
# editor_text: '' // 记录当前编辑器内容快照, 此快照由服务端填充
# language_type: '' // 记录当前编辑器的语言种类
# terminal_text: '' // 记录当前终端快照
# }
# }
# created_at: created_at // 生成时间




值得说明的是,client_id 是客户端生成的一个8位随机码,用于去重和与客户端进行 ACK 确认。



id 是由服务端 Redis 生成的自增 id,客户端会根据这个判断历史是否是新的。prev_id 用来操作转换时记录所需要进行转换操作的历史队列。



event 是最重要的操作记录,我们用 OT 的转换数据进行存储,如: [length, "string", length]



通过上述的设计,我们将面试板的所有操作细节涵盖了,从而实现多人面试实时同步,面试题和面试语言自动同步,操作回放等核心功能。



总结


篇幅限制,这里只讲到 ShowMeBug 的核心技术,更多的细节我们以后继续分享。



ShowMeBug 目前承载了 3000 场面试记录,成功支撑大量的实际面试官的面试,可靠性已得到进一步保障。这里面有两种重要编程范式值得考虑:



  1. 如何在不可信链路上设计一种有序可靠的交付协议,定义清晰的协议数据和处理好异步事件是关键。
  2. 如何平衡研发效率与稳定性之间的关系,比如实现的忙等锁,允许一定原因的失败,但处理好用户的提示与重试。既高效完成了功能,又不影响到用户体验。


ShowMeBug( showmebug.com ) 让你的技术面试更高效,助你找到你想要的候选人。

Nebula 架构剖析系列(二)图数据库的查询引擎设计

NebulaGraph 发表了文章 • 0 个评论 • 661 次浏览 • 2019-11-21 11:10 • 来自相关话题

摘要 上文(存储篇)说到数据库重要的两部分为存储和计算,本篇内容为你解读图数据库 Nebula 在查询引擎 Query Engine 方面的设计实践。 在 Nebula ...查看全部

摘要


上文(存储篇)说到数据库重要的两部分为存储和计算,本篇内容为你解读图数据库 Nebula 在查询引擎 Query Engine 方面的设计实践。


在 Nebula 中,Query Engine 是用来处理 Nebula 查询语言语句(nGQL)。本篇文章将带你了解 Nebula Query Engine 的架构。



上图为查询引擎的架构图,如果你对 SQL 的执行引擎比较熟悉,那么对上图一定不会陌生。Nebula 的 Query Engine 架构图和现代 SQL 的执行引擎类似,只是在查询语言解析器和具体的执行计划有所区别。


Session Manager


Nebula 权限管理采用基于角色的权限控制(Role Based Access Control)。客户端第一次连接到 Query Engine 时需作认证,当认证成功之后 Query Engine 会创建一个新 session,并将该 session ID 返回给客户端。所有的 session 统一由 Session Manger 管理。session 会记录当前 graph space 信息及对该 space 的权限。此外,session 还会记录一些会话相关的配置信息,并临时保存同一 session 内的跨多个请求的一些信息。


客户端连接结束之后 session 会关闭,或者如果长时间没通信会切为空闲状态。这个空闲时长是可以配置的。
客户端的每个请求都必须带上此 session ID,否则 Query Engine 会拒绝此请求。


Storage Engine 不管理 session,Query Engine 在访问存储引擎时,会带上 session 信息。


Parser



Query Engine 解析来自客户端的 nGQL 语句,分析器(parser)主要基于著名的 flex / bison 工具集。字典文件(lexicon)和语法规则(grammar)在 Nebula 源代码的 src/parser  目录下。设计上,nGQL 的语法非常接近 SQL,目的是降低学习成本。 图数据库目前没有统一的查询语言国际标准,一旦 ISO/IEC 的图查询语言(GQL)委员会发布 GQL 国际标准,nGQL 会尽快去实现兼容。
Parser 构建产出的抽象语法树(Abstrac Syntax Tree,简称 AST)会交给下一模块:Execution Planner。


Execution Planner



执行计划器(Execution Planner)负责将抽象树 AST 解析成一系列执行动作 action(可执行计划)。action 为最小可执行单元。例如,典型的 action 可以是获取某个节点的所有邻节点,或者获得某条边的属性,或基于特定过滤条件筛选节点或边。当抽象树 AST 被转换成执行计划时,所有 ID 信息会被抽取出来以便执行计划的复用。这些 ID 信息会放置在当前请求 context 中,context 也会保存变量和中间结果。


Optimization



经由 Execution Planner 产生的执行计划会交给执行优化框架 Optimization,优化框架中注册有多个 Optimizer。Optimizer 会依次被调用对执行计划进行优化,这样每个 Optimizer都有机会修改(优化)执行计划。最后,优化过的执行计划可能和原始执行计划完全不一样,但是优化后的执行结果必须和原始执行计划的结果一样的。


Execution



Query Engine 最后一步是去执行优化后的执行计划,这步是执行框架(Execution Framework)完成的。执行层的每个执行器一次只处理一个执行计划,计划中的 action 会挨个一一执行。执行器也会一些有针对性的局部优化,比如:决定是否并发执行。针对不同的 action所需数据和信息,执行器需要经由 meta service 与storage engine的客户端与他们通信。


最后,如果你想尝试编译一下 Nebula 源代码可参考如下方式:



有问题请在 GitHub(GitHub 地址:github.com/vesoft-inc/nebula) 或者微信公众号上留言,也可以添加 Nebula 小助手微信号:NebulaGraphbot 为好友反馈问题~


推荐阅读


Dev 日志 | 一次 Segmentation Fault 和 GCC Illegal Instruction 编译问题排查

NebulaGraph 发表了文章 • 0 个评论 • 691 次浏览 • 2019-11-20 10:33 • 来自相关话题

摘要

笔者最近在重新整理和编译 Nebula Graph 的第三方依赖,选出两个比较有意思的问题给大家分享一下。

Flex Segmentation Fault——Segmentation fault (core dumped)

在编译 Flex 过程中,遇到了 Segmentation fault:

make[2]: Entering directory '/home/dutor/flex-2.6.4/src'
./stage1flex -o stage1scan.c ./scan.l
make[2]: *** [Makefile:1696: stage1scan.c] Segmentation fault (core dumped)

使用 gdb 查看 coredump:

Core was generated by `./stage1flex -o stage1scan.c ./scan.l'.
Program terminated with signal SIGSEGV, Segmentation fault.
#0 flexinit (argc=4, argv=0x7ffd25bea718) at main.c:976
976 action_array[0] = '\0';
(gdb) disas
Dump of assembler code for function flexinit:
0x0000556c1b1ae040 <+0>: push %r15
0x0000556c1b1ae042 <+2>: lea 0x140fd(%rip),%rax # 0x556c1b1c2146
...
0x0000556c1b1ae20f <+463>: callq 0x556c1b1af460 <allocate_array> #这里申请了buffer
...
=> 0x0000556c1b1ae24f <+527>: movb $0x0,(%rax) # 这里向buffer[0]写入一个字节,地址非法,挂掉了
...
(gdb) disas allocate_array
Dump of assembler code for function allocate_array:
0x0000556c1b1af460 <+0>: sub $0x8,%rsp
0x0000556c1b1af464 <+4>: mov %rsi,%rdx
0x0000556c1b1af467 <+7>: xor %eax,%eax
0x0000556c1b1af469 <+9>: movslq %edi,%rsi
0x0000556c1b1af46c <+12>: xor %edi,%edi
0x0000556c1b1af46e <+14>: callq 0x556c1b19a100 <reallocarray@plt> # 调用库函数申请内存
0x0000556c1b1af473 <+19>: test %eax,%eax # 判断是否为 NULL
0x0000556c1b1af475 <+21>: je 0x556c1b1af47e <allocate_array+30># 跳转至NULL错误处理
0x0000556c1b1af477 <+23>: cltq # 将 eax 符号扩展至 rax,造成截断
0x0000556c1b1af479 <+25>: add $0x8,%rsp
0x0000556c1b1af47d <+29>: retq
...
End of assembler dump.

可以看到,问题出在了 allocate_array 函数。因为 reallocarray 返回指针,返回值应该使用 64 bit 寄存器rax,但 allocate_array 调用 reallocarray 之后,检查的却是 32 bit 的 eax,同时使用 cltq 指令将 eax 符号扩展 到 rax。原因只有一个:allocate_array 看到的 reallocarray 的原型,与 reallocarry 的实际定义不符。翻看编译日志,确实找到了 implicit declaration of function 'reallocarray' 相关的警告。configure 阶段添加 CFLAGS=-D_GNU_SOURCE 即可解决此问题。

注:此问题不是必现,但编译/链接选项 -pie 和 内核参数 kernel.randomize_va_space 有助于复现。

总结:

  • 隐式声明的函数在 C 中,返回值被认为是 int
  • 关注编译器告警,-Wall -Wextra 要打开,开发模式下最好打开 -Werror。

GCC Illegal Instruction——internal compiler error: Illegal instruction

前阵子,接到用户反馈,在编译 Nebula Graph 过程中遭遇了编译器非法指令的错误,详见(#978)[https://github.com/vesoft-inc/nebula/issues/978]

错误信息大概是这样的:

Scanning dependencies of target base_obj_gch
[ 0%] Generating Base.h.gch
In file included from /opt/nebula/gcc/include/c++/8.2.0/chrono:40,
from /opt/nebula/gcc/include/c++/8.2.0/thread:38,
from /home/zkzy/nebula/nebula/src/common/base/Base.h:15:
/opt/nebula/gcc/include/c++/8.2.0/limits:1599:7: internal compiler error: Illegal instruction
min() _GLIBCXX_USE_NOEXCEPT { return FLT_MIN; }
^~~
0xb48c5f crash_signal
../.././gcc/toplev.c:325
Please submit a full bug report,
with preprocessed source if appropriate.

既然是 internal compiler error,想必是 g++ 本身使用了非法指令。为了定位具体的非法指令集及其所属模块,我们需要复现这个问题。幸运的是,下面的代码片段就能触发:

#include <thread>
int main()
{
return 0;
}

非法指令一定会触发 SIGILL,又因为 g++ 只是编译器的入口,真正干活的是 cc1plus。我们可以使用 gdb 来运行编译命令,抓住子进程使用非法指令的第一现场:

$ gdb --args /opt/nebula/gcc/bin/g++ test.cpp
gdb> set follow-fork-mode child
gdb> run
Starting program: /opt/nebula/gcc/bin/g++ test.cpp
[New process 31172]
process 31172 is executing new program: /opt/nebula/gcc/libexec/gcc/x86_64-pc-linux-gnu/8.2.0/cc1plus
Thread 2.1 "cc1plus" received signal SIGILL, Illegal instruction.
[Switching to process 31172]
0x00000000013aa0fb in __gmpn_mul_1 ()
gdb> disas
...
0x00000000013aa086 <+38>: mulx (%rsi),%r10,%r8
...

Bingo!mulx 属于 BMI2 指令集,报错机器 CPU 不支持该指令集。
仔细调查,引入该指令集的是 GCC 的依赖之一,GMP。默认情况下,GMP 会在 configure 阶段探测当前机器的 CPU 具体类型,以期最大化利用 CPU 的扩展指令集,提升性能,但却牺牲了二进制的可移植性。解决方法是,在 configure 之前,使用代码目录中的 configfsf.guess configfsf.sub 替换或者覆盖默认的 config.guess 和 config.sub

总结:

  • 某些依赖可能因为性能或者配置的原因,造成二进制的不兼容。
  • 缺省参数下,GCC 为了兼容性,不会使用较新的指令集。
  • 为了平衡兼容性和性能,你需要做一些额外的工作,比如像 glibc 那样在运行时选择和绑定某个具体实现。

最后,如果你想尝试编译一下 Nebula 源代码可参考以下方式:

bash> git clone https://github.com/vesoft-inc/nebula.git
bash> cd nebula && ./build_dep.sh N

有问题请在 GitHub 或者微信公众号上留言,也可以添加 Nebula 小助手微信号:NebulaGraphbot 为好友反馈问题~

附录


go 学习笔记之咬文嚼字带你弄清楚什么是 defer延迟函数

snowdreams1006 发表了文章 • 0 个评论 • 758 次浏览 • 2019-11-19 18:53 • 来自相关话题

https://mp.weixin.qq.com/s/t5tmqsjZ4y4Z_n6u4c9bMw通过本文,我们知道了延迟函数的执行时机以及一些细节,关键是理解 Each time a "defer" ...查看全部

https://mp.weixin.qq.com/s/t5tmqsjZ4y4Z_n6u4c9bMw

通过本文,我们知道了延迟函数的执行时机以及一些细节,关键是理解 Each time a "defer" statement executes, the function value and parameters to the call are evaluated as usual and saved anew but the actual function is not invoked. 这句话,绝对是重中之重!


简而言之,延迟函数在声明时会收集相关参数赋值拷贝一份入栈,时机合适时再从入栈环境中寻找相关环境参数,如果找不到就扩大范围寻找外层函数是否包含所需变量,执行过程也就是延迟函数的出栈.


有一个消防员专门负责保卫商场的安全,每天商场进进出出很多人流,总有一些重要人物也会来到商场购物,突然有一天,发生了火灾,正在大家惊慌失措中...


这个消防员到底干了什么才能保证重要人物安全的同时也能让他们不遭受财产损失?


go-error-defer-panic.png
go-error-defer-panic.png

请补充你的答案,感谢你的阅读与关注,下一节再见~


阅读延伸以及参考文档



  • Defer_statements[3]
  • Built-in_functions[4]
  • Go 语言规格说明书 之 内建函数(Built-in functions)[5]
  • go 语言快速入门:内建函数(6)[6]
  • 你知道 defer 的坑吗?[7]
  • golang 语言 defer 特性详解.md[8]
  • Golang 之轻松化解 defer 的温柔陷阱[9]

参考资料



[1]

go 学习笔记之解读什么是defer延迟函数: https://mp.weixin.qq.com/s/XttOuCEk7kgySKLOCqVMRQ



[2]

defer关键字: https://tiancaiamao.gitbooks.io/go-internals/content/zh/03.4.html



[3]

Defer_statements: https://golang.google.cn/ref/spec#Defer_statements



[4]

Built-in_functions: https://golang.google.cn/ref/spec#Built-in_functions



[5]

Go语言规格说明书 之 内建函数(Built-in functions): https://www.cnblogs.com/luo630/p/9669966.html



[6]

go语言快速入门:内建函数(6): https://blog.csdn.net/liumiaocn/article/details/54804074



[7]

你知道defer的坑吗?: https://www.jianshu.com/p/9a7364762714



[8]

golang语言defer特性详解.md: https://www.jianshu.com/p/57acdbc8b30a



[9]

Golang之轻松化解defer的温柔陷阱: https://segmentfault.com/a/1190000018169295





COSCon'19 | 如何设计新一代的图数据库 Nebula

NebulaGraph 发表了文章 • 0 个评论 • 941 次浏览 • 2019-11-12 10:50 • 来自相关话题

11 月 2 号 - 11 月 3 号,以“大爱无疆,开源无界”为主题的 2019 中国开源年会(COSCon'19)正式启动,大会以开源治理、国际接轨、社区发展和开源项目为切入点同全球开源爱好者们共同交流开源。作为图数据库技术的代表,Ne ...查看全部

11 月 2 号 - 11 月 3 号,以“大爱无疆,开源无界”为主题的 2019 中国开源年会(COSCon'19)正式启动,大会以开源治理、国际接轨、社区发展和开源项目为切入点同全球开源爱好者们共同交流开源。

作为图数据库技术的代表,Nebula Graph 总监——吴敏在本次大会上将会讲述了大规模分布式图数据库设计思考和实践。在信息爆发式增长和内容平台遍地开花的信息时代,图数据库在当中扮演了什么样的角色?同传统数据库相比,图数据库又有什么优势?图数据库开发需要哪些新技术?就此,开源社特访吴敏来分享下图数据库主题内容,从图数据 Nebula 的研发开始,就传统数据库面临的挑战,开源模式的优势,Nebula 的社区开展和产品规划等问题进行深入解析。

About Nebula 总监--吴敏

开源社:Hi,吴敏,先和大家介绍下自己。

大家好,我是吴敏,VEsoft 总监,博士毕业于浙江大学。 曾就职于阿里云、蚂蚁金服,从事分布式图数据库以及云存储相关工作。

开源社:谈谈您在 COSCon'19 上的分享话题。

随着抖音、小红书等社交内容平台的爆红诞生了一种基于社交关系网路的推荐需求,而以垂直领域作为切入点的知识图谱过去两年的“爆火”,传统数据库在处理社交推荐、风控、知识图谱方面的性能缺陷,图数据库的研发应运而生。

本演讲开篇将陈述图数据库行业现状,让你对图数据库存储的数据及对场景有所了解,再从开源的分布式图数据库 Nebula Graph 切入深度讲解大规模分布式图数据库应该如何设计存储、计算及架构,最后讲述开源对图数据库开发的影响。

内容大纲

  1. 图数据库概述及应用
  2. Nebula Graph 设计介绍
  3. 技术细节
  4. 开源社区及服务

开源社:哪些人可以应该了解这个内容?

对图数据库有兴趣,或是有推荐、风控、知识图谱等业务场景需求的人。

Nebula 研发之旅

开源社:为什么给图数据库取名 Nebula ?

Nebula 是星云的意思,很大嘛,也是漫威宇宙里面漂亮的星云小姐姐。对了,Nebula的发音是:[ˈnɛbjələ]

开源社:现在数据库领域百花齐放,国产的 OceanBase 和 TiDB 都发展得不错,为什么还要研发 Nebula 这样的图数据库?

OceanBase、TiDB 这类 NewSQL 最近发展势头很强劲,他们的出现更多的是对传统单机的关系型数据库在可用性的补充。
Nebula 聚焦在图数据库这一领域,也是近年来在数据库各分支中增长最为快速的领域。图数据库使用图(或者网)的方式很直接、自然的表达现实世界的关系: 用节点来表示实体,边来表示关联关系,everything is connected。能高效的提供图检索,提供专业的分析算法、工具,比如 ShortestPath、PageRank、标签传播等等。

开源社:图数据库应用场景有哪些?

典型的应用场景有社交网络,金融风控,推荐引擎,知识图谱等。

社交网络,比如,推荐一条最短路径让我结识迪纳热巴,还可以加上筛选条件,路径中的每个人都是单身女性。

金融风控场景,比如,去查一个信用卡反套现的网络。很典型的一个场景,A 转账到 B,B 转账到 C,C 又转回给 A 即是一个典型的闭环。对于这样的闭环,这类查询在图数据库大规模应用之前,大部分都是采用离线计算的方式去查找,但是离线场景很难去控制当前发生的这笔交易。一个信用卡交易或者在线贷款,整个作业流程很长,而在反套现这块的审核时间又限制在毫秒级,这就是图数据库非常大的一个应用场景。

在推荐算法中,为某个人推荐他的好友。现在的方案是去找好友的好友,判断好友的好友有没有可能成为某人的新好友,这当中涉及好友关系的亲密度,抵达好友的好友的最短路径等。业务方可能会用 MySQL 等传统数据库或是 HBase 来存各类好友关系,然后通过多个串行的 Key-Value 来做查询,但这在线上场景是很难满足性能要求的。

知识图谱这些年非常火,知识图谱结合自然语言的形式在金融,医疗,互联网等众多领域被广泛使用,常见的有语音助手、聊天机器人、智能问答等应用场景。而图数据库存储的数据结构完全适配知识图谱数据,图谱中的实体对应图数据库的点,实体与实体的关系对应图数据库的边,拿 Nebula 为例,Nebula Graph Schema 采用属性图,点边上的属性对应图谱实体和关系中的属性,边的方向表示了关系的方向,边上的标记表示了关系的类型。

再说到最近国内非常火的区块链场景,由于区块链上的所有行为都是公开被记录的又是不可篡改的,因此所有的交易行为,不管是历史数据,还是大概每几分钟产生的新 block,都可以对 DAT 文件解析后导入到图数据库和 GNN 中做分析。例如我们都听说在一些数字货币场景下,洗钱、盗窃、团伙、操纵市场的各类事情很多,通过图的手段包括可以帮助我们挖掘里面的非法行为。

开源社:作为图数据库,有参考借鉴了哪些数据库吗?哪些方面是 Nebula 有特点的设计?

Nebula 是完全自主研发的数据库,它主要有以下的技术特点

存储计算分离

对于 Nebula Graph 来讲,有这么几个技术特点:第一个就是采用了存储计算分离的架构,主要好处就是为了上云或者说弹性,方便单独扩容。业务水位总是很难预测的,一段时间存储不够了,有些时候计算不够了。在云上或者使用容器技术,计算存储分离的架构运维起来会比较方便,成本也更好控制。大家使用 HBase 那么久,这方面的感触肯定很多。

查询语言 nGQL

Nebula Graph 的第二个技术特点是它的查询语言,我们称为 nGQL,比较接近 SQL。唯一大一点的语法差异就是 不用嵌套 (embedding)。大家都知道嵌套的 SQL,读起来是非常痛苦的,要从里向外读。另外,由于图这块目前并没有统一的国际标准,这对整个行业的发展并不是好事,用户的学习成本很高。目前有个 ISO / IEC 组织在准备图语言的国际标准,我们也在积极兼容标准。

支持多种后端存储

第三个特点就是 Nebula Graph 支持多种后端存储,除了原生的引擎外,也支持 HBase。 因为很多用户,对 HBase 已经相当熟悉了,并不希望多一套存储架构。从架构上来说,Nebula Graph 是完全对等的分布式系统。

计算下推

和 HBase 的 CoProcessor 一样,Nebula Graph 支持数据计算下推。数据过滤,包括一些简单的聚合运算,能够在存储层就做掉,这样对于性能来讲能提升会非常大。

多租户

多租户,Nebula Graph是通过多 Space 来实现的。Space 是物理隔离。

索引

除了图查询外,还有很常见的一种场景是全局的属性查询。这个和 MySQL 一样,要提升性能的主要办法是为属性建立索引 ,这个也是 Nebula Graph 原生支持的功能。

图算法

最后的技术特点就是关于图算法方面。
这里的算法和全图计算不太一样,更多是一个子图的计算,比如最短路径。大家知道数据库通常有 OLTP 和 OLAP 两种差异很大的场景,当然现在有很多 HTAP 方面的努力。那对于图数据库来说也是类似,我们在设计 Nebula Graph 的时候,做了一些权衡。我们认为全图的计算,比如 Page Rank,LPA,它的技术挑战和 OLTP 的挑战和对应的设计相差很大。所以 Nebula 的查询引擎主要针对 OLTP 类的场景。
那么,对于 OLAP 类的计算需求,我们的考虑是通过支持和 Spark 的相互访问,来支持 Spark 上图计算,比如 graphX。这块工作正在开发中,应该在最近一两个月会发布。

开源社:为什么会考虑存储计算分离的架构呢?

存储计算分离是个很热的话题。我们将存储模块和 Query Engine 层分开主要有以下考虑。

  1. 成本的原因。存储和计算对计算机资源要求不一样,存储依赖 I/O,计算对 CPU 和内存的要求更高,业务在不同的应用或者发展时期,需要不同的存储空间和计算能力配比,存储和计算的耦合会使得机器的选型会比较复杂,存储计算分离的架构,使得 storage 的 scale out/in 更容易。
  2. 存储层抽象出来可以给计算带来新的选择,比如对接 Pregel, Spark GraphX 这些计算引擎。通常来说,图计算对于存储的要求是吞吐量优先的,而在线查询是时延优先的。通过把存储层分离出来,不管是开发的时候(做 QoS )还是运维的时候(单独集群部署),都会更容易一些。
  3. 在云计算场景下,能实现真正的弹性计算。

开源社:作为一个分布式数据库,是如何保障数据一致性的?

我们使用 Raft 协议,Raft 一致性协议使得 shared-nothing 的 kv 有一致性保障。为什么选择 Raft?相对于 Paxos,Raft 更加有利于工程化实现。Nebula 存储层 Raft 使用 Multi-Raft 的模型,多个 replica 上的同一个 partition 组成一个 Raft 组,同一个集群内存在互相独立 Raft 组,在一致性保障的同时,提高了系统的并发能力。

开源社:在数据库的优化方面,Nebula 做了哪些?

Nebula 在数据优化方面主要做了以下工作:

  1. 异步和并发执行:由于 IO 和网络均为长时延操作,Nebula Graph 采用异步及并发操作。此外,为避免一些大query 的长尾影响,为每个 query 设置单独的资源池以保证服务质量 QoS。
  2. 计算下沉:为避免存储层将过多数据回传到计算层,占用宝贵带宽,条件过滤等算子会随查询条件一同下发到存储层节点。
  3. 数据库系统的优化与数据的物理存储方式以及数据的分布息息相关。而且随着业务的发展,数据分布是会发生变化的,一开始设计的索引和数据存储或者分区会慢慢变得不是最优的,这就需要系统能够做一些动态的调整。我们 storage 支持 scale out/in, load balance。系统的调整会带来 overhead,这是需要权衡考虑的问题。

开源社:现在市面上已有一些图数据库,Nebula 考虑兼容部分数据库让已有的用户无缝切到 Nebula 吗?

Nebula 有 CSV、HDFS 批量 数据导入工具。用户可以将数仓的数据导入到 Nebula。也提供 C++,Java,Golang,Python 的客户端。另外对于市面上已有的一些产品,现在也正在开发将它的数据格式直接解析为 Nebula 的数据格式,这样就可以非常方便的迁移,包括查询语言层面的兼容。

开源社:水平伸缩能够支持多大的规模?

存储层 shared-nothing 的架构,理论上支持无限加机器。

开源社:Nebula 最新的版本 RC1 支持最短路径和全路径算法,可以具体讲下这块的实现,及以后的研发规划吗?

目前实现较为简单,基于双向搜索,返回点边组合的路径。未来规划是计划在执行计划与优化器都完成后,完善对路径的支持,包括实现 match,支持双向 bfs、双向 dijkstra、allpair(全路径),kshortest 等。当然我们欢迎社区的同学们都参与完善 Nebula 的路径算法。

开源社:使用 Nebula 之前,用户应该做哪些准备工作?

对于刚开始使用图数据库的用户,我们提供了详细的文档;

对于已经在使用其他图数据库,想要试试 Nebula 的用户,我们提供了数据导入等工具,有疑问或者任何问题,欢迎在 GitHub 上给我们提 issue,我们的工程师会在第一时间为您解答。

Nebula 和开源

开源社:作为一个企业级产品,为什么 Nebula 一开始就选择了走开源路线?

如果没 Linux,现在互联网的格局也不会是今天这样。我们想要建立图数据库的社区,做出更好的图数据库产品,也希望更多对 Nebula,对图数据库感兴趣的同学成为社区的贡献者,一起努力,共同建立一个互助互利的社区。

开源社:在开源的过程中,有遇到什么困难吗?

很多人都想为开源做一份力,但会被开源项目的门槛“劝退”,尤其是 Nebula 是一个即使耕耘在数据库领域多年的数据库专家,如果对图数据库的不够了解的话,都会感叹“高大上”的一个项目。但技术是为业务服务的,所以 Nebula 力求自己的文档让你即使你对图数据库一无所知,通过 Nebula 的文档也能够了解到图数据库及其应用场景。

开源社:在开源社区搭建这块,有什么可以和开源社小伙伴们分享的吗?

开源项目最重要的是生态的搭建,Nebula Graph 刚开源半年在社区搭建这块只能说略有心得,仅供大家参考 :) 开源社区运营主要从下面几个方面展开

  1. 简洁明了的文档:一个好的文档能让使用者快速同产品拉近距离,Nebula 的文档从“让非技术人做技术事”的出发,力求即使你是一个不懂技术的人也可以按照文档部署 Nebula,玩起来——用 Nebula 完成简单的 CRUD,如果开源社的小伙伴阅读过 Nebula 文档觉得哪里有更改意见,欢迎联系我们;
  2. 实时的反馈回复:用户的反馈,我们会第一时间进行回复,在 GitHub 的 issue 及用户交流群里进行回复;
  3. 同用户直接对话:在线上,Nebula 在各大技术平台同图数据库和 Nebula 爱好者们进行交流,包括 Nebula 架构设计、用户使用实操等系列文章;在线下,我们也开展了主题 Meetup 同各地爱好者交流图数据库技术及 Nebula 的开发心得;
  4. 社区用户体系:在 Nebula 的 GitHub 上,现阶段你可以看到 3 种用户,User、Contributor、Committer,User 通过向 Nebula 提 issue / pr 或者投稿等方式成为 Contributor,Contributor 再进阶成为 Committer。配合 Nebula 开展的各类社区活动,eg:捉虫活动,帮助社区用户完成角色“升级”;

最后,打个小广告:欢迎大家来参与到 Nebula 的建设中,为开源贡献一份力 :)

程序员寄语

开源社: 作为资深数据库从业人员,怎样让自己的眼界更加开阔,怎么获取这个领域的最前沿信息?


多看看论文,看看开源分布式系统的设计以及源代码;多关注数据库的的会议,比如,SIGMOD, VLDB,关注学术界的最新成果;多关注业界相关公司的发展和动态,比如 OsceanBase,TiDB。

Nebula 有话说

以上为开源社对图数据库 Nebula 总监——吴敏的采访,欢迎你关注 Nebula GitHub:github.com/vesoft-inc/nebula 了解 Nebula 最新动态或添加 Nebula 小助手为好友进图数据库技术交流群交流,小助手微信号:NebulaGraphbot

推荐阅读

Go netpoll I/O 多路复用构建原生网络模型之源码深度解析

panjf2000 发表了文章 • 0 个评论 • 1177 次浏览 • 2019-11-10 22:48 • 来自相关话题

原文Go netpoll I/O 多路复用构建原生网络模型之源码深度解析 ...查看全部

原文

Go netpoll I/O 多路复用构建原生网络模型之源码深度解析

导言

Go 基于 I/O multiplexing 和 goroutine 构建了一个简洁而高性能的原生网络模型(基于 Go 的I/O 多路复用 netpoll),提供了 goroutine-per-connection 这样简单的网络编程模式。在这种模式下,开发者使用的是同步的模式去编写异步的逻辑,极大地降低了开发者编写网络应用时的心智负担,且借助于 Go runtime scheduler 对 goroutines 的高效调度,这个原生网络模型不论从适用性还是性能上都足以满足绝大部分的应用场景。

然而,在工程性上能做到如此高的普适性和兼容性,最终暴露给开发者提供接口/模式如此简洁,其底层必然是基于非常复杂的封装,做了很多取舍,也有可能放弃了一些『极致』的设计和理念。事实上netpoll底层就是基于 epoll/kqueue/iocp 这些系统调用来做封装的,最终暴露出 goroutine-per-connection 这样的极简的开发模式给使用者。

Go netpoll 在不同的操作系统,其底层使用的 I/O 多路复用技术也不一样,可以从 Go 源码目录结构和对应代码文件了解 Go 在不同平台下的网络 I/O 模式的实现。比如,在 Linux 系统下基于 epoll,freeBSD 系统下基于 kqueue,以及 Windows 系统下基于 iocp。

本文将基于 linux 平台来解析 Go netpoll 之 I/O 多路复用的底层是如何基于 epoll 封装实现的,从源码层层推进,全面而深度地解析 Go netpoll 的设计理念和实现原理,以及 Go 是如何利用netpoll来构建它的原生网络模型的。主要涉及到的一些概念:I/O 模式、用户/内核空间、epoll、linux 源码、goroutine scheduler 等等,我会尽量简单地讲解,如果有对相关概念不熟悉的同学,还是希望能提前熟悉一下。

用户空间与内核空间

现在操作系统都是采用虚拟存储器,那么对 32 位操作系统而言,它的寻址空间(虚拟存储空间)为 4G(2 的 32 次方)。操作系统的核心是内核,独立于普通的应用程序,可以访问受保护的内存空间,也有访问底层硬件设备的所有权限。为了保证用户进程不能直接操作内核(kernel),保证内核的安全,操心系统将虚拟空间划分为两部分,一部分为内核空间,一部分为用户空间。针对 linux 操作系统而言,将最高的 1G 字节(从虚拟地址 0xC0000000 到 0xFFFFFFFF),供内核使用,称为内核空间,而将较低的 3G 字节(从虚拟地址 0x00000000 到0xBFFFFFFF),供各个进程使用,称为用户空间。

I/O 多路复用

在神作《UNIX 网络编程》里,总结归纳了 5 种 I/O 模型,包括同步和异步 I/O:

  • 阻塞 I/O (Blocking I/O)
  • 非阻塞 I/O (Nonblocking I/O)
  • I/O 多路复用 (I/O multiplexing)
  • 信号驱动 I/O (Signal driven I/O)
  • 异步 I/O (Asynchronous I/O)

操作系统上的 I/O 是用户空间和内核空间的数据交互,因此 I/O 操作通常包含以下两个步骤:

  1. 等待网络数据到达网卡(读就绪)/等待网卡可写(写就绪) –> 读取/写入到内核缓冲区
  2. 从内核缓冲区复制数据 –> 用户空间(读)/从用户空间复制数据 -> 内核缓冲区(写)

而判定一个 I/O 模型是同步还是异步,主要看第二步:数据在用户和内核空间之间复制的时候是不是会阻塞当前进程,如果会,则是同步 I/O,否则,就是异步 I/O。基于这个原则,这 5 种 I/O 模型中只有一种异步 I/O 模型:Asynchronous I/O,其余都是同步 I/O 模型。

这 5 种 I/O 模型的对比如下:

所谓 I/O 多路复用指的就是 select/poll/epoll 这一系列的多路选择器:支持单一线程同时监听多个文件描述符(I/O事件),阻塞等待,并在其中某个文件描述符可读写时收到通知。 I/O 复用其实复用的不是 I/O 连接,而是复用线程,让一个 thread of control 能够处理多个连接(I/O 事件)。

select & poll

#include <sys/select.h>

/* According to earlier standards */
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>

int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);

// 和 select 紧密结合的四个宏:
void FD_CLR(int fd, fd_set *set);
int FD_ISSET(int fd, fd_set *set);
void FD_SET(int fd, fd_set *set);
void FD_ZERO(fd_set *set);

select 是 epoll 之前 linux 使用的 I/O 事件驱动技术。

理解 select 的关键在于理解 fd_set,为说明方便,取 fd_set 长度为 1 字节,fd_set 中的每一 bit 可以对应一个文件描述符 fd,则 1 字节长的 fd_set 最大可以对应 8 个 fd。select 的调用过程如下:

  1. 执行 FD_ZERO(&set), 则 set 用位表示是 0000,0000
  2. 若 fd=5, 执行 FD_SET(fd, &set); 后 set 变为 0001,0000(第5位置为1)
  3. 再加入 fd=2, fd=1,则 set 变为 0001,0011
  4. 执行 select(6, &set, 0, 0, 0) 阻塞等待
  5. 若 fd=1, fd=2 上都发生可读事件,则 select 返回,此时 set 变为 0000,0011 (注意:没有事件发生的 fd=5 被清空)

基于上面的调用过程,可以得出 select 的特点:

  • 可监控的文件描述符个数取决于 sizeof(fd_set) 的值。假设服务器上 sizeof(fd_set)=512,每 bit 表示一个文件描述符,则服务器上支持的最大文件描述符是 512*8=4096。fd_set的大小调整可参考 【原创】技术系列之 网络模型(二) 中的模型 2,可以有效突破 select 可监控的文件描述符上限
  • 将 fd 加入 select 监控集的同时,还要再使用一个数据结构 array 保存放到 select 监控集中的 fd,一是用于在 select 返回后,array 作为源数据和 fd_set 进行 FD_ISSET 判断。二是 select 返回后会把以前加入的但并无事件发生的 fd 清空,则每次开始 select 前都要重新从 array 取得 fd 逐一加入(FD_ZERO最先),扫描 array 的同时取得 fd 最大值 maxfd,用于 select 的第一个参数
  • 可见 select 模型必须在 select 前循环 array(加 fd,取 maxfd),select 返回后循环 array(FD_ISSET判断是否有事件发生)

所以,select 有如下的缺点:

  1. 最大并发数限制:使用 32 个整数的 32 位,即 32*32=1024 来标识 fd,虽然可修改,但是有以下第 2, 3 点的瓶颈
  2. 每次调用 select,都需要把 fd 集合从用户态拷贝到内核态,这个开销在 fd 很多时会很大
  3. 性能衰减严重:每次 kernel 都需要线性扫描整个 fd_set,所以随着监控的描述符 fd 数量增长,其 I/O 性能会线性下降

poll 的实现和 select 非常相似,只是描述 fd 集合的方式不同,poll 使用 pollfd 结构而不是 select 的 fd_set 结构,poll 解决了最大文件描述符数量限制的问题,但是同样需要从用户态拷贝所有的 fd 到内核态,也需要线性遍历所有的 fd 集合,所以它和 select 只是实现细节上的区分,并没有本质上的区别。

epoll

epoll 是 linux kernel 2.6 之后引入的新 I/O 事件驱动技术,I/O 多路复用的核心设计是 1 个线程处理所有连接的等待消息准备好I/O 事件,这一点上 epoll 和 select&poll 是大同小异的。但 select&poll 预估错误了一件事,当数十万并发连接存在时,可能每一毫秒只有数百个活跃的连接,同时其余数十万连接在这一毫秒是非活跃的。select&poll 的使用方法是这样的:返回的活跃连接 == select(全部待监控的连接)

什么时候会调用 select&poll 呢?在你认为需要找出有报文到达的活跃连接时,就应该调用。所以,select&poll 在高并发时是会被频繁调用的。这样,这个频繁调用的方法就很有必要看看它是否有效率,因为,它的轻微效率损失都会被高频二字所放大。它有效率损失吗?显而易见,全部待监控连接是数以十万计的,返回的只是数百个活跃连接,这本身就是无效率的表现。被放大后就会发现,处理并发上万个连接时,select&poll 就完全力不从心了。这个时候就该 epoll 上场了,epoll 通过一些新的设计和优化,基本上解决了 select&poll 的问题。

epoll 的 API 非常简洁,涉及到的只有 3 个系统调用:

#include <sys/epoll.h>  
int epoll_create(int size); // int epoll_create1(int flags);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

其中,epoll_create 创建一个 epoll 实例并返回 epollfd;epoll_ctl 注册 file descriptor 等待的 I/O 事件(比如 EPOLLIN、EPOLLOUT 等) 到 epoll 实例上;epoll_wait 则是阻塞监听 epoll 实例上所有的 file descriptor 的 I/O 事件,它接收一个用户空间上的一块内存地址 (events 数组),kernel 会在有 I/O 事件发生的时候把文件描述符列表复制到这块内存地址上,然后 epoll_wait 解除阻塞并返回,最后用户空间上的程序就可以对相应的 fd 进行读写了:

#include <unistd.h>
ssize_t read(int fd, void *buf, size_t count);
ssize_t write(int fd, const void *buf, size_t count);

epoll 的工作原理如下:

与 select&poll 相比,epoll 分清了高频调用和低频调用。例如,epoll_ctl 相对来说就是不太频繁被调用的,而 epoll_wait 则是非常频繁被调用的。所以 epoll 利用 epoll_ctl 来插入或者删除一个 fd,实现用户态到内核态的数据拷贝,这确保了每一个 fd 在其生命周期只需要被拷贝一次,而不是每次调用 epoll_wait 的时候都拷贝一次。 epoll_wait 则被设计成几乎没有入参的调用,相比 select&poll 需要把全部监听的 fd 集合从用户态拷贝至内核态的做法,epoll 的效率就高出了一大截。

在实现上 epoll 采用红黑树来存储所有监听的 fd,而红黑树本身插入和删除性能比较稳定,时间复杂度 O(logN)。通过 epoll_ctl 函数添加进来的 fd 都会被放在红黑树的某个节点内,所以,重复添加是没有用的。当把 fd 添加进来的时候时候会完成关键的一步:该 fd 都会与相应的设备(网卡)驱动程序建立回调关系,也就是在内核中断处理程序为它注册一个回调函数,在 fd 相应的事件触发(中断)之后(设备就绪了),内核就会调用这个回调函数,该回调函数在内核中被称为:ep_poll_callback这个回调函数其实就是把这个 fd 添加到 rdllist 这个双向链表(就绪链表)中。epoll_wait 实际上就是去检查 rdlist 双向链表中是否有就绪的 fd,当 rdlist 为空(无就绪fd)时挂起当前进程,直到 rdlist 非空时进程才被唤醒并返回。

相比于 select&poll 调用时会将全部监听的 fd 从用户态空间拷贝至内核态空间并线性扫描一遍找出就绪的 fd 再返回到用户态,epoll_wait 则是直接返回已就绪 fd,因此 epoll 的 I/O 性能不会像 select&poll 那样随着监听的 fd 数量增加而出现线性衰减,是一个非常高效的 I/O 事件驱动技术。

由于使用 epoll 的 I/O 多路复用需要用户进程自己负责 I/O 读写,从用户进程的角度看,读写过程是阻塞的,所以 select&poll&epoll 本质上都是同步 I/O 模型,而像 Windows 的 IOCP 这一类的异步 I/O,只需要在调用 WSARecv 或 WSASend 方法读写数据的时候把用户空间的内存 buffer 提交给 kernel,kernel 负责数据在用户空间和内核空间拷贝,完成之后就会通知用户进程,整个过程不需要用户进程参与,所以是真正的异步 I/O。

延伸

另外,我看到有些文章说 epoll 之所以性能高是因为利用了 linux 的 mmap 内存映射让内核和用户进程共享了一片物理内存,用来存放就绪 fd 列表和它们的数据 buffer,所以用户进程在 epoll_wait返回之后用户进程就可以直接从共享内存那里读取/写入数据了,这让我很疑惑,因为首先看epoll_wait的函数声明:

int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

第二个参数:就绪事件列表,是需要在用户空间分配内存然后再传给epoll_wait的,如果内核会用 mmap 设置共享内存,直接传递一个指针进去就行了,根本不需要在用户态分配内存,多此一举。其次,内核和用户进程通过 mmap 共享内存是一件极度危险的事情,内核无法确定这块共享内存什么时候会被回收,而且这样也会赋予用户进程直接操作内核数据的权限和入口,非常容易出现大的系统漏洞,因此一般极少会这么做。所以我很怀疑 epoll 是不是真的在 linux kernel 里用了 mmap,我就去看了下最新版本(5.3.9)的 linux kernel 源码:

/*
* Implement the event wait interface for the eventpoll file. It is the kernel
* part of the user space epoll_wait(2).
*/

static int do_epoll_wait(int epfd, struct epoll_event __user *events,
int maxevents, int timeout)
{
// ...

/* Time to fish for events ... */
error = ep_poll(ep, events, maxevents, timeout);
}

// 如果 epoll_wait 入参时设定 timeout == 0, 那么直接通过 ep_events_available 判断当前是否有用户感兴趣的事件发生,如果有则通过 ep_send_events 进行处理
// 如果设置 timeout > 0,并且当前没有用户关注的事件发生,则进行休眠,并添加到 ep->wq 等待队列的头部;对等待事件描述符设置 WQ_FLAG_EXCLUSIVE 标志
// ep_poll 被事件唤醒后会重新检查是否有关注事件,如果对应的事件已经被抢走,那么 ep_poll 会继续休眠等待
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events, int maxevents, long timeout)
{
// ...

send_events:
/*
* Try to transfer events to user space. In case we get 0 events and
* there's still timeout left over, we go trying again in search of
* more luck.
*/


// 如果一切正常, 有 event 发生, 就开始准备数据 copy 给用户空间了
// 如果有就绪的事件发生,那么就调用 ep_send_events 将就绪的事件 copy 到用户态内存中,
// 然后返回到用户态,否则判断是否超时,如果没有超时就继续等待就绪事件发生,如果超时就返回用户态。
// 从 ep_poll 函数的实现可以看到,如果有就绪事件发生,则调用 ep_send_events 函数做进一步处理
if (!res && eavail &&
!(res = ep_send_events(ep, events, maxevents)) && !timed_out)
goto fetch_events;

// ...
}

// ep_send_events 函数是用来向用户空间拷贝就绪 fd 列表的,它将用户传入的就绪 fd 列表内存简单封装到
// ep_send_events_data 结构中,然后调用 ep_scan_ready_list 将就绪队列中的事件写入用户空间的内存;
// 用户进程就可以访问到这些数据进行处理
static int ep_send_events(struct eventpoll *ep,
struct epoll_event __user *events, int maxevents)
{
struct ep_send_events_data esed;

esed.maxevents = maxevents;
esed.events = events;
// 调用 ep_scan_ready_list 函数检查 epoll 实例 eventpoll 中的 rdllist 就绪链表,
// 并注册一个回调函数 ep_send_events_proc,如果有就绪 fd,则调用 ep_send_events_proc 进行处理
ep_scan_ready_list(ep, ep_send_events_proc, &esed, 0, false);
return esed.res;
}

// 调用 ep_scan_ready_list 的时候会传递指向 ep_send_events_proc 函数的函数指针作为回调函数,
// 一旦有就绪 fd,就会调用 ep_send_events_proc 函数
static __poll_t ep_send_events_proc(struct eventpoll *ep, struct list_head *head, void *priv)
{
// ...

/*
* If the event mask intersect the caller-requested one,
* deliver the event to userspace. Again, ep_scan_ready_list()
* is holding ep->mtx, so no operations coming from userspace
* can change the item.
*/

revents = ep_item_poll(epi, &pt, 1);
// 如果 revents 为 0,说明没有就绪的事件,跳过,否则就将就绪事件拷贝到用户态内存中
if (!revents)
continue;
// 将当前就绪的事件和用户进程传入的数据都通过 __put_user 拷贝回用户空间,
// 也就是调用 epoll_wait 之时用户进程传入的 fd 列表的内存
if (__put_user(revents, &uevent->events) || __put_user(epi->event.data, &uevent->data)) {
list_add(&epi->rdllink, head);
ep_pm_stay_awake(epi);
if (!esed->res)
esed->res = -EFAULT;
return 0;
}

// ...
}

do_epoll_wait开始层层跳转,我们可以很清楚地看到最后内核是通过__put_user函数把就绪 fd 列表和事件返回到用户空间,而__put_user正是内核用来拷贝数据到用户空间的标准函数。此外,我并没有在 linux kernel 的源码中和 epoll 相关的代码里找到 mmap 系统调用做内存映射的逻辑,所以基本可以得出结论:epoll 在 linux kernel 里并没有使用 mmap 来做用户空间和内核空间的内存共享,所以那些说 epoll 使用了 mmap 的文章都是误解。

Non-blocking I/O

什么叫非阻塞 I/O,顾名思义就是:所有 I/O 操作都是立刻返回而不会阻塞当前用户进程。I/O 多路复用通常情况下需要和非阻塞 I/O 搭配使用,否则可能会产生意想不到的问题。比如,epoll 的 ET(边缘触发) 模式下,如果不使用非阻塞 I/O,有极大的概率会导致阻塞 event-loop 线程,从而降低吞吐量,甚至导致 bug。

Linux 下,我们可以通过 fcntl 系统调用来设置 O_NONBLOCK标志位,从而把 socket 设置成 non-blocking。当对一个 non-blocking socket 执行读操作时,流程是这个样子:

当用户进程发出 read 操作时,如果 kernel 中的数据还没有准备好,那么它并不会 block 用户进程,而是立刻返回一个 EAGAIN error。从用户进程角度讲 ,它发起一个 read 操作后,并不需要等待,而是马上就得到了一个结果。用户进程判断结果是一个 error 时,它就知道数据还没有准备好,于是它可以再次发送 read 操作。一旦 kernel 中的数据准备好了,并且又再次收到了用户进程的 system call,那么它马上就将数据拷贝到了用户内存,然后返回。

所以,non-blocking I/O 的特点是用户进程需要不断的主动询问 kernel 数据好了没有。

Go netpoll

一个典型的 Go TCP server:

package main

import (
"fmt"
"net"
)

func main() {
listen, err := net.Listen("tcp", ":8888")
if err != nil {
fmt.Println("listen error: ", err)
return
}

for {
conn, err := listen.Accept()
if err != nil {
fmt.Println("accept error: ", err)
break
}

// start a new goroutine to handle the new connection
go HandleConn(conn)
}
}
func HandleConn(conn net.Conn) {
defer conn.Close()
packet := make([]byte, 1024)
for {
// 如果没有可读数据,也就是读 buffer 为空,则阻塞
_, _ = conn.Read(packet)
// 同理,不可写则阻塞
_, _ = conn.Write(packet)
}
}

上面是一个基于 Go 原生网络模型(基于 netpoll)编写的一个 TCP server,模式是 goroutine-per-connection,在这种模式下,开发者使用的是同步的模式去编写异步的逻辑而且对于开发者来说 I/O 是否阻塞是无感知的,也就是说开发者无需考虑 goroutines 甚至更底层的线程、进程的调度和上下文切换。而 Go netpoll 最底层的事件驱动技术肯定是基于 epoll/kqueue/iocp 这一类的 I/O 事件驱动技术,只不过是把这些调度和上下文切换的工作转移到了 runtime 的 Go scheduler,让它来负责调度 goroutines,从而极大地降低了程序员的心智负担!

Go netpoll 核心

Go netpoll 通过在底层对 epoll/kqueue/iocp 的封装,从而实现了使用同步编程模式达到异步执行的效果。总结来说,所有的网络操作都以网络描述符 netFD 为中心实现。netFD 与底层 PollDesc 结构绑定,当在一个 netFD 上读写遇到 EAGAIN 错误时,就将当前 goroutine 存储到这个 netFD 对应的 PollDesc 中,同时调用 gopark 把当前 goroutine 给 park 住,直到这个 netFD 上再次发生读写事件,才将此 goroutine 给 ready 激活重新运行。显然,在底层通知 goroutine 再次发生读写等事件的方式就是 epoll/kqueue/iocp 等事件驱动机制。

接下来我们通过分析最新的 Go 源码(v1.13.4),解读一下整个 netpoll 的运行流程。

上面的示例代码中相关的在源码里的几个数据结构和方法:

// TCPListener is a TCP network listener. Clients should typically
// use variables of type Listener instead of assuming TCP.
type TCPListener struct {
fd *netFD
lc ListenConfig
}

// Accept implements the Accept method in the Listener interface; it
// waits for the next call and returns a generic Conn.
func (l *TCPListener) Accept() (Conn, error) {
if !l.ok() {
return nil, syscall.EINVAL
}
c, err := l.accept()
if err != nil {
return nil, &OpError{Op: "accept", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err}
}
return c, nil
}

func (ln *TCPListener) accept() (*TCPConn, error) {
fd, err := ln.fd.accept()
if err != nil {
return nil, err
}
tc := newTCPConn(fd)
if ln.lc.KeepAlive >= 0 {
setKeepAlive(fd, true)
ka := ln.lc.KeepAlive
if ln.lc.KeepAlive == 0 {
ka = defaultTCPKeepAlive
}
setKeepAlivePeriod(fd, ka)
}
return tc, nil
}

// TCPConn is an implementation of the Conn interface for TCP network
// connections.
type TCPConn struct {
conn
}

// Conn
type conn struct {
fd *netFD
}

type conn struct {
fd *netFD
}

func (c *conn) ok() bool { return c != nil && c.fd != nil }

// Implementation of the Conn interface.

// Read implements the Conn Read method.
func (c *conn) Read(b []byte) (int, error) {
if !c.ok() {
return 0, syscall.EINVAL
}
n, err := c.fd.Read(b)
if err != nil && err != io.EOF {
err = &OpError{Op: "read", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err}
}
return n, err
}

// Write implements the Conn Write method.
func (c *conn) Write(b []byte) (int, error) {
if !c.ok() {
return 0, syscall.EINVAL
}
n, err := c.fd.Write(b)
if err != nil {
err = &OpError{Op: "write", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err}
}
return n, err
}

netFD

net.Listen("tcp", ":8888") 方法返回了一个 TCPListener,它是一个实现了 net.Listener 接口的 struct,而通过 listen.Accept() 接收的新连接 TCPConn 则是一个实现了 net.Conn 接口的 struct,它内嵌了 net.conn struct。仔细阅读上面的源码可以发现,不管是 Listener 的 Accept 还是 Conn 的 Read/Write 方法,都是基于一个 netFD 的数据结构的操作,netFD 是一个网络描述符,类似于 Linux 的文件描述符的概念,netFD 中包含一个 poll.FD 数据结构,而 poll.FD 中包含两个重要的数据结构 Sysfd 和 pollDesc,前者是真正的系统文件描述符,后者对是底层事件驱动的封装,所有的读写超时等操作都是通过调用后者的对应方法实现的。

netFDpoll.FD的源码:

// Network file descriptor.
type netFD struct {
pfd poll.FD

// immutable until Close
family int
sotype int
isConnected bool // handshake completed or use of association with peer
net string
laddr Addr
raddr Addr
}

// FD is a file descriptor. The net and os packages use this type as a
// field of a larger type representing a network connection or OS file.
type FD struct {
// Lock sysfd and serialize access to Read and Write methods.
fdmu fdMutex

// System file descriptor. Immutable until Close.
Sysfd int

// I/O poller.
pd pollDesc

// Writev cache.
iovecs *[]syscall.Iovec

// Semaphore signaled when file is closed.
csema uint32

// Non-zero if this file has been set to blocking mode.
isBlocking uint32

// Whether this is a streaming descriptor, as opposed to a
// packet-based descriptor like a UDP socket. Immutable.
IsStream bool

// Whether a zero byte read indicates EOF. This is false for a
// message based socket connection.
ZeroReadIsEOF bool

// Whether this is a file rather than a network socket.
isFile bool
}

pollDesc

前面提到了 pollDesc 是底层事件驱动的封装,netFD 通过它来完成各种 I/O 相关的操作,它的定义如下:

type pollDesc struct {
runtimeCtx uintptr
}

这里的 struct 只包含了一个指针,而通过 pollDesc 的 init 方法,我们可以找到它具体的定义是在runtime.pollDesc这里:

func (pd *pollDesc) init(fd *FD) error {
serverInit.Do(runtime_pollServerInit)
ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
if errno != 0 {
if ctx != 0 {
runtime_pollUnblock(ctx)
runtime_pollClose(ctx)
}
return syscall.Errno(errno)
}
pd.runtimeCtx = ctx
return nil
}

// Network poller descriptor.
//
// No heap pointers.
//
//go:notinheap
type pollDesc struct {
link *pollDesc // in pollcache, protected by pollcache.lock

// The lock protects pollOpen, pollSetDeadline, pollUnblock and deadlineimpl operations.
// This fully covers seq, rt and wt variables. fd is constant throughout the PollDesc lifetime.
// pollReset, pollWait, pollWaitCanceled and runtime·netpollready (IO readiness notification)
// proceed w/o taking the lock. So closing, everr, rg, rd, wg and wd are manipulated
// in a lock-free way by all operations.
// NOTE(dvyukov): the following code uses uintptr to store *g (rg/wg),
// that will blow up when GC starts moving objects.
lock mutex // protects the following fields
fd uintptr
closing bool
everr bool // marks event scanning error happened
user uint32 // user settable cookie
rseq uintptr // protects from stale read timers
rg uintptr // pdReady, pdWait, G waiting for read or nil
rt timer // read deadline timer (set if rt.f != nil)
rd int64 // read deadline
wseq uintptr // protects from stale write timers
wg uintptr // pdReady, pdWait, G waiting for write or nil
wt timer // write deadline timer
wd int64 // write deadline
}

runtime.pollDesc包含自身类型的一个指针,用来保存下一个runtime.pollDesc的地址,以此来实现链表,可以减少数据结构的大小,所有的runtime.pollDesc保存在runtime.pollCache结构中,定义如下:

type pollCache struct {
lock mutex
first *pollDesc
// PollDesc objects must be type-stable,
// because we can get ready notification from epoll/kqueue
// after the descriptor is closed/reused.
// Stale notifications are detected using seq variable,
// seq is incremented when deadlines are changed or descriptor is reused.
}

net.Listen

调用 net.Listen之后,底层会通过 Linux 的系统调用socket 方法创建一个 fd 分配给 listener,并用以来初始化 listener 的 netFD,接着调用 netFD 的listenStream方法完成对 socket 的 bind&listen 操作以及对 netFD 的初始化(主要是对 netFD 里的 pollDesc 的初始化),相关源码如下:

// 调用 linux 系统调用 socket 创建 listener fd 并设置为为阻塞 I/O    
s, err := socketFunc(family, sotype|syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC, proto)
// On Linux the SOCK_NONBLOCK and SOCK_CLOEXEC flags were
// introduced in 2.6.27 kernel and on FreeBSD both flags were
// introduced in 10 kernel. If we get an EINVAL error on Linux
// or EPROTONOSUPPORT error on FreeBSD, fall back to using
// socket without them.

socketFunc func(int, int, int) (int, error) = syscall.Socket

// 用上面创建的 listener fd 初始化 listener netFD
if fd, err = newFD(s, family, sotype, net); err != nil {
poll.CloseFunc(s)
return nil, err
}

// 对 listener fd 进行 bind&listen 操作,并且调用 init 方法完成初始化
func (fd *netFD) listenStream(laddr sockaddr, backlog int, ctrlFn func(string, string, syscall.RawConn) error) error {
// ...

// 完成绑定操作
if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil {
return os.NewSyscallError("bind", err)
}

// 完成监听操作
if err = listenFunc(fd.pfd.Sysfd, backlog); err != nil {
return os.NewSyscallError("listen", err)
}

// 调用 init,内部会调用 poll.FD.Init,最后调用 pollDesc.init
if err = fd.init(); err != nil {
return err
}
lsa, _ = syscall.Getsockname(fd.pfd.Sysfd)
fd.setAddr(fd.addrFunc()(lsa), nil)
return nil
}

// 使用 sync.Once 来确保一个 listener 只持有一个 epoll 实例
var serverInit sync.Once

// netFD.init 会调用 poll.FD.Init 并最终调用到 pollDesc.init,
// 它会创建 epoll 实例并把 listener fd 加入监听队列
func (pd *pollDesc) init(fd *FD) error {
// runtime_pollServerInit 内部调用了 netpollinit 来创建 epoll 实例
serverInit.Do(runtime_pollServerInit)

// runtime_pollOpen 内部调用了 netpollopen 来将 listener fd 注册到
// epoll 实例中,另外,它会初始化一个 pollDesc 并返回
ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
if errno != 0 {
if ctx != 0 {
runtime_pollUnblock(ctx)
runtime_pollClose(ctx)
}
return syscall.Errno(errno)
}
// 把真正初始化完成的 pollDesc 实例赋值给当前的 pollDesc 代表自身的指针,
// 后续使用直接通过该指针操作
pd.runtimeCtx = ctx
return nil
}

// netpollopen 会被 runtime_pollOpen,注册 fd 到 epoll 实例,
// 同时会利用万能指针把 pollDesc 保存到 epollevent 的一个 8 位的字节数组 data 里
func netpollopen(fd uintptr, pd *pollDesc) int32 {
var ev epollevent
ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
*(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}

我们前面提到的 epoll 的三个基本调用,Go 在源码里实现了对那三个调用的封装:

#include <sys/epoll.h>  
int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

// Go 对上面三个调用的封装
func netpollinit()
func netpollopen(fd uintptr, pd *pollDesc) int32
func netpoll(block bool) gList

netFD 就是通过这三个封装来对 epoll 进行创建实例、注册 fd 和等待事件操作的。

Listener.Accept()

netpoll accept socket 的工作流程如下:

  1. 服务端的 netFD 在listen时会创建 epoll 的实例,并将 listenerFD 加入 epoll 的事件队列
  2. netFD 在accept时将返回的 connFD 也加入 epoll 的事件队列
  3. netFD 在读写时出现syscall.EAGAIN错误,通过 pollDesc 的 waitRead 方法将当前的 goroutine park 住,直到 ready,从 pollDesc 的waitRead中返回

Listener.Accept()接收来自客户端的新连接,具体还是调用netFD.accept方法来完成这个功能:

// Accept implements the Accept method in the Listener interface; it
// waits for the next call and returns a generic Conn.
func (l *TCPListener) Accept() (Conn, error) {
if !l.ok() {
return nil, syscall.EINVAL
}
c, err := l.accept()
if err != nil {
return nil, &OpError{Op: "accept", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err}
}
return c, nil
}

func (ln *TCPListener) accept() (*TCPConn, error) {
fd, err := ln.fd.accept()
if err != nil {
return nil, err
}
tc := newTCPConn(fd)
if ln.lc.KeepAlive >= 0 {
setKeepAlive(fd, true)
ka := ln.lc.KeepAlive
if ln.lc.KeepAlive == 0 {
ka = defaultTCPKeepAlive
}
setKeepAlivePeriod(fd, ka)
}
return tc, nil
}

netFD.accept方法里再调用poll.FD.Accept,最后会使用 linux 的系统调用accept来完成新连接的接收,并且会把 accept 的 socket 设置成非阻塞 I/O 模式:

// Accept wraps the accept network call.
func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
if err := fd.readLock(); err != nil {
return -1, nil, "", err
}
defer fd.readUnlock()

if err := fd.pd.prepareRead(fd.isFile); err != nil {
return -1, nil, "", err
}
for {
// 使用 linux 系统调用 accept 接收新连接,创建对应的 socket
s, rsa, errcall, err := accept(fd.Sysfd)
// 因为 listener fd 在创建的时候已经设置成非阻塞的了,
// 所以 accept 方法会直接返回,不管有没有新连接到来;如果 err == nil 则表示正常建立新连接,直接返回
if err == nil {
return s, rsa, "", err
}
// 如果 err != nil,则判断 err == syscall.EAGAIN,符合条件则进入 pollDesc.waitRead 方法
switch err {
case syscall.EAGAIN:
if fd.pd.pollable() {
// 如果当前没有发生期待的 I/O 事件,那么 waitRead 会通过 park goroutine 让逻辑 block 在这里
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
case syscall.ECONNABORTED:
// This means that a socket on the listen
// queue was closed before we Accept()ed it;
// it's a silly error, so try again.
continue
}
return -1, nil, errcall, err
}
}

// 使用 linux 的 accept 系统调用接收新连接并把这个 socket fd 设置成非阻塞 I/O
ns, sa, err := Accept4Func(s, syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC)
// On Linux the accept4 system call was introduced in 2.6.28
// kernel and on FreeBSD it was introduced in 10 kernel. If we
// get an ENOSYS error on both Linux and FreeBSD, or EINVAL
// error on Linux, fall back to using accept.

// Accept4Func is used to hook the accept4 call.
var Accept4Func func(int, int) (int, syscall.Sockaddr, error) = syscall.Accept4

pollDesc.waitRead方法主要负责检测当前这个 pollDesc 的上层 netFD 对应的 fd 是否有『期待的』I/O 事件发生,如果有就直接返回,否则就 park 住当前的 goroutine 并持续等待直至对应的 fd 上发生可读/可写或者其他『期待的』I/O 事件为止,然后它就会返回到外层的 for 循环,让 goroutine 继续执行逻辑。

Conn.Read/Conn.Write

我们先来看看Conn.Read方法是如何实现的,原理其实和 Listener.Accept 是一样的,具体调用链还是首先调用 conn 的netFD.Read,然后内部再调用 poll.FD.Read,最后使用 linux 的系统调用 read: syscall.Read完成数据读取:

// Implementation of the Conn interface.

// Read implements the Conn Read method.
func (c *conn) Read(b []byte) (int, error) {
if !c.ok() {
return 0, syscall.EINVAL
}
n, err := c.fd.Read(b)
if err != nil && err != io.EOF {
err = &OpError{Op: "read", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err}
}
return n, err
}

func (fd *netFD) Read(p []byte) (n int, err error) {
n, err = fd.pfd.Read(p)
runtime.KeepAlive(fd)
return n, wrapSyscallError("read", err)
}

// Read implements io.Reader.
func (fd *FD) Read(p []byte) (int, error) {
if err := fd.readLock(); err != nil {
return 0, err
}
defer fd.readUnlock()
if len(p) == 0 {
// If the caller wanted a zero byte read, return immediately
// without trying (but after acquiring the readLock).
// Otherwise syscall.Read returns 0, nil which looks like
// io.EOF.
// TODO(bradfitz): make it wait for readability? (Issue 15735)
return 0, nil
}
if err := fd.pd.prepareRead(fd.isFile); err != nil {
return 0, err
}
if fd.IsStream && len(p) > maxRW {
p = p[:maxRW]
}
for {
// 尝试从该 socket 读取数据,因为 socket 在被 listener accept 的时候设置成
// 了非阻塞 I/O,所以这里同样也是直接返回,不管有没有可读的数据
n, err := syscall.Read(fd.Sysfd, p)
if err != nil {
n = 0
// err == syscall.EAGAIN 表示当前没有期待的 I/O 事件发生,也就是 socket 不可读
if err == syscall.EAGAIN && fd.pd.pollable() {
// 如果当前没有发生期待的 I/O 事件,那么 waitRead
// 会通过 park goroutine 让逻辑 block 在这里
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}

// On MacOS we can see EINTR here if the user
// pressed ^Z. See issue #22838.
if runtime.GOOS == "darwin" && err == syscall.EINTR {
continue
}
}
err = fd.eofError(n, err)
return n, err
}
}

conn.Writeconn.Read的原理是一致的,它也是通过类似 pollDesc.waitReadpollDesc.waitWrite来 park 住 goroutine 直至期待的 I/O 事件发生才返回,而 pollDesc.waitWrite的内部实现原理和pollDesc.waitRead是一样的,都是基于runtime_pollWait,这里就不再赘述。

pollDesc.waitRead

pollDesc.waitRead内部调用了 runtime_pollWait来达成无 I/O 事件时 park 住 goroutine 的目的:

//go:linkname poll_runtime_pollWait internal/poll.runtime_pollWait
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
err := netpollcheckerr(pd, int32(mode))
if err != 0 {
return err
}
// As for now only Solaris, illumos, and AIX use level-triggered IO.
if GOOS == "solaris" || GOOS == "illumos" || GOOS == "aix" {
netpollarm(pd, mode)
}
// 进入 netpollblock 并且判断是否有期待的 I/O 事件发生,
// 这里的 for 循环是为了一直等到 io ready
for !netpollblock(pd, int32(mode), false) {
err = netpollcheckerr(pd, int32(mode))
if err != 0 {
return err
}
// Can happen if timeout has fired and unblocked us,
// but before we had a chance to run, timeout has been reset.
// Pretend it has not happened and retry.
}
return 0
}

// returns true if IO is ready, or false if timedout or closed
// waitio - wait only for completed IO, ignore errors
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
// gpp 保存的是 goroutine 的数据结构 g,这里会根据 mode 的值决定是 rg 还是 wg
// 后面调用 gopark 之后,会把当前的 goroutine 的抽象数据结构 g 存入 gpp 这个指针
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}

// set the gpp semaphore to WAIT
// 这个 for 循环是为了等待 io ready 或者 io wait
for {
old := *gpp
// gpp == pdReady 表示此时已有期待的 I/O 事件发生,
// 可以直接返回 unblock 当前 goroutine 并执行响应的 I/O 操作
if old == pdReady {
*gpp = 0
return true
}
if old != 0 {
throw("runtime: double wait")
}
// 如果没有期待的 I/O 事件发生,则通过原子操作把 gpp 的值置为 pdWait 并退出 for 循环
if atomic.Casuintptr(gpp, 0, pdWait) {
break
}
}

// need to recheck error states after setting gpp to WAIT
// this is necessary because runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl
// do the opposite: store to closing/rd/wd, membarrier, load of rg/wg

// waitio 此时是 false,netpollcheckerr 方法会检查当前 pollDesc 对应的 fd 是否是正常的,
// 通常来说 netpollcheckerr(pd, mode) == 0 是成立的,所以这里会执行 gopark
// 把当前 goroutine 给 park 住,直至对应的 fd 上发生可读/可写或者其他『期待的』I/O 事件为止,
// 然后 unpark 返回,在 gopark 内部会把当前 goroutine 的抽象数据结构 g 存入
// gpp(pollDesc.rg/pollDesc.wg) 指针里,以便在后面的 netpoll 函数取出 pollDesc 之后,
// 把 g 添加到链表里返回,然后重新调度运行该 goroutine
if waitio || netpollcheckerr(pd, mode) == 0 {
// 注册 netpollblockcommit 回调给 gopark,在 gopark 内部会执行它,保存当前 goroutine 到 gpp
gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
}
// be careful to not lose concurrent READY notification
old := atomic.Xchguintptr(gpp, 0)
if old > pdWait {
throw("runtime: corrupted polldesc")
}
return old == pdReady
}

// gopark 会停住当前的 goroutine 并且调用传递进来的回调函数 unlockf,从上面的源码我们可以知道这个函数是
// netpollblockcommit
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
if reason != waitReasonSleep {
checkTimeouts() // timeouts may expire while two goroutines keep the scheduler busy
}
mp := acquirem()
gp := mp.curg
status := readgstatus(gp)
if status != _Grunning && status != _Gscanrunning {
throw("gopark: bad g status")
}
mp.waitlock = lock
mp.waitunlockf = unlockf
gp.waitreason = reason
mp.waittraceev = traceEv
mp.waittraceskip = traceskip
releasem(mp)
// can't do anything that might move the G between Ms here.
// gopark 最终会调用 park_m,在这个函数内部会调用 unlockf,也就是 netpollblockcommit,
// 然后会把当前的 goroutine,也就是 g 数据结构保存到 pollDesc 的 rg 或者 wg 指针里
mcall(park_m)
}

// park continuation on g0.
func park_m(gp *g) {
_g_ := getg()

if trace.enabled {
traceGoPark(_g_.m.waittraceev, _g_.m.waittraceskip)
}

casgstatus(gp, _Grunning, _Gwaiting)
dropg()

if fn := _g_.m.waitunlockf; fn != nil {
// 调用 netpollblockcommit,把当前的 goroutine,
// 也就是 g 数据结构保存到 pollDesc 的 rg 或者 wg 指针里
ok := fn(gp, _g_.m.waitlock)
_g_.m.waitunlockf = nil
_g_.m.waitlock = nil
if !ok {
if trace.enabled {
traceGoUnpark(gp, 2)
}
casgstatus(gp, _Gwaiting, _Grunnable)
execute(gp, true) // Schedule it back, never returns.
}
}
schedule()
}

// netpollblockcommit 在 gopark 函数里被调用
func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {
// 通过原子操作把当前 goroutine 抽象的数据结构 g,也就是这里的参数 gp 存入 gpp 指针,
// 此时 gpp 的值是 pollDesc 的 rg 或者 wg 指针
r := atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))
if r {
// Bump the count of goroutines waiting for the poller.
// The scheduler uses this to decide whether to block
// waiting for the poller if there is nothing else to do.
atomic.Xadd(&netpollWaiters, 1)
}
return r
}

netpoll

前面已经从源码的角度分析完了 netpoll 是如何通过 park goroutine 从而达到阻塞 Accept/Read/Write 的效果,而通过调用 gopark,goroutine 会被放置在某个等待队列中(如 channel 的 waitq ,此时 G 的状态由_Grunning_Gwaitting),因此G必须被手动唤醒(通过 goready ),否则会丢失任务,应用层阻塞通常使用这种方式。

所以,最后还有一个非常关键的问题是:当 I/O 事件发生之后,netpoll 是通过什么方式唤醒那些在 I/O wait 的 goroutine 的?答案是通过 epoll_wait,在 Go 源码中的 src/runtime/netpoll_epoll.go文件中有一个 func netpoll(block bool) gList 方法,它会内部调用epoll_wait获取就绪的 fd 列表,并将每个 fd 对应的 goroutine 添加到链表返回

// polls for ready network connections
// returns list of goroutines that become runnable
func netpoll(block bool) gList {
if epfd == -1 {
return gList{}
}
waitms := int32(-1)
// 是否以阻塞模式调用 epoll_wait
if !block {
waitms = 0
}
var events [128]epollevent
retry:
// 获取就绪的 fd 列表
n := epollwait(epfd, &events[0], int32(len(events)), waitms)
if n < 0 {
if n != -_EINTR {
println("runtime: epollwait on fd", epfd, "failed with", -n)
throw("runtime: netpoll failed")
}
goto retry
}
// toRun 是一个 g 的链表,存储要恢复的 goroutines,最后返回给调用方
var toRun gList
for i := int32(0); i < n; i++ {
ev := &events[i]
if ev.events == 0 {
continue
}
var mode int32
// 判断发生的事件类型,读类型或者写类型
if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'r'
}
if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'w'
}
if mode != 0 {
// 取出保存在 epollevent 里的 pollDesc
pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
pd.everr = false
if ev.events == _EPOLLERR {
pd.everr = true
}
// 调用 netpollready,传入就绪 fd 的 pollDesc,把 fd 对应的 goroutine 添加到链表 toRun 中
netpollready(&toRun, pd, mode)
}
}
if block && toRun.empty() {
goto retry
}
return toRun
}

// netpollready 调用 netpollunblock 返回就绪 fd 对应的 goroutine 的抽象数据结构 g
func netpollready(toRun *gList, pd *pollDesc, mode int32) {
var rg, wg *g
if mode == 'r' || mode == 'r'+'w' {
rg = netpollunblock(pd, 'r', true)
}
if mode == 'w' || mode == 'r'+'w' {
wg = netpollunblock(pd, 'w', true)
}
if rg != nil {
toRun.push(rg)
}
if wg != nil {
toRun.push(wg)
}
}

// netpollunblock 会依据传入的 mode 决定从 pollDesc 的 rg 或者 wg 取出当时 gopark 之时存入的
// goroutine 抽象数据结构 g 并返回
func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
// mode == 'r' 代表当时 gopark 是为了等待读事件,而 mode == 'w' 则代表是等待写事件
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}

for {
// 取出 gpp 存储的 g
old := *gpp
if old == pdReady {
return nil
}
if old == 0 && !ioready {
// Only set READY for ioready. runtime_pollWait
// will check for timeout/cancel before waiting.
return nil
}
var new uintptr
if ioready {
new = pdReady
}
// 重置 pollDesc 的 rg 或者 wg
if atomic.Casuintpt

[ gev ] Go 语言优雅处理 TCP “粘包”

惜朝 发表了文章 • 0 个评论 • 1422 次浏览 • 2019-11-01 10:48 • 来自相关话题

https://github.com/Allenxuxu/gev ...查看全部

https://github.com/Allenxuxu/gev

gev 是一个轻量、快速的基于 Reactor 模式的非阻塞 TCP 网络库,支持自定义协议,轻松快速搭建高性能服务器。

TCP 为什么会粘包


TCP 本身就是面向流的协议,就是一串没有界限的数据。所以本质上来说 TCP 粘包是一个伪命题。

TCP 底层并不关心上层业务数据,会套接字缓冲区的实际情况进行包的划分,一个完整的业务数据可能会被拆分成多次进行发送,也可能会将多个小的业务数据封装成一个大的数据包发送(Nagle算法)。

gev 如何优雅处理


gev 通过回调函数 OnMessage 通知用户数据到来,回调函数中会将用户数据缓冲区(ringbuffer)通过参数传递过来。

用户通过对 ringbuffer 操作,来进行数据解包,获取到完整用户数据后再进行业务操作。这样又一个明显的缺点,就是会让业务操作和自定义协议解析代码堆在一起。

所以,最近对 gev 进行了一次较大改动,主要是为了能够以插件的形式支持各种自定义的数据协议,让使用者可以便捷处理 TCP 粘包问题,专注于业务逻辑。



做法如下,定义一个接口 Protocol

```go
// Protocol 自定义协议编解码接口
type Protocol interface {
UnPacket(c *Connection, buffer *ringbuffer.RingBuffer) (interface{}, []byte)
Packet(c *Connection, data []byte) []byte
}
```


用户只需实现这个接口,并注册到 server 中,当客户端数据到来时,gev 会首先调用 UnPacket 方法,如果缓冲区中的数据足够组成一帧,则将数据解包,并返回真正的用户数据,然后在回调 OnMessage 函数并将数据通过参数传递。

下面,我们实现一个简单的自定义协议插件,来启动一个 Server :

```text
| 数据长度 n | payload |
| 4字节 | n 字节 |
```

```go
// protocol.go
package main

import (
"encoding/binary"
"github.com/Allenxuxu/gev/connection"
"github.com/Allenxuxu/ringbuffer"
"github.com/gobwas/pool/pbytes"
)

const exampleHeaderLen = 4

type ExampleProtocol struct{}

func (d *ExampleProtocol) UnPacket(c *connection.Connection, buffer *ringbuffer.RingBuffer) (interface{}, []byte) {
if buffer.VirtualLength() > exampleHeaderLen {
buf := pbytes.GetLen(exampleHeaderLen)
defer pbytes.Put(buf)
_, _ = buffer.VirtualRead(buf)
dataLen := binary.BigEndian.Uint32(buf)

if buffer.VirtualLength() >= int(dataLen) {
ret := make([]byte, dataLen)
_, _ = buffer.VirtualRead(ret)

buffer.VirtualFlush()
return nil, ret
} else {
buffer.VirtualRevert()
}
}
return nil, nil
}

func (d *ExampleProtocol) Packet(c *connection.Connection, data []byte) []byte {
dataLen := len(data)
ret := make([]byte, exampleHeaderLen+dataLen)
binary.BigEndian.PutUint32(ret, uint32(dataLen))
copy(ret[4:], data)
return ret
}
```

```go
// server.go
package main

import (
"flag"
"log"
"strconv"

"github.com/Allenxuxu/gev"
"github.com/Allenxuxu/gev/connection"
)

type example struct{}

func (s *example) OnConnect(c *connection.Connection) {
log.Println(" OnConnect : ", c.PeerAddr())
}
func (s *example) OnMessage(c *connection.Connection, ctx interface{}, data []byte) (out []byte) {
log.Println("OnMessage:", data)
out = data
return
}

func (s *example) OnClose(c *connection.Connection) {
log.Println("OnClose")
}

func main() {
handler := new(example)
var port int
var loops int

flag.IntVar(&port, "port", 1833, "server port")
flag.IntVar(&loops, "loops", -1, "num loops")
flag.Parse()

s, err := gev.NewServer(handler,
gev.Address(":"+strconv.Itoa(port)),
gev.NumLoops(loops),
gev.Protocol(&ExampleProtocol{}))
if err != nil {
panic(err)
}

log.Println("server start")
s.Start()
}
```
完整代码地址

当回调 `OnMessage` 函数的时候,会通过参数传递已经拆好包的用户数据。

当我们需要使用其他协议时,仅仅需要实现一个 Protocol 插件,然后只要 `gev.NewServer` 时指定即可:

```go
gev.NewServer(handler, gev.NumLoops(2), gev.Protocol(&XXXProtocol{}))
```

## 基于 Protocol Plugins 模式为 gev 实现 WebSocket 插件

得益于 Protocol Plugins 模式的引进,我可以将 WebSocket 的实现做成一个插件(WebSocket 协议构建在 TCP 之上),独立于 gev 之外。

```go
package websocket

import (
"log"

"github.com/Allenxuxu/gev/connection"
"github.com/Allenxuxu/gev/plugins/websocket/ws"
"github.com/Allenxuxu/ringbuffer"
)

// Protocol websocket
type Protocol struct {
upgrade *ws.Upgrader
}

// New 创建 websocket Protocol
func New(u *ws.Upgrader) *Protocol {
return &Protocol{upgrade: u}
}

// UnPacket 解析 websocket 协议,返回 header ,payload
func (p *Protocol) UnPacket(c *connection.Connection, buffer *ringbuffer.RingBuffer) (ctx interface{}, out []byte) {
upgraded := c.Context()
if upgraded == nil {
var err error
out, _, err = p.upgrade.Upgrade(buffer)
if err != nil {
log.Println("Websocket Upgrade :", err)
return
}
c.SetContext(true)
} else {
header, err := ws.VirtualReadHeader(buffer)
if err != nil {
log.Println(err)
return
}
if buffer.VirtualLength() >= int(header.Length) {
buffer.VirtualFlush()

payload := make([]byte, int(header.Length))
_, _ = buffer.Read(payload)

if header.Masked {
ws.Cipher(payload, header.Mask, 0)
}

ctx = &header
out = payload
} else {
buffer.VirtualRevert()
}
}
return
}

// Packet 直接返回
func (p *Protocol) Packet(c *connection.Connection, data []byte) []byte {
return data
}
```

具体的实现,可以到仓库的 [plugins/websocket](https://github.com/Allenxuxu/gev/tree/master/plugins/websocket) 查看。

## 相关文章

- [开源 gev: Go 实现基于 Reactor 模式的非阻塞 TCP 网络库](https://note.mogutou.xyz/articles/2019/09/19/1568896693634.html)
- [Go 网络库并发吞吐量测试](https://note.mogutou.xyz/articles/2019/09/22/1569146969662.html)

## 项目地址

https://github.com/Allenxuxu/gev