原创分享 NBIO 第二弹 —— 支持 Non-Blocking HTTP 1.x

lesismal · 2021年03月15日 · 最后由 lesismal 回复于 2021年03月17日 · 336 次阅读
本帖已被设为精华帖!

一、简介

最近两周撸了份 HTTP 1.x 的 Parser ,用于支持异步网络库的数据解析(同步网络库当然也可以使用),在此基础之上实现了 NBIO HTTP Server ,其他异步网络库也可以使用这个 Parser 进行 HTTP Server 的封装,但需依赖其他网络库实现 net.Conn。

众所周知,标准库的 HTTP 为每个连接创建一个协程,在高并发场景下比如 10k、100k 甚至 1000k,需要创建大量的协程,消耗大量的内存、协程调度等成本。但是使用异步网络库,可以不用为每个连接都创建单独的协程,从而降低相应的消耗、极大提高同等硬件的负载能力。

NBIO HTTP Server 兼容标准库的 http.Handler ,所以已有的基于标准库的 web 框架也可以很容易地使用 NBIO HTTP Server 作为异步网络层来替换标准库。 如果需要对 fasthttp 这类不使用标准库的 web 框架进行支持,也只需参考默认兼容标准库的 Processor,实现一份对应 fasthttp Hadler 的 Processor 即可。但由于 fasthttp 默认使用 [] byte 作为原始数据字段的存储,而 Parser 兼顾应用层便利在参数传递中直接转换成了 string ,所以需要浪费一点不必要的 string/[] byte 转换,也可以考虑是否需要把参数传递改成 [] byte,但改成 [] byte 看上去就不那么友好、美观了。

NBIO HTTP Server 网络层接口在 *nix 系统上是异步的,处理流程是:

  1. NBIO 作为网络层处理数据 IO 。
  2. 读取到的数据回调应用层方法执行 Parser 进行解析,这里给应用层留了参数,应用层可以自己定制执行的回调函数,比如可以就在 NBIO 读取数据的协程中进行解析,也可以自己定制协程池进行解析(但要注意,同一个连接的数据应该指定到同一个协程中进行解析,否则由于 TCP 的 Stream 特性,可能导致 "粘包" 相关的数据错乱)。为了使用者便利,如果应用层传入 nil 参数,NBIO HTTP Server 则提供默认的协程池进行解析。
  3. Parser 解析到一个完整消息后调用业务层回调进行处理,这里与 Parser 类似,可由应用层传入处理函数,如果传入 nil 参数,则由默认的协程池进行处理,这里的协程池与 Parser 的协程池不同,因为已经是完整的消息,可以由协程池内空闲协程而非指定协程抢任务执行,以避免单个连接某个方法处理中可能存在 DB 等慢操作导致其他连接的消息处理被阻塞。
  • 关于 3 中协程池,NBIO HTTP Server 支持乱序处理、顺序回包。如果请求方的客户端实现支持单个连接的多个消息非线头阻塞发送、而不用等待每个消息收到回复才发出下个请求的数据,则该连接的多个请求有可能在 NBIO HTTP Server 默认协程池中乱序执行,比如 request 1 需要 1 秒进行处理,request 2 也到达并且只需要 10ms 进行处理,则 request 2 先被处理完,但是 request 2 回复的数据会被缓存,仍然等 request 1 处理完成后先回复 request 1、再回复 request 2,不会导致客户端收到的响应乱序。

二、两点澄清

  1. 以前有小伙伴提出,golang 底层也是异步、我这种重复再造轮子也是异步、没有意义——这种说法是不正确的:golang 底层也是异步,但是语言层面或者标准库 net 的接口层是同步的,所以才需要每个连接一个协程,而 NBIO 接口层也是异步的,所以可以自行定制管理、避免不必要的协程创建,两者的异步是不一样的。
  2. 还有的小伙伴提出,golang 的同步模式是巨大的进步,我这个库又回到异步模式,是倒退——这种说法也是不准确的:底层基础设施的异步,并不代表应用层也一定要异步,golang 的协程和 chan 足够方便,应用层完全可以自己定制多种编程模式。NBIO HTTP Server 在上面简介流程 3 中的消息处理,应用层的 http.Handler 内,和使用标准库的方式是没有变化的,业务层仍然是按照同步的方式进行顺序逻辑的处理。

三、示例代码

NBIO HTTP Server 的示例请参考这里:https://github.com/lesismal/nbio/tree/master/examples/http

这里也包括了一份百万连接的测试样例:百万连接测试代码 ,由于网络协议栈的 PORT 使用 short 类型导致的 65535 限制,为了免去单机压测部署环境的麻烦,百万连接测试的示例代码开启监听了多组端口,因为这些端口接受连接和处理 IO 都是共用相同的一组 poller ,单一端口也是使用这组 poller,所以多端口跟单一端口的性能是基本一致的,有兴趣的小伙伴也可以改成单一端口、自行搭建虚拟网络或者多组 docker、真实多机环境、压测客户端之类的进行压测 PS:NBIO 主要针对 *nix 系统,在 windows 下为了方便用户调试,使用标准库的 net 实现了接口兼容,windows 下的压测数据不用来作为性能对比的参考,压测请于 linux 环境下进行。

四、路线图

  1. Websocket
  2. HTTP2.0
  3. 前阵子有魔改了一份标准库的 TLS 支持异步并与 NBIO 打通,但是标准库的 TLS 原来是同步模式的代码、魔改成支持异步的很多细节我没有优化、显得臃肿浪费,希望以后有档期完全重写一份更清爽的
  • 每一项都是体力活,感觉路漫漫,也希望有兴趣的大佬、小伙伴多来交流、PR

五、以 gin 为例,分别使用 STD、NBIO 进行压测对比

  • 压测环境: 4c8t / 8g 虚拟机,C/S localhost

1. gin 默认使用标准库压测

1)gin std server 代码

package main

import (
        "fmt"
        "net/http"
        "runtime"
        "sync/atomic"
        "time"

        "github.com/gin-gonic/gin"
)

func main() {
        var (
                qps   uint64 = 0
                total uint64 = 0
        )

        router := gin.New()
        router.GET("/hello", func(c *gin.Context) {
                atomic.AddUint64(&qps, 1)
                c.String(http.StatusOK, "hello")
        })

        go router.Run()

        ticker := time.NewTicker(time.Second)
        for i := 1; true; i++ {
                <-ticker.C
                n := atomic.SwapUint64(&qps, 0)
                total += n
                fmt.Printf("running for %v seconds, NumGoroutine: %v, qps: %v, total: %v\n", i, runtime.NumGoroutine(), n, total)
        }
}

2)wrk 压测 20k 连接数

wrk -t4 -c20000 -d30s --latency http://localhost:8080/hello

3)压测结果日志

所有连接建立成功直到 qps 稳定的 server 日志:

[GIN-debug] [WARNING] Running in "debug" mode. Switch to "release" mode in production.
 - using env:   export GIN_MODE=release
 - using code:  gin.SetMode(gin.ReleaseMode)

[GIN-debug] GET    /hello                    --> main.main.func1 (1 handlers)
[GIN-debug] Environment variable PORT is undefined. Using port :8080 by default
[GIN-debug] Listening and serving HTTP on :8080
running for 1 seconds, NumGoroutine: 2, qps: 0, total: 0
running for 2 seconds, NumGoroutine: 2, qps: 0, total: 0
running for 3 seconds, NumGoroutine: 5277, qps: 0, total: 0
running for 4 seconds, NumGoroutine: 9411, qps: 0, total: 0
running for 5 seconds, NumGoroutine: 11404, qps: 0, total: 0
running for 6 seconds, NumGoroutine: 15696, qps: 95115, total: 95115
running for 7 seconds, NumGoroutine: 16653, qps: 74368, total: 169483
running for 8 seconds, NumGoroutine: 19188, qps: 72357, total: 241840
running for 9 seconds, NumGoroutine: 19942, qps: 68762, total: 310602
running for 10 seconds, NumGoroutine: 19936, qps: 86198, total: 396800
running for 11 seconds, NumGoroutine: 20008, qps: 114406, total: 511206
running for 12 seconds, NumGoroutine: 20015, qps: 137557, total: 648763
running for 13 seconds, NumGoroutine: 20003, qps: 135883, total: 784646
running for 14 seconds, NumGoroutine: 20009, qps: 130973, total: 915619
running for 15 seconds, NumGoroutine: 20011, qps: 130860, total: 1046479

wrk 测试结果日志:

Running 30s test @ http://localhost:8080/hello
  4 threads and 20000 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency   145.59ms   79.06ms   1.36s    88.79%
    Req/Sec    32.62k    10.49k   73.27k    79.31%
  Latency Distribution
     50%  131.01ms
     75%  151.73ms
     90%  186.63ms
     99%  542.54ms
  3391563 requests in 30.09s, 391.37MB read
Requests/sec: 112705.44
Transfer/sec:     13.01MB

2. 使用 NBIO HTTP Server 作为 gin 的网络层压测

1)gin nbio server 代码

package main

import (
    "fmt"
    "net/http"
    "runtime"
    "sync/atomic"
    "time"

    "github.com/gin-gonic/gin"
    "github.com/lesismal/nbio/nbhttp"
)

func main() {
    var (
        qps   uint64 = 0
        total uint64 = 0
    )

    router := gin.New()
    router.GET("/hello", func(c *gin.Context) {
        atomic.AddUint64(&qps, 1)
        c.String(http.StatusOK, "hello")
    })

    svr := nbhttp.NewServer(nbhttp.Config{
        Network:      "tcp",
        Addrs:        []string{"localhost:8080"},
        NPoller:      8, // runtime.NumCPU(),
        NParser:      8, // runtime.NumCPU(),
        TaskPoolSize: 100, // runtime.NumCPU() * 10, // goroutines pool to execute http.Handler
    }, router, nil, nil)

    err := svr.Start()
    if err != nil {
        fmt.Printf("nbio.Start failed: %v\n", err)
        return
    }
    defer svr.Stop()

    ticker := time.NewTicker(time.Second)
    for i := 1; true; i++ {
        <-ticker.C
        n := atomic.SwapUint64(&qps, 0)
        total += n
        fmt.Printf("running for %v seconds, online: %v, NumGoroutine: %v, qps: %v, total: %v\n", i, svr.State().Online, runtime.NumGoroutine(), n, total)
    }
}

2)wrk 压测 20k 连接数

wrk -t4 -c20000 -d30s --latency http://localhost:8080/hello

3)压测结果

所有连接建立成功直到 qps 稳定的 server 日志:

[GIN-debug] [WARNING] Running in "debug" mode. Switch to "release" mode in production.
 - using env:   export GIN_MODE=release
 - using code:  gin.SetMode(gin.ReleaseMode)

[GIN-debug] GET    /hello                    --> main.main.func1 (1 handlers)
2021/03/13 14:06:03.797 [INF] Gopher[NB] start listen on: ["localhost:8080"]
running for 1 seconds, online: 0, NumGoroutine: 19, qps: 0, total: 0
running for 2 seconds, online: 0, NumGoroutine: 19, qps: 0, total: 0
running for 3 seconds, online: 0, NumGoroutine: 19, qps: 0, total: 0
running for 4 seconds, online: 4068, NumGoroutine: 19, qps: 0, total: 0
running for 5 seconds, online: 9061, NumGoroutine: 19, qps: 0, total: 0
running for 6 seconds, online: 12567, NumGoroutine: 119, qps: 3598, total: 3598
running for 7 seconds, online: 18018, NumGoroutine: 119, qps: 126743, total: 130341
running for 8 seconds, online: 19916, NumGoroutine: 119, qps: 153748, total: 284089
running for 9 seconds, online: 19916, NumGoroutine: 119, qps: 152665, total: 436754
running for 10 seconds, online: 19916, NumGoroutine: 119, qps: 156468, total: 593222
running for 11 seconds, online: 20000, NumGoroutine: 119, qps: 146699, total: 739921
running for 12 seconds, online: 20000, NumGoroutine: 119, qps: 145776, total: 885697
running for 13 seconds, online: 20000, NumGoroutine: 119, qps: 155327, total: 1041024
running for 14 seconds, online: 20000, NumGoroutine: 119, qps: 148740, total: 1189764
running for 15 seconds, online: 20000, NumGoroutine: 119, qps: 143539, total: 1333303

wrk 测试结果日志:

Running 30s test @ http://localhost:8080/hello
  4 threads and 20000 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency   129.22ms   26.45ms 609.89ms   74.38%
    Req/Sec    38.08k     3.69k   57.58k    72.97%
  Latency Distribution
     50%  128.42ms
     75%  144.86ms
     90%  160.37ms
     99%  191.20ms
  4146017 requests in 30.06s, 478.43MB read

数据对比

指标 GIN+STD GIN+NBIO
压测连接数 20000 20000
峰值进程协程数量 20000+ 119
峰值内存占用 600+M 60+M
峰值 CPU 占用 500-600% 400-500%
wrk Latency Avg 145.59ms 129.22ms
wrk Latency Stdev 79.06ms 26.45ms
wrk Latency Max 1.36s 609.89ms
wrk Latency 50% 131.01ms 128.42ms
wrk Latency 75% 151.73ms 144.86ms
wrk Latency 90% 186.63ms 160.37ms
wrk Latency 99% 542.54ms 191.20ms
wrk Req/Sec Avg 32.62k 38.08k
wrk Req/Sec Stdev 10.49k 3.69k
wrk Req/Sec Max 73.27k 57.58k

GIN+NBIO 方式整体压测指标好于 GIN+STD,相比之下,极低的内存占用尤为明显,NBIO 可以使同配置或者低配硬件的负载能力大幅提升。

多数小伙伴们的业务可能不需要极致的资源控制、通常加机器就行,但面对海量并发场景、大规模集群时,异步网络框架可以极大降低相应的硬件成本。

现在的云、大数据、人工智能、物联网、5G 时代已经蓬勃发展,但这一切只是开始,IT 爆炸的时代,很多传统领域都在 IT 化,未来的数据量、计算量、网络传输量更会越来越迅猛地增长,海量计算的基础之上,一点算力的节约会在放大效应下变得非常明显。

以物联网为例,海量接入设备、海量并发连接数之下,golang 标准库的每个连接一个协程的默认同步模式可能会成为性能瓶颈,需要更多的硬件开销、能源消耗。超高并发场景下,以 golang 标准库方案的性能、资源消耗、负载能力,目前赶不上 java netty、nodejs,更不用说 c/c++/rust,所以个人认为 golang 的异步基础设施很有必要,还有很大发展空间。

欢迎有兴趣的小伙伴关注、进行更多测试,以及 issue、pr、star,^_^

更多原创文章干货分享,请关注公众号
  • 加微信实战群请加微信(注明:实战群):gocnio
astaxie 将本帖设为了精华贴 03月15日 06:54

好东西

astaxie 回复

上周也想写个 beego 的例子来着,但是没太研究明白 beego router + 外部 serve 需要怎么实现,谢老板多多指点,如果可以内嵌个支持更好

路还很长,一定要让 golang 更强 😋

没找到 http 长连接实现啊 ,http 标准有 9 种少个 patch。

eudore 回复

这两个地方有 Close 和 keep-alive 相关的处理: https://github.com/lesismal/nbio/blob/master/nbhttp/processor.go#L204 https://github.com/lesismal/nbio/blob/master/nbhttp/processor.go#L274

我这里处理相对简单,没有按照标准,因为 http1.x 标准本身就不是什么好标准,如果 client 端不是 close,server 端按照配置的统一时间设置读超时。server 端响应给 client 端的,留给 web 框架自己实现

eudore 回复

Cache-Control 相关的也没做支持,因为主要定位是接口服务、排除静态资源类,静态资源类的服务用其他的比如 nginx 可以 sendfile,zero copy,性能更好

lesismal 回复

我之前在实现的时候是对比标准库的 server,标准库 response header 默认应该也是没有自带 keep-alive 之类的响应头,如果需要,留给应用层框架自行实现。nbio 定位不是像 gin 之类的 web 框架,而是定位在协议层本身、负责数据的基础传输和解析,对标的是标准库的同步阻塞模式

mahuaibo GoCN 每日新闻 (2021-03-16) 中提及了此贴 03月16日 07:36

@lesismal 如果我使用 nbhttp 代替 nethttp,我希望可以无缝替换,我理解的 nbhttp.Server 定位相当于 http.Server,长连接、分段传输、expect、h2 握手、upgrade 都算 http.Server 里面的特性吧。 Cache-Control 标准库实现在 fs.go 里面没有放在 Server,提到 nginx 一般 nginx 反向代理用的 http/1.0 没有长连接影响也不大,不知道有没有能直接使用的长连接,两处 keep-alive 处理代码我没仔细看也不太信任可以直接使用,因为长连接实现依赖传输机制。

我调试了 example https://github.com/lesismal/nbio/blob/master/examples/http/server/server.go

func onEcho(w http.ResponseWriter, r *http.Request) {
        w.Write([]byte(time.Now().Format("20060102 15:04:05")))
        w.Write([]byte(time.Now().Format("20060102 15:04:05")))
        w.Write([]byte(time.Now().Format("20060102 15:04:05")))
        w.Write([]byte(time.Now().Format("20060102 15:04:05")))
        for i:=0;i<500000;i++{
                w.Write([]byte(time.Now().Format("20060102 15:04:05")))
        }
    atomic.AddUint64(&qps, 1)
}

响应:

[root@izuf6b7o9hu1q8vzyvkyznz gateway]# curl  -I 127.0.0.1:28000/echo
HTTP/1.1 200 OK
Content-Length: 8500068
Content-Type: text/plain; charset=utf-8
Date: Wed, 17 Mar 2021 01:23:42 GMT

850KB 的数据都放在内存种,没有和 http.Server 一样流式写入,我感觉可能没有对传输机制做处理,那么我返回一个我猜测返回一个文件内容会不会把所有读到内存中呢? 如果预先设置了 Content-Length 那么不会内存保存的话除外,懒了详细看源码。

eudore 回复

咱们按每个点来说:

长连接

我没太理解你说的长连接具体是指什么问题?是指 Connection 头对应的处理方式、还是指 4 层 tcp 的什么问题?

如果只是认为一个请求之后 NBHTTP Server 就会主动断开连接,如果请求方不是 http 1.0,也没有发送 Connection: close 头,是会保持连接的,您可以 netstat 看下

分段传输

实现时我考虑过几点:

  1. 优先使用 Content-Length 发送 —— 已经实现
  2. 有 Transfer-Encoding: chnaked 或者 Trailer Header 时必须 chunked,这是协议标准 —— 已实现
  3. WriteHeader 则直接写,Write 则判断是否需要合并 header 和 body —— 未实现
  4. body 过大多次发送 —— 未实现

最后实现了 1、2,应该算符合标准: 在没有 Transfer-Encoding: chnaked 或者 Trailer Header 时,server 回包用 Content-Length 的方式应该是并不影响 client 端解码,因为不管哪种方式回包,client 都需要解码出完整 body,所以按照 Content-Length 回包应该是没问题,并且 Content-Length 对于 C/S 两端编解码性能都更友好 如果有特例、即使没有 Transfer-Encoding: chnaked 或者 Trailer Header 也必须按照 chunked 的方式回包,请提供一个我研究下再进行实现

为什么没有实现 3: 因为支持 1.1 的 pipeline,客户端的多个请求可能同时到达服务端,并且异步网络层跟标准库不一样,标准库是同步挨个读取、处理完一个才会读取下一个,但是异步库,为了更高的性能,我支持了业务层可定制的协程池,在帖子中乱序处理的部分有解释。应用层当然也可以定制成单个连接指定到特定的协程去处理,从而跟标准库类似,也是顺序处理,但是这在遇到数据库等慢 IO 操作时可能导致该任务协程的阻塞,从而影响其他连接上的请求的处理,只有作为网关、代理之类的不涉及数据库等慢操作的基础设施类服务才适合这种定制,所以作为通用框架没有这样实现。所以比如主帖中介绍的 request 2 的处理中,框架层不能直接就 WriteHeader 到 Conn,而是需要等到真正发送自己这个 response 时再打包发送。也可以实现成判断自己这个 response 是不是就对应最新的那个请求再进行发送,但是这并不能保证所有 response 都达到立即发送的效果,并且,性能不友好,所以没必要

为什么没有实现 4: nbio 的配置项有 MaxWriteBufferSize,可以用来控制 Conn 最大应用层的发送队列缓冲,即 tcp 发送缓冲满了时,应用层也堆积数据超过这个 size 时 server 就会主动断开连接,因为该连接已经拥堵不堪,等待其恢复不如壮士断腕了。并且,网络层上讲,Writev 通常性能好于 Write 多端数据,内存拷贝拼接的成本小于多次 syscall 的成本,所以在 MaxWriteBufferSize 可控的基础之上,nbio 进行这些细节的优化来尽量提高通用场景的整体性能,并且如上面实现 1、2 的解释中提到的,允许 Content-Length 回包时性能优先

返回一个文件内容会不会把所有读到内存中呢

如果是静态资源服务器,我还是建议其他支持 sendfile 的内核级 zero copy 的,相当于前后端分离,go 只做接口服务。 当然一起做也可以,如果是返回完整文件,应该是需要整个文件内容读取到内存的,并且,如果无法利用 zero copy 并且又想高性能,应该在服务器启动时就 load 一次而不是每次去 open read write close 之类的。包括 go 新增的 embedded 也都是要加载到内存中,如果内存资源不够用,那真的是需要考虑加硬件了

h2、握手、upgrade

如主帖中介绍,websocket 和 http 2.0 在计划中,socket.io 之类的就不考虑支持了,这些每一项都是个体力活、需要时间,并且这个项目就我一个人在为爱发电,最近就是肝得有点狠了身体状态有点下滑得调整下,所以也希望更多大佬、小伙伴们一起来玩

eudore 回复

并且 chunked 的引入,主要是因为服务端在回包时,有些数值没法在发送时就计算出来,比如校验包体相关的 Trailer Header 这些,所以需要先发送 Trailer 相关 header 字段的 key 放到 body 前的 header 列表里标识有哪些 header 字段,然后在发送完 body 后计算出这些 header 对应的值,再把这些 header 的 key-value 放到 body 后面,收到包的再按这个规则解析出来,这种无法预知 header 值的业务场景下只能通过 chunked 的方式,但除了这种,Content-Length 的编解码和传输数据量都更性能友好

标准库的 server 需要回 Trailer Header 好像只能 Hijack,比较麻烦,NBHTTP Server 为了用户便利还支持比较方便的 Trailer 方式(以"Trailer-"为前缀设置 header),比如:

func onEcho(w http.ResponseWriter, r *http.Request) {
    w.Header().Add("Trailer-YourTrailerHeaderKey", "YourTrailerHeaderValue")
    // ...
}
需要 登录 后方可回复, 如果你还没有账号请点击这里 注册