高并发

高并发

[gev] 一个轻量、快速的基于 Reactor 模式的非阻塞 TCP 网络库

Go开源项目惜朝 发表了文章 • 0 个评论 • 312 次浏览 • 2019-09-25 16:45 • 来自相关话题

`gev` 是一个 ...查看全部

`gev` 是一个轻量、快速的基于 Reactor 模式的非阻塞 TCP 网络库。

➡️➡️ https://github.com/Allenxuxu/gev

特点

- 基于 epoll 和 kqueue 实现的高性能事件循环
- 支持多核多线程
- 动态扩容 Ring Buffer 实现的读写缓冲区
- 异步读写
- SO_REUSEPORT 端口重用支持

网络模型

`gev` 只使用极少的 goroutine, 一个 goroutine 负责监听客户端连接,其他 goroutine (work 协程)负责处理已连接客户端的读写事件,work 协程数量可以配置,默认与运行主机 CPU 数量相同。

性能测试

> 测试环境 Ubuntu18.04 | 4 Virtual CPUs | 4.0 GiB

吞吐量测试

限制 GOMAXPROCS=1(单线程),1 个 work 协程


限制 GOMAXPROCS=4,4 个 work 协程


其他测试

速度测试


和同类库的简单性能比较, 压测方式与 evio 项目相同。
- gnet
- eviop
- evio
- net (标准库)

限制 GOMAXPROCS=1,1 个 work 协程


限制 GOMAXPROCS=1,4 个 work 协程


限制 GOMAXPROCS=4,4 个 work 协程


安装 gev


```bash
go get -u github.com/Allenxuxu/gev
```

快速入门


```go
package main

import (
"log"

"github.com/Allenxuxu/gev"
"github.com/Allenxuxu/gev/connection"
"github.com/Allenxuxu/ringbuffer"
)

type example struct{}

func (s *example) OnConnect(c *connection.Connection) {
log.Println(" OnConnect : ", c.PeerAddr())
}

func (s *example) OnMessage(c *connection.Connection, buffer *ringbuffer.RingBuffer) (out []byte) {
log.Println("OnMessage")
first, end := buffer.PeekAll()
out = first
if len(end) > 0 {
out = append(out, end...)
}
buffer.RetrieveAll()
return
}

func (s *example) OnClose(c *connection.Connection) {
log.Println("OnClose")
}

func main() {
handler := new(example)

s, err := gev.NewServer(handler,
gev.Address(":1833"),
gev.NumLoops(2),
gev.ReusePort(true))
if err != nil {
panic(err)
}

s.Start()
}
```

Handler 是一个接口,我们的程序必须实现它。

```go
type Handler interface {
OnConnect(c *connection.Connection)
OnMessage(c *connection.Connection, buffer *ringbuffer.RingBuffer) []byte
OnClose(c *connection.Connection)
}

func NewServer(handler Handler, opts ...Option) (server *Server, err error) {
```

在消息到来时,gev 会回调 OnMessage ,在这个函数中可以通过返回一个切片来发送数据给客户端。

```go
func (s *example) OnMessage(c *connection.Connection, buffer *ringbuffer.RingBuffer) (out []byte)
```

Connection 还提供 Send 方法来发送数据。Send 并不会立刻发送数据,而是先添加到 event loop 的任务队列中,然后唤醒 event loop 去发送。

更详细的使用方式可以参考示例:[服务端定时推送]

```go
func (c *Connection) Send(buffer []byte) error
```

Connection ShutdownWrite 会关闭写端,从而断开连接。

更详细的使用方式可以参考示例:[限制最大连接数]

```go
func (c *Connection) ShutdownWrite() error
```


➡️➡️ https://github.com/Allenxuxu/gev



gnet: 一个轻量级且高性能的 Go 网络库

开源程序panjf2000 发表了文章 • 1 个评论 • 517 次浏览 • 2019-09-18 16:08 • 来自相关话题


gnet












# 博客原文
https://taohuawu.club/go-event-loop-networking-library-gnet

# Github 主页
https://github.com/panjf2000/gnet

欢迎大家围观~~,目前还在持续更新,感兴趣的话可以 star 一下暗中观察哦。

# 简介

`gnet` 是一个基于 Event-Loop 事件驱动的高性能和轻量级网络库。这个库直接使用 [epoll](https://en.wikipedia.org/wiki/Epoll) 和 [kqueue](https://en.wikipedia.org/wiki/Kqueue) 系统调用而非标准 Golang 网络包:[net](https://golang.org/pkg/net/) 来构建网络应用,它的工作原理类似两个开源的网络库:[libuv](https://github.com/libuv/libuv) 和 [libevent](https://github.com/libevent/libevent)。

这个项目存在的价值是提供一个在网络包处理方面能和 [Redis](http://redis.io)、[Haproxy](http://www.haproxy.org) 这两个项目具有相近性能的Go 语言网络服务器框架。

`gnet` 的亮点在于它是一个高性能、轻量级、非阻塞的纯 Go 实现的传输层(TCP/UDP/Unix-Socket)网络库,开发者可以使用 `gnet` 来实现自己的应用层网络协议,从而构建出自己的应用层网络应用:比如在 `gnet` 上实现 HTTP 协议就可以创建出一个 HTTP 服务器 或者 Web 开发框架,实现 Redis 协议就可以创建出自己的 Redis 服务器等等。

**`gnet` 衍生自另一个项目:`evio`,但是性能更好。**

# 功能

- [高性能](#性能测试) 的基于多线程模型的 Event-Loop 事件驱动
- 内置 Round-Robin 轮询负载均衡算法
- 简洁的 APIs
- 基于 Ring-Buffer 的高效内存利用
- 支持多种网络协议:TCP、UDP、Unix Sockets
- 支持两种事件驱动机制:Linux 里的 epoll 以及 FreeBSD 里的 kqueue
- 支持异步写操作
- 允许多个网络监听地址绑定在一个 Event-Loop 上
- 灵活的事件定时器
- SO_REUSEPORT 端口重用

# 核心设计

## 多线程模型

`gnet` 重新设计开发了一个新内置的多线程模型:『主从 Reactor 多线程』,这也是 `netty` 默认的线程模型,下面是这个模型的原理图:


multi_reactor



它的运行流程如下面的时序图:


reactor



现在我正在 `gnet` 里开发一个新的多线程模型:『带线程/go程池的主从 Reactors 多线程』,并且很快就能完成,这个模型的架构图如下所示:


multi_reactor_thread_pool



它的运行流程如下面的时序图:


multi-reactors



## 通信机制

`gnet` 的『主从 Reactors 多线程』模型是基于 Golang 里的 Goroutines的,一个 Reactor 挂载在一个 Goroutine 上,所以在 `gnet` 的这个网络模型里主 Reactor/Goroutine 与从 Reactors/Goroutines 有海量通信的需求,因此 `gnet` 里必须要有一个能在 Goroutines 之间进行高效率的通信的机制,我没有选择 Golang 里的主流方案:基于 Channel 的 CSP 模型,而是选择了性能更好、基于 Ring-Buffer 的 Disruptor 方案。

所以我最终选择了 [go-disruptor](https://github.com/smartystreets-prototypes/go-disruptor):高性能消息分发队列 LMAX Disruptor 的 Golang 实现。

## 自动扩容的 Ring-Buffer

`gnet` 利用 Ring-Buffer 来缓存 TCP 流数据以及管理内存使用。







# 开始使用

## 安装

```sh
$ go get -u github.com/panjf2000/gnet
```

## 使用示例

```go
// ======================== Echo Server implemented with gnet ===========================

package main

import (
"flag"
"fmt"
"log"
"strings"

"github.com/panjf2000/gnet"
"github.com/panjf2000/gnet/ringbuffer"
)

func main() {
var port int
var loops int
var udp bool
var trace bool
var reuseport bool

flag.IntVar(&port, "port", 5000, "server port")
flag.BoolVar(&udp, "udp", false, "listen on udp")
flag.BoolVar(&reuseport, "reuseport", false, "reuseport (SO_REUSEPORT)")
flag.BoolVar(&trace, "trace", false, "print packets to console")
flag.IntVar(&loops, "loops", 0, "num loops")
flag.Parse()

var events gnet.Events
events.NumLoops = loops
events.OnInitComplete = func(srv gnet.Server) (action gnet.Action) {
log.Printf("echo server started on port %d (loops: %d)", port, srv.NumLoops)
if reuseport {
log.Printf("reuseport")
}
return
}
events.React = func(c gnet.Conn, inBuf *ringbuffer.RingBuffer) (out []byte, action gnet.Action) {
top, tail := inBuf.PreReadAll()
out = append(top, tail...)
inBuf.Reset()

if trace {
log.Printf("%s", strings.TrimSpace(string(top)+string(tail)))
}
return
}
scheme := "tcp"
if udp {
scheme = "udp"
}
log.Fatal(gnet.Serve(events, fmt.Sprintf("%s://:%d", scheme, port)))
}

```

## I/O 事件

`gnet` 目前支持的 I/O 事件如下:

- `OnInitComplete` 当 server 初始化完成之后调用。
- `OnOpened` 当连接被打开的时候调用。
- `OnClosed` 当连接被关闭的时候调用。
- `OnDetached` 当主动摘除连接的时候的调用。
- `React` 当 server 端接收到从 client 端发送来的数据的时候调用。(你的核心业务代码一般是写在这个方法里)
- `Tick` 服务器启动的时候会调用一次,之后就以给定的时间间隔定时调用一次,是一个定时器方法。
- `PreWrite` 预先写数据方法,在 server 端写数据回 client 端之前调用。

# 性能测试

## Linux (epoll)

### 系统参数

```powershell
Go Version: go1.12.9 linux/amd64
OS: Ubuntu 18.04
CPU: 8 Virtual CPUs
Memory: 16.0 GiB
```

### Echo Server

![echolinux.png](https://img.hacpai.com/file/2019/09/echolinux-fca6e6e5.png)


### HTTP Server

![httplinux.png](https://img.hacpai.com/file/2019/09/httplinux-663a0318.png)


## FreeBSD (kqueue)

### 系统参数

```powershell
Go Version: go version go1.12.9 darwin/amd64
OS: macOS Mojave 10.14.6
CPU: 4 CPUs
Memory: 8.0 GiB
```

### Echo Server

![echomac.png](https://img.hacpai.com/file/2019/09/echomac-7a29e0d1.png)


### HTTP Server

![httpmac.png](https://img.hacpai.com/file/2019/09/httpmac-cb6d26ea.png)


# 证书

`gnet` 的源码允许用户在遵循 MIT [开源证书](https://github.com/panjf2000/gnet/blob/master/LICENSE) 规则的前提下使用。

# 待做事项

> gnet 还在持续开发的过程中,所以这个仓库的代码和文档会一直持续更新,如果你对 gnet 感兴趣的话,欢迎给这个开源库贡献你的代码~~

使用Go语言每分钟处理1百万请求

技术讨论astaxie 发表了文章 • 5 个评论 • 5443 次浏览 • 2016-10-10 16:25 • 来自相关话题

# 使用Go语言每分钟处理1百万请求(译) --- 在[Malwarebytes ][1]我们经历了显著的增长,自从我一年前加入了硅谷的公司,一个主要的职责成了设计架构和开发一些系统来支持一个快速增长的信息 ...查看全部
# 使用Go语言每分钟处理1百万请求(译)

---

在[Malwarebytes ][1]我们经历了显著的增长,自从我一年前加入了硅谷的公司,一个主要的职责成了设计架构和开发一些系统来支持一个快速增长的信息安全公司和所有需要的设施来支持一个每天百万用户使用的产品。我在反病毒和反恶意软件行业的不同公司工作了12年,从而我知道由于我们每天处理大量的数据,这些系统是多么复杂。

有趣的是,在过去的大约9年间,我参与的所有的web后端的开发通常是通过Ruby on Rails技术实现的。不要错怪我。我喜欢Ruby on Rails,并且我相信它是个令人惊讶的环境。但是一段时间后,你会开始以ruby的方式开始思考和设计系统,你会忘记,如果你可以利用多线程、并行、快速执行和小内存开销,软件架构本来应该是多么高效和简单。很多年期间,我是一个c/c++、Delphi和c#开发者,我刚开始意识到使用正确的工具可以把复杂的事情变得简单些。

作为首席架构师,我不会很关心在互联网上的语言和框架战争。我相信效率、生产力。代码可维护性主要依赖于你如何把解决方案设计得很简单。

# 问题

当工作在我们的匿名遥测和分析系统中,我们的目标是可以处理来自于百万级别的终端的大量的POST请求。web处理服务可以接收包含了很多payload的集合的JSON数据,这些数据需要写入Amazon S3中。接下来,map-reduce系统可以操作这些数据。

按照习惯,我们会调研服务层级架构,涉及的软件如下:

- Sidekiq
- Resque
- DelayedJob
- Elasticbeanstalk Worker Tier
- RabbitMQ
- and so on…

搭建了2个不同的集群,一个提供web前端,另外一个提供后端处理,这样我们可以横向扩展后端服务的数量。

但是,从刚开始,在 讨论阶段我们的团队就知道我们应该使用Go,因为我们看到这会潜在性地成为一个非常庞大( large traffic)的系统。我已经使用了Go语言大约2年时间,我们开发了几个系统,但是很少会达到这样的负载(amount of load)。

我们开始创建一些结构,定义从POST调用得到的web请求负载,还有一个上传到S3 budket的函数。

```
type PayloadCollection struct {
WindowsVersion string `json:"version"`
Token string `json:"token"`
Payloads []Payload `json:"data"`
}

type Payload struct {
// [redacted]
}

func (p *Payload) UploadToS3() error {
// the storageFolder method ensures that there are no name collision in
// case we get same timestamp in the key name
storage_path := fmt.Sprintf("%v/%v", p.storageFolder, time.Now().UnixNano())

bucket := S3Bucket

b := new(bytes.Buffer)
encodeErr := json.NewEncoder(b).Encode(payload)
if encodeErr != nil {
return encodeErr
}

// Everything we post to the S3 bucket should be marked 'private'
var acl = s3.Private
var contentType = "application/octet-stream"

return bucket.PutReader(storage_path, b, int64(b.Len()), contentType, acl, s3.Options{})
}
```

# 本地Go routines方法

刚开始,我们采用了一个非常本地化的POST处理实现,仅仅尝试把发到简单go routine的job并行化:

```
func payloadHandler(w http.ResponseWriter, r *http.Request) {

if r.Method != "POST" {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}

// Read the body into a string for json decoding
var content = &PayloadCollection{}
err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
if err != nil {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
w.WriteHeader(http.StatusBadRequest)
return
}

// Go through each payload and queue items individually to be posted to S3
for _, payload := range content.Payloads {
go payload.UploadToS3() // <----- DON'T DO THIS
}

w.WriteHeader(http.StatusOK)
}
```

对于中小负载,这会对大多数的人适用,但是大规模下,这个方案会很快被证明不是很好用。我们期望的请求数,不在我们刚开始计划的数量级,当我们把第一个版本部署到生产环境上。我们完全低估了流量。

上面的方案在很多地方很不好。没有办法控制我们产生的go routine的数量。由于我们收到了每分钟1百万的POST请求,这段代码很快就崩溃了。

# 再次尝试

我们需要找一个不同的方式。自开始我们就讨论过, 我们需要保持请求处理程序的生命周期很短,并且进程在后台产生。当然,这是你在Ruby on Rails的世界里必须要做的事情,否则你会阻塞在所有可用的工作 web处理器上,不管你是使用puma、unicore还是passenger(我们不要讨论JRuby这个话题)。然后我们需要利用常用的处理方案来做这些,比如Resque、 Sidekiq、 SQS等。这个列表会继续保留,因为有很多的方案可以实现这些。

所以,第二次迭代,我们创建了一个缓冲channel,我们可以把job排队,然后把它们上传到S3。因为我们可以控制我们队列中的item最大值,我们有大量的内存来排列job,我们认为只要把job在channel里面缓冲就可以了。

```
var Queue chan Payload

func init() {
Queue = make(chan Payload, MAX_QUEUE)
}

func payloadHandler(w http.ResponseWriter, r *http.Request) {
...
// Go through each payload and queue items individually to be posted to S3
for _, payload := range content.Payloads {
Queue <- payload
}
...
}
```

接下来,我们再从队列中取job,然后处理它们。我们使用类似于下面的代码:

```
func StartProcessor() {
for {
select {
case job := <-Queue:
job.payload.UploadToS3() // <-- STILL NOT GOOD
}
}
}
```

说实话,我不知道我们在想什么。这肯定是一个满是Red-Bulls的夜晚。这个方法不会带来什么改善,我们用了一个 有缺陷的缓冲队列并发,仅仅是把问题推迟了。我们的同步处理器同时仅仅会上传一个数据到S3,因为来到的请求远远大于单核处理器上传到S3的能力,我们的带缓冲channel很快达到了它的极限,然后阻塞了请求处理逻辑的queue更多item的能力。

我们仅仅避免了问题,同时开始了我们的系统挂掉的倒计时。当部署了这个有缺陷的版本后,我们的延时保持在每分钟以常量增长。

![此处输入图片的描述][2]

# 最好的解决方案

我们讨论过在使用用Go channel时利用一种常用的模式,来创建一个二级channel系统,一个来queue job,另外一个来控制使用多少个worker来并发操作JobQueue。

想法是,以一个恒定速率并行上传到S3,既不会导致机器崩溃也不好产生S3的连接错误。这样我们选择了创建一个Job/Worker模式。对于那些熟悉Java、C#等语言的开发者,可以把这种模式想象成利用channel以golang的方式来实现了一个worker线程池,作为一种替代。

```
var (
MaxWorker = os.Getenv("MAX_WORKERS")
MaxQueue = os.Getenv("MAX_QUEUE")
)

// Job represents the job to be run
type Job struct {
Payload Payload
}

// A buffered channel that we can send work requests on.
var JobQueue chan Job

// Worker represents the worker that executes the job
type Worker struct {
WorkerPool chan chan Job
JobChannel chan Job
quit chan bool
}

func NewWorker(workerPool chan chan Job) Worker {
return Worker{
WorkerPool: workerPool,
JobChannel: make(chan Job),
quit: make(chan bool)}
}

// Start method starts the run loop for the worker, listening for a quit channel in
// case we need to stop it
func (w Worker) Start() {
go func() {
for {
// register the current worker into the worker queue.
w.WorkerPool <- w.JobChannel

select {
case job := <-w.JobChannel:
// we have received a work request.
if err := job.Payload.UploadToS3(); err != nil {
log.Errorf("Error uploading to S3: %s", err.Error())
}

case <-w.quit:
// we have received a signal to stop
return
}
}
}()
}

// Stop signals the worker to stop listening for work requests.
func (w Worker) Stop() {
go func() {
w.quit <- true
}()
}
```

我们已经修改了我们的web请求handler,用payload创建一个Job实例,然后发到JobQueue channel,以便于worker来获取。

```
func payloadHandler(w http.ResponseWriter, r *http.Request) {

if r.Method != "POST" {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}

// Read the body into a string for json decoding
var content = &PayloadCollection{}
err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
if err != nil {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
w.WriteHeader(http.StatusBadRequest)
return
}

// Go through each payload and queue items individually to be posted to S3
for _, payload := range content.Payloads {

// let's create a job with the payload
work := Job{Payload: payload}

// Push the work onto the queue.
JobQueue <- work
}

w.WriteHeader(http.StatusOK)
}
```

在web server初始化时,我们创建一个Dispatcher,然后调用Run()函数创建一个worker池子,然后开始监听JobQueue中的job。

```
dispatcher := NewDispatcher(MaxWorker)
dispatcher.Run()
```

下面是dispatcher的实现代码:

```
type Dispatcher struct {
// A pool of workers channels that are registered with the dispatcher
WorkerPool chan chan Job
}

func NewDispatcher(maxWorkers int) *Dispatcher {
pool := make(chan chan Job, maxWorkers)
return &Dispatcher{WorkerPool: pool}
}

func (d *Dispatcher) Run() {
// starting n number of workers
for i := 0; i < d.maxWorkers; i++ {
worker := NewWorker(d.pool)
worker.Start()
}

go d.dispatch()
}

func (d *Dispatcher) dispatch() {
for {
select {
case job := <-JobQueue:
// a job request has been received
go func(job Job) {
// try to obtain a worker job channel that is available.
// this will block until a worker is idle
jobChannel := <-d.WorkerPool

// dispatch the job to the worker job channel
jobChannel <- job
}(job)
}
}
}
```

注意到,我们提供了初始化并加入到池子的worker的最大数量。因为这个工程我们利用了Amazon Elasticbeanstalk带有的docker化的Go环境,所以我们常常会遵守[12-factor][3]方法论来配置我们的生成环境中的系统,我们从环境变了读取这些值。这种方式,我们控制worker的数量和JobQueue的大小,所以我们可以很快的改变这些值,而不需要重新部署集群。

```
var (
MaxWorker = os.Getenv("MAX_WORKERS")
MaxQueue = os.Getenv("MAX_QUEUE")
)
```

# 直接结果

我们部署了之后,立马看到了延时降到微乎其微的数值,并未我们处理请求的能力提升很大。

![此处输入图片的描述][4]

Elastic Load Balancers完全启动后,我们看到ElasticBeanstalk 应用服务于每分钟1百万请求。通常情况下在上午时间有几个小时,流量峰值超过每分钟一百万次。

我们一旦部署了新的代码,服务器的数量从100台大幅 下降到大约20台。

![此处输入图片的描述][5]

我们合理配置了我们的集群和自动均衡配置之后,我们可以把服务器的数量降至4x EC2 c4.Large实例,并且Elastic Auto-Scaling设置为如果CPU达到5分钟的90%利用率,我们就会产生新的实例。

![此处输入图片的描述][6]


# 总结

在我的书中,简单总是获胜。我们可以使用多队列、后台worker、复杂的部署设计一个复杂的系统,但是我们决定利用Elasticbeanstalk 的auto-scaling的能力和Go语言开箱即用的特性简化并发。

我们仅仅用了4台机器,这并不是什么新鲜事了。可能它们还不如我的MacBook能力强大,但是却处理了每分钟1百万的写入到S3的请求。

处理问题有正确的工具。当你的 Ruby on Rails 系统需要更强大的web handler时,可以考虑下ruby生态系统之外的技术,或许可以得到更简单但更强大的替代方案。

[文章原文][7]


[1]: http://www.malwarebytes.org/
[2]: https://raw.githubusercontent.com/itfanr/articles-about-golang/master/2016-10/1-1.png
[3]: http://12factor.net/
[4]: https://raw.githubusercontent.com/itfanr/articles-about-golang/master/2016-10/1-2.png
[5]: https://raw.githubusercontent.com/itfanr/articles-about-golang/master/2016-10/1-3.png
[6]: https://raw.githubusercontent.com/itfanr/articles-about-golang/master/2016-10/1-4.png
[7]: http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang/

[gev] 一个轻量、快速的基于 Reactor 模式的非阻塞 TCP 网络库

Go开源项目惜朝 发表了文章 • 0 个评论 • 312 次浏览 • 2019-09-25 16:45 • 来自相关话题

`gev` 是一个 ...查看全部

`gev` 是一个轻量、快速的基于 Reactor 模式的非阻塞 TCP 网络库。

➡️➡️ https://github.com/Allenxuxu/gev

特点

- 基于 epoll 和 kqueue 实现的高性能事件循环
- 支持多核多线程
- 动态扩容 Ring Buffer 实现的读写缓冲区
- 异步读写
- SO_REUSEPORT 端口重用支持

网络模型

`gev` 只使用极少的 goroutine, 一个 goroutine 负责监听客户端连接,其他 goroutine (work 协程)负责处理已连接客户端的读写事件,work 协程数量可以配置,默认与运行主机 CPU 数量相同。

性能测试

> 测试环境 Ubuntu18.04 | 4 Virtual CPUs | 4.0 GiB

吞吐量测试

限制 GOMAXPROCS=1(单线程),1 个 work 协程


限制 GOMAXPROCS=4,4 个 work 协程


其他测试

速度测试


和同类库的简单性能比较, 压测方式与 evio 项目相同。
- gnet
- eviop
- evio
- net (标准库)

限制 GOMAXPROCS=1,1 个 work 协程


限制 GOMAXPROCS=1,4 个 work 协程


限制 GOMAXPROCS=4,4 个 work 协程


安装 gev


```bash
go get -u github.com/Allenxuxu/gev
```

快速入门


```go
package main

import (
"log"

"github.com/Allenxuxu/gev"
"github.com/Allenxuxu/gev/connection"
"github.com/Allenxuxu/ringbuffer"
)

type example struct{}

func (s *example) OnConnect(c *connection.Connection) {
log.Println(" OnConnect : ", c.PeerAddr())
}

func (s *example) OnMessage(c *connection.Connection, buffer *ringbuffer.RingBuffer) (out []byte) {
log.Println("OnMessage")
first, end := buffer.PeekAll()
out = first
if len(end) > 0 {
out = append(out, end...)
}
buffer.RetrieveAll()
return
}

func (s *example) OnClose(c *connection.Connection) {
log.Println("OnClose")
}

func main() {
handler := new(example)

s, err := gev.NewServer(handler,
gev.Address(":1833"),
gev.NumLoops(2),
gev.ReusePort(true))
if err != nil {
panic(err)
}

s.Start()
}
```

Handler 是一个接口,我们的程序必须实现它。

```go
type Handler interface {
OnConnect(c *connection.Connection)
OnMessage(c *connection.Connection, buffer *ringbuffer.RingBuffer) []byte
OnClose(c *connection.Connection)
}

func NewServer(handler Handler, opts ...Option) (server *Server, err error) {
```

在消息到来时,gev 会回调 OnMessage ,在这个函数中可以通过返回一个切片来发送数据给客户端。

```go
func (s *example) OnMessage(c *connection.Connection, buffer *ringbuffer.RingBuffer) (out []byte)
```

Connection 还提供 Send 方法来发送数据。Send 并不会立刻发送数据,而是先添加到 event loop 的任务队列中,然后唤醒 event loop 去发送。

更详细的使用方式可以参考示例:[服务端定时推送]

```go
func (c *Connection) Send(buffer []byte) error
```

Connection ShutdownWrite 会关闭写端,从而断开连接。

更详细的使用方式可以参考示例:[限制最大连接数]

```go
func (c *Connection) ShutdownWrite() error
```


➡️➡️ https://github.com/Allenxuxu/gev



gnet: 一个轻量级且高性能的 Go 网络库

开源程序panjf2000 发表了文章 • 1 个评论 • 517 次浏览 • 2019-09-18 16:08 • 来自相关话题


gnet












# 博客原文
https://taohuawu.club/go-event-loop-networking-library-gnet

# Github 主页
https://github.com/panjf2000/gnet

欢迎大家围观~~,目前还在持续更新,感兴趣的话可以 star 一下暗中观察哦。

# 简介

`gnet` 是一个基于 Event-Loop 事件驱动的高性能和轻量级网络库。这个库直接使用 [epoll](https://en.wikipedia.org/wiki/Epoll) 和 [kqueue](https://en.wikipedia.org/wiki/Kqueue) 系统调用而非标准 Golang 网络包:[net](https://golang.org/pkg/net/) 来构建网络应用,它的工作原理类似两个开源的网络库:[libuv](https://github.com/libuv/libuv) 和 [libevent](https://github.com/libevent/libevent)。

这个项目存在的价值是提供一个在网络包处理方面能和 [Redis](http://redis.io)、[Haproxy](http://www.haproxy.org) 这两个项目具有相近性能的Go 语言网络服务器框架。

`gnet` 的亮点在于它是一个高性能、轻量级、非阻塞的纯 Go 实现的传输层(TCP/UDP/Unix-Socket)网络库,开发者可以使用 `gnet` 来实现自己的应用层网络协议,从而构建出自己的应用层网络应用:比如在 `gnet` 上实现 HTTP 协议就可以创建出一个 HTTP 服务器 或者 Web 开发框架,实现 Redis 协议就可以创建出自己的 Redis 服务器等等。

**`gnet` 衍生自另一个项目:`evio`,但是性能更好。**

# 功能

- [高性能](#性能测试) 的基于多线程模型的 Event-Loop 事件驱动
- 内置 Round-Robin 轮询负载均衡算法
- 简洁的 APIs
- 基于 Ring-Buffer 的高效内存利用
- 支持多种网络协议:TCP、UDP、Unix Sockets
- 支持两种事件驱动机制:Linux 里的 epoll 以及 FreeBSD 里的 kqueue
- 支持异步写操作
- 允许多个网络监听地址绑定在一个 Event-Loop 上
- 灵活的事件定时器
- SO_REUSEPORT 端口重用

# 核心设计

## 多线程模型

`gnet` 重新设计开发了一个新内置的多线程模型:『主从 Reactor 多线程』,这也是 `netty` 默认的线程模型,下面是这个模型的原理图:


multi_reactor



它的运行流程如下面的时序图:


reactor



现在我正在 `gnet` 里开发一个新的多线程模型:『带线程/go程池的主从 Reactors 多线程』,并且很快就能完成,这个模型的架构图如下所示:


multi_reactor_thread_pool



它的运行流程如下面的时序图:


multi-reactors



## 通信机制

`gnet` 的『主从 Reactors 多线程』模型是基于 Golang 里的 Goroutines的,一个 Reactor 挂载在一个 Goroutine 上,所以在 `gnet` 的这个网络模型里主 Reactor/Goroutine 与从 Reactors/Goroutines 有海量通信的需求,因此 `gnet` 里必须要有一个能在 Goroutines 之间进行高效率的通信的机制,我没有选择 Golang 里的主流方案:基于 Channel 的 CSP 模型,而是选择了性能更好、基于 Ring-Buffer 的 Disruptor 方案。

所以我最终选择了 [go-disruptor](https://github.com/smartystreets-prototypes/go-disruptor):高性能消息分发队列 LMAX Disruptor 的 Golang 实现。

## 自动扩容的 Ring-Buffer

`gnet` 利用 Ring-Buffer 来缓存 TCP 流数据以及管理内存使用。







# 开始使用

## 安装

```sh
$ go get -u github.com/panjf2000/gnet
```

## 使用示例

```go
// ======================== Echo Server implemented with gnet ===========================

package main

import (
"flag"
"fmt"
"log"
"strings"

"github.com/panjf2000/gnet"
"github.com/panjf2000/gnet/ringbuffer"
)

func main() {
var port int
var loops int
var udp bool
var trace bool
var reuseport bool

flag.IntVar(&port, "port", 5000, "server port")
flag.BoolVar(&udp, "udp", false, "listen on udp")
flag.BoolVar(&reuseport, "reuseport", false, "reuseport (SO_REUSEPORT)")
flag.BoolVar(&trace, "trace", false, "print packets to console")
flag.IntVar(&loops, "loops", 0, "num loops")
flag.Parse()

var events gnet.Events
events.NumLoops = loops
events.OnInitComplete = func(srv gnet.Server) (action gnet.Action) {
log.Printf("echo server started on port %d (loops: %d)", port, srv.NumLoops)
if reuseport {
log.Printf("reuseport")
}
return
}
events.React = func(c gnet.Conn, inBuf *ringbuffer.RingBuffer) (out []byte, action gnet.Action) {
top, tail := inBuf.PreReadAll()
out = append(top, tail...)
inBuf.Reset()

if trace {
log.Printf("%s", strings.TrimSpace(string(top)+string(tail)))
}
return
}
scheme := "tcp"
if udp {
scheme = "udp"
}
log.Fatal(gnet.Serve(events, fmt.Sprintf("%s://:%d", scheme, port)))
}

```

## I/O 事件

`gnet` 目前支持的 I/O 事件如下:

- `OnInitComplete` 当 server 初始化完成之后调用。
- `OnOpened` 当连接被打开的时候调用。
- `OnClosed` 当连接被关闭的时候调用。
- `OnDetached` 当主动摘除连接的时候的调用。
- `React` 当 server 端接收到从 client 端发送来的数据的时候调用。(你的核心业务代码一般是写在这个方法里)
- `Tick` 服务器启动的时候会调用一次,之后就以给定的时间间隔定时调用一次,是一个定时器方法。
- `PreWrite` 预先写数据方法,在 server 端写数据回 client 端之前调用。

# 性能测试

## Linux (epoll)

### 系统参数

```powershell
Go Version: go1.12.9 linux/amd64
OS: Ubuntu 18.04
CPU: 8 Virtual CPUs
Memory: 16.0 GiB
```

### Echo Server

![echolinux.png](https://img.hacpai.com/file/2019/09/echolinux-fca6e6e5.png)


### HTTP Server

![httplinux.png](https://img.hacpai.com/file/2019/09/httplinux-663a0318.png)


## FreeBSD (kqueue)

### 系统参数

```powershell
Go Version: go version go1.12.9 darwin/amd64
OS: macOS Mojave 10.14.6
CPU: 4 CPUs
Memory: 8.0 GiB
```

### Echo Server

![echomac.png](https://img.hacpai.com/file/2019/09/echomac-7a29e0d1.png)


### HTTP Server

![httpmac.png](https://img.hacpai.com/file/2019/09/httpmac-cb6d26ea.png)


# 证书

`gnet` 的源码允许用户在遵循 MIT [开源证书](https://github.com/panjf2000/gnet/blob/master/LICENSE) 规则的前提下使用。

# 待做事项

> gnet 还在持续开发的过程中,所以这个仓库的代码和文档会一直持续更新,如果你对 gnet 感兴趣的话,欢迎给这个开源库贡献你的代码~~

使用Go语言每分钟处理1百万请求

技术讨论astaxie 发表了文章 • 5 个评论 • 5443 次浏览 • 2016-10-10 16:25 • 来自相关话题

# 使用Go语言每分钟处理1百万请求(译) --- 在[Malwarebytes ][1]我们经历了显著的增长,自从我一年前加入了硅谷的公司,一个主要的职责成了设计架构和开发一些系统来支持一个快速增长的信息 ...查看全部
# 使用Go语言每分钟处理1百万请求(译)

---

在[Malwarebytes ][1]我们经历了显著的增长,自从我一年前加入了硅谷的公司,一个主要的职责成了设计架构和开发一些系统来支持一个快速增长的信息安全公司和所有需要的设施来支持一个每天百万用户使用的产品。我在反病毒和反恶意软件行业的不同公司工作了12年,从而我知道由于我们每天处理大量的数据,这些系统是多么复杂。

有趣的是,在过去的大约9年间,我参与的所有的web后端的开发通常是通过Ruby on Rails技术实现的。不要错怪我。我喜欢Ruby on Rails,并且我相信它是个令人惊讶的环境。但是一段时间后,你会开始以ruby的方式开始思考和设计系统,你会忘记,如果你可以利用多线程、并行、快速执行和小内存开销,软件架构本来应该是多么高效和简单。很多年期间,我是一个c/c++、Delphi和c#开发者,我刚开始意识到使用正确的工具可以把复杂的事情变得简单些。

作为首席架构师,我不会很关心在互联网上的语言和框架战争。我相信效率、生产力。代码可维护性主要依赖于你如何把解决方案设计得很简单。

# 问题

当工作在我们的匿名遥测和分析系统中,我们的目标是可以处理来自于百万级别的终端的大量的POST请求。web处理服务可以接收包含了很多payload的集合的JSON数据,这些数据需要写入Amazon S3中。接下来,map-reduce系统可以操作这些数据。

按照习惯,我们会调研服务层级架构,涉及的软件如下:

- Sidekiq
- Resque
- DelayedJob
- Elasticbeanstalk Worker Tier
- RabbitMQ
- and so on…

搭建了2个不同的集群,一个提供web前端,另外一个提供后端处理,这样我们可以横向扩展后端服务的数量。

但是,从刚开始,在 讨论阶段我们的团队就知道我们应该使用Go,因为我们看到这会潜在性地成为一个非常庞大( large traffic)的系统。我已经使用了Go语言大约2年时间,我们开发了几个系统,但是很少会达到这样的负载(amount of load)。

我们开始创建一些结构,定义从POST调用得到的web请求负载,还有一个上传到S3 budket的函数。

```
type PayloadCollection struct {
WindowsVersion string `json:"version"`
Token string `json:"token"`
Payloads []Payload `json:"data"`
}

type Payload struct {
// [redacted]
}

func (p *Payload) UploadToS3() error {
// the storageFolder method ensures that there are no name collision in
// case we get same timestamp in the key name
storage_path := fmt.Sprintf("%v/%v", p.storageFolder, time.Now().UnixNano())

bucket := S3Bucket

b := new(bytes.Buffer)
encodeErr := json.NewEncoder(b).Encode(payload)
if encodeErr != nil {
return encodeErr
}

// Everything we post to the S3 bucket should be marked 'private'
var acl = s3.Private
var contentType = "application/octet-stream"

return bucket.PutReader(storage_path, b, int64(b.Len()), contentType, acl, s3.Options{})
}
```

# 本地Go routines方法

刚开始,我们采用了一个非常本地化的POST处理实现,仅仅尝试把发到简单go routine的job并行化:

```
func payloadHandler(w http.ResponseWriter, r *http.Request) {

if r.Method != "POST" {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}

// Read the body into a string for json decoding
var content = &PayloadCollection{}
err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
if err != nil {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
w.WriteHeader(http.StatusBadRequest)
return
}

// Go through each payload and queue items individually to be posted to S3
for _, payload := range content.Payloads {
go payload.UploadToS3() // <----- DON'T DO THIS
}

w.WriteHeader(http.StatusOK)
}
```

对于中小负载,这会对大多数的人适用,但是大规模下,这个方案会很快被证明不是很好用。我们期望的请求数,不在我们刚开始计划的数量级,当我们把第一个版本部署到生产环境上。我们完全低估了流量。

上面的方案在很多地方很不好。没有办法控制我们产生的go routine的数量。由于我们收到了每分钟1百万的POST请求,这段代码很快就崩溃了。

# 再次尝试

我们需要找一个不同的方式。自开始我们就讨论过, 我们需要保持请求处理程序的生命周期很短,并且进程在后台产生。当然,这是你在Ruby on Rails的世界里必须要做的事情,否则你会阻塞在所有可用的工作 web处理器上,不管你是使用puma、unicore还是passenger(我们不要讨论JRuby这个话题)。然后我们需要利用常用的处理方案来做这些,比如Resque、 Sidekiq、 SQS等。这个列表会继续保留,因为有很多的方案可以实现这些。

所以,第二次迭代,我们创建了一个缓冲channel,我们可以把job排队,然后把它们上传到S3。因为我们可以控制我们队列中的item最大值,我们有大量的内存来排列job,我们认为只要把job在channel里面缓冲就可以了。

```
var Queue chan Payload

func init() {
Queue = make(chan Payload, MAX_QUEUE)
}

func payloadHandler(w http.ResponseWriter, r *http.Request) {
...
// Go through each payload and queue items individually to be posted to S3
for _, payload := range content.Payloads {
Queue <- payload
}
...
}
```

接下来,我们再从队列中取job,然后处理它们。我们使用类似于下面的代码:

```
func StartProcessor() {
for {
select {
case job := <-Queue:
job.payload.UploadToS3() // <-- STILL NOT GOOD
}
}
}
```

说实话,我不知道我们在想什么。这肯定是一个满是Red-Bulls的夜晚。这个方法不会带来什么改善,我们用了一个 有缺陷的缓冲队列并发,仅仅是把问题推迟了。我们的同步处理器同时仅仅会上传一个数据到S3,因为来到的请求远远大于单核处理器上传到S3的能力,我们的带缓冲channel很快达到了它的极限,然后阻塞了请求处理逻辑的queue更多item的能力。

我们仅仅避免了问题,同时开始了我们的系统挂掉的倒计时。当部署了这个有缺陷的版本后,我们的延时保持在每分钟以常量增长。

![此处输入图片的描述][2]

# 最好的解决方案

我们讨论过在使用用Go channel时利用一种常用的模式,来创建一个二级channel系统,一个来queue job,另外一个来控制使用多少个worker来并发操作JobQueue。

想法是,以一个恒定速率并行上传到S3,既不会导致机器崩溃也不好产生S3的连接错误。这样我们选择了创建一个Job/Worker模式。对于那些熟悉Java、C#等语言的开发者,可以把这种模式想象成利用channel以golang的方式来实现了一个worker线程池,作为一种替代。

```
var (
MaxWorker = os.Getenv("MAX_WORKERS")
MaxQueue = os.Getenv("MAX_QUEUE")
)

// Job represents the job to be run
type Job struct {
Payload Payload
}

// A buffered channel that we can send work requests on.
var JobQueue chan Job

// Worker represents the worker that executes the job
type Worker struct {
WorkerPool chan chan Job
JobChannel chan Job
quit chan bool
}

func NewWorker(workerPool chan chan Job) Worker {
return Worker{
WorkerPool: workerPool,
JobChannel: make(chan Job),
quit: make(chan bool)}
}

// Start method starts the run loop for the worker, listening for a quit channel in
// case we need to stop it
func (w Worker) Start() {
go func() {
for {
// register the current worker into the worker queue.
w.WorkerPool <- w.JobChannel

select {
case job := <-w.JobChannel:
// we have received a work request.
if err := job.Payload.UploadToS3(); err != nil {
log.Errorf("Error uploading to S3: %s", err.Error())
}

case <-w.quit:
// we have received a signal to stop
return
}
}
}()
}

// Stop signals the worker to stop listening for work requests.
func (w Worker) Stop() {
go func() {
w.quit <- true
}()
}
```

我们已经修改了我们的web请求handler,用payload创建一个Job实例,然后发到JobQueue channel,以便于worker来获取。

```
func payloadHandler(w http.ResponseWriter, r *http.Request) {

if r.Method != "POST" {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}

// Read the body into a string for json decoding
var content = &PayloadCollection{}
err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
if err != nil {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
w.WriteHeader(http.StatusBadRequest)
return
}

// Go through each payload and queue items individually to be posted to S3
for _, payload := range content.Payloads {

// let's create a job with the payload
work := Job{Payload: payload}

// Push the work onto the queue.
JobQueue <- work
}

w.WriteHeader(http.StatusOK)
}
```

在web server初始化时,我们创建一个Dispatcher,然后调用Run()函数创建一个worker池子,然后开始监听JobQueue中的job。

```
dispatcher := NewDispatcher(MaxWorker)
dispatcher.Run()
```

下面是dispatcher的实现代码:

```
type Dispatcher struct {
// A pool of workers channels that are registered with the dispatcher
WorkerPool chan chan Job
}

func NewDispatcher(maxWorkers int) *Dispatcher {
pool := make(chan chan Job, maxWorkers)
return &Dispatcher{WorkerPool: pool}
}

func (d *Dispatcher) Run() {
// starting n number of workers
for i := 0; i < d.maxWorkers; i++ {
worker := NewWorker(d.pool)
worker.Start()
}

go d.dispatch()
}

func (d *Dispatcher) dispatch() {
for {
select {
case job := <-JobQueue:
// a job request has been received
go func(job Job) {
// try to obtain a worker job channel that is available.
// this will block until a worker is idle
jobChannel := <-d.WorkerPool

// dispatch the job to the worker job channel
jobChannel <- job
}(job)
}
}
}
```

注意到,我们提供了初始化并加入到池子的worker的最大数量。因为这个工程我们利用了Amazon Elasticbeanstalk带有的docker化的Go环境,所以我们常常会遵守[12-factor][3]方法论来配置我们的生成环境中的系统,我们从环境变了读取这些值。这种方式,我们控制worker的数量和JobQueue的大小,所以我们可以很快的改变这些值,而不需要重新部署集群。

```
var (
MaxWorker = os.Getenv("MAX_WORKERS")
MaxQueue = os.Getenv("MAX_QUEUE")
)
```

# 直接结果

我们部署了之后,立马看到了延时降到微乎其微的数值,并未我们处理请求的能力提升很大。

![此处输入图片的描述][4]

Elastic Load Balancers完全启动后,我们看到ElasticBeanstalk 应用服务于每分钟1百万请求。通常情况下在上午时间有几个小时,流量峰值超过每分钟一百万次。

我们一旦部署了新的代码,服务器的数量从100台大幅 下降到大约20台。

![此处输入图片的描述][5]

我们合理配置了我们的集群和自动均衡配置之后,我们可以把服务器的数量降至4x EC2 c4.Large实例,并且Elastic Auto-Scaling设置为如果CPU达到5分钟的90%利用率,我们就会产生新的实例。

![此处输入图片的描述][6]


# 总结

在我的书中,简单总是获胜。我们可以使用多队列、后台worker、复杂的部署设计一个复杂的系统,但是我们决定利用Elasticbeanstalk 的auto-scaling的能力和Go语言开箱即用的特性简化并发。

我们仅仅用了4台机器,这并不是什么新鲜事了。可能它们还不如我的MacBook能力强大,但是却处理了每分钟1百万的写入到S3的请求。

处理问题有正确的工具。当你的 Ruby on Rails 系统需要更强大的web handler时,可以考虑下ruby生态系统之外的技术,或许可以得到更简单但更强大的替代方案。

[文章原文][7]


[1]: http://www.malwarebytes.org/
[2]: https://raw.githubusercontent.com/itfanr/articles-about-golang/master/2016-10/1-1.png
[3]: http://12factor.net/
[4]: https://raw.githubusercontent.com/itfanr/articles-about-golang/master/2016-10/1-2.png
[5]: https://raw.githubusercontent.com/itfanr/articles-about-golang/master/2016-10/1-3.png
[6]: https://raw.githubusercontent.com/itfanr/articles-about-golang/master/2016-10/1-4.png
[7]: http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang/