【Go】使用压缩文件优化io (一)

原文连接:https://blog.thinkeridea.com/201906/go/compress_file_io_optimization1.html 最近遇到一个日志备份 io 过高的问题,业务日志每十分钟备份一次,本来是用 Python 写一个根据规则扫描备份日志问题不大,但是随着业务越来越多,单机上的日志文件越来越大,文件数量也越来越多,导致每每备份的瞬间 io 阻塞严重, CPU 和 load 异常的高,好在备份速度很快,对业务影响不是很大,这个问题会随着业务增长,越来越明显,这段时间抽空对备份方式做了优化,效果十分显著,整理篇文章记录一下。 ## 背景说明 > 服务器配置:4 核 8G; 磁盘:500G > > 每十分钟需要上传:18 个文件,高峰时期约 10 G 左右 业务日志为了保证可靠性,会先写入磁盘文件,每10分钟切分日志文件,然后在下十分钟第一分时备份日志到 OSS,数据分析服务会从在备份完成后拉取日志进行分析,日志备份需要高效快速,在最短的时间内备份完,一般备份均能在几十秒内完成。 备份的速度和效率并不是问题,足够的快,但是在备份时 io 阻塞严重导致的 CPU 和 load 异常,成为业务服务的瓶颈,在高峰期业务服务仅消耗一半的系统资源,但是备份时 CPU 经常 100%,且 iowait 可以达到 70 多,空闲资源非常少,这样随着业务扩展,日志备份虽然时间很短,却成为了系统的瓶颈。 后文中会详细描述优化前后的方案,并用 go 编写测试,使用一台 2 核4G的服务器进行测试,测试数据集大小为: - 文件数:336 - 原始文件:96G - 压缩文件:24G - 压缩方案:lzo - Goroutine 数量:4 ## 优化前 优化前日志备份流程: - 根据备份规则扫描需要备份的文件 - 使用 `lzop` 命令压缩日志 - 上传压缩后的日志到 OSS 下面是代码实现,这里不再包含备份文件规则,仅演示压缩上传逻辑部分,程序接受文件列表,并对文件列表压缩上传至 OSS 中。 `.../pkg/aliyun_oss` 是我自己封装的基于阿里云 OSS 操作的包,这个路径是错误的,仅做演示,想运行下面的代码,OSS 交互这部分需要自己实现。 ```go package main import ( "bytes" "fmt" "os" "os/exec" "path/filepath" "sync" "time" ".../pkg/aliyun_oss" ) func main() { var oss *aliyun_oss.AliyunOSS files := os.Args[1:] if len(files) < 1 { fmt.Println("请输入要上传的文件") os.Exit(1) } fmt.Printf("待备份文件数量:%d\n", len(files)) startTime := time.Now() defer func(startTime time.Time) { fmt.Printf("共耗时:%s\n", time.Now().Sub(startTime).String()) }(startTime) var wg sync.WaitGroup n := 4 c := make(chan string) // 压缩日志 wg.Add(n) for i := 0; i < n; i++ { go func() { defer wg.Done() for file := range c { cmd := exec.Command("lzop", file) cmd.Stderr = &bytes.Buffer{} err := cmd.Run() if err != nil { panic(cmd.Stderr.(*bytes.Buffer).String()) } } }() } for _, file := range files { c <- file } close(c) wg.Wait() fmt.Printf("压缩耗时:%s\n", time.Now().Sub(startTime).String()) // 上传压缩日志 startTime = time.Now() c = make(chan string) wg.Add(n) for i := 0; i < n; i++ { go func() { defer wg.Done() for file := range c { name := filepath.Base(file) err := oss.PutObjectFromFile("tmp/"+name+".lzo", file+".lzo") if err != nil { panic(err) } } }() } for _, file := range files { c <- file } close(c) wg.Wait() fmt.Printf("上传耗时:%s\n", time.Now().Sub(startTime).String()) } ``` 程序运行时输出: ```shell 待备份文件数量:336 压缩耗时:19m44.125314226s 上传耗时:6m14.929371103s 共耗时:25m59.118002969s ``` 从运行结果中可以看出压缩文件耗时很久,实际通过 `iostat` 命令分析也发现,压缩时资源消耗比较高,下面是 `iostat -m -x 5 10000` 命令采集各个阶段数据。 - 程序运行前 ```shell avg-cpu: %user %nice %system %iowait %steal %idle 2.35 0.00 2.86 0.00 0.00 94.79 Device: rrqm/s wrqm/s r/s w/s rkB/s wkB/s avgrq-sz avgqu-sz await r_await w_await svctm %util vda 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 vdb 0.00 0.60 0.00 0.60 0.00 4.80 16.00 0.00 0.67 0.00 0.67 0.67 0.04 ``` - 压缩日志时 ```shell avg-cpu: %user %nice %system %iowait %steal %idle 10.84 0.00 6.85 80.88 0.00 1.43 Device: rrqm/s wrqm/s r/s w/s rkB/s wkB/s avgrq-sz avgqu-sz await r_await w_await svctm %util vda 0.00 0.00 0.60 0.00 2.40 0.00 8.00 0.00 0.67 0.67 0.00 0.67 0.04 vdb 14.80 5113.80 1087.60 60.60 78123.20 20697.60 172.13 123.17 106.45 106.26 109.87 0.87 100.00 avg-cpu: %user %nice %system %iowait %steal %idle 10.06 0.00 7.19 79.06 0.00 3.70 Device: rrqm/s wrqm/s r/s w/s rkB/s wkB/s avgrq-sz avgqu-sz await r_await w_await svctm %util vda 0.00 0.00 1.60 0.00 103.20 0.00 129.00 0.01 3.62 3.62 0.00 0.50 0.08 vdb 14.20 4981.20 992.80 52.60 79682.40 20135.20 190.97 120.34 112.19 110.60 142.17 0.96 100.00 ``` - 上传日志时 ```shell avg-cpu: %user %nice %system %iowait %steal %idle 6.98 0.00 7.81 7.71 0.00 77.50 Device: rrqm/s wrqm/s r/s w/s rkB/s wkB/s avgrq-sz avgqu-sz await r_await w_await svctm %util vda 0.00 0.00 13.40 0.00 242.40 0.00 36.18 0.02 1.63 1.63 0.00 0.19 0.26 vdb 0.40 2.40 269.60 1.20 67184.80 14.40 496.30 4.58 15.70 15.77 0.33 1.39 37.74 avg-cpu: %user %nice %system %iowait %steal %idle 7.06 0.00 8.00 4.57 0.00 80.37 Device: rrqm/s wrqm/s r/s w/s rkB/s wkB/s avgrq-sz avgqu-sz await r_await w_await svctm %util vda 0.00 0.00 0.60 0.00 75.20 0.00 250.67 0.00 2.67 2.67 0.00 2.00 0.12 vdb 0.20 0.00 344.80 0.00 65398.40 0.00 379.34 5.66 16.42 16.42 0.00 1.27 43.66 ``` 从 `iostat` 的结果中发现,压缩时程序 `r_await` 和 `w_await` 都到了一百多,且 `iowait` 高达 `80.88%`,几乎耗尽了所有的 CPU,上传时 `iowait` 是可以接受的,因为只是单纯的读取压缩文件,且压缩文件也很小。 ### 分析问题 上述结果中发现程序主要运行消耗在压缩日志,那优化也着重日志压缩的逻辑上。 压缩时日志会先压缩成 `lzo` 文件,然后再上传 `lzo` 文件到阿里云 OSS 上,这中间发生了几个过程: - 读取原始日志文件 - 压缩数据 - 写入 `lzo` 文件 - 读取 `lzo` 文件 - `http` 发送读取的内容 压缩时 `r_await` 和 `w_await` 都很高,主要发生在读取原始日志文件,写入 `lzo` 文件, 怎么优化呢? 先想一下原始需求,读取原始文件 -> 上传数据。但是直接上传原始文件,文件比较大,网络传输慢,而且存储费用也比较高,怎么办呢? 这个时候我们期望可以上传的是压缩文件,所以就有了优化前的逻辑,这里面产生了一个中间过程,即使用 `lzop` 命令压缩文件,而且产生了一个中间文件 `lzo` 文件。 读取原始文件和上传数据是必须的,那么可以优化的就是压缩的流程了,所以 `r_await` 是没有办法优化的,那么只能优化 `w_await`,`w_await` 是怎么产生的呢,恰恰是写入`lzo` 时产生的,可以不要 `lzo` 文件吗?这个文件有什么作用? 如果我们压缩文件数据流,在 读取原始文件 -> 上传数据 流程中对上传的数据流进行实时压缩,把压缩的内容给上传了,实现边读边压缩,对数据流进行处理,像是一个中间件,这样就不用写 `lzo` 文件了,那么 `w_await` 就被完全优化没了。 `lzo` 文件有什么作用?我想只有在上传失败之后可以节省一次文件压缩的消耗。上传失败的次数多吗?我用阿里云 OSS 好几年了,除了一次内网故障,再也没有遇到过上传失败的文件,我想是不需要这个文件的,而且生成 `lzo` 文件还需要占用磁盘空间,定时清理等等,增加了资源消耗和维护成本。 ### 优化后 根据之前的分析看一下优化之后备份文件需要哪些过程: - 读取原始日志 - 在内存中压缩数据流 - http 发送压缩后的内容 这个流程节省了两个步骤,写入 `lzo` 文件和 读取 `lzo` 文件,不仅没有 `w_await`,就连 `r_await` 也得到了小幅度的优化。 优化方案确定了,可是怎么实现 `lzo` 对文件流进行压缩呢,去 `Github` 上找一下看看有没有 `lzo` 的压缩算法库,发现 `github.com/cyberdelia/lzo` ,虽然是引用 C 库实现的,但是经典的两个算法(`lzo1x_1` 和 `lzo1x_999`)都提供了接口,貌似 Go 可以直接用了也就这一个库了。 发现这个库实现了 `io.Reader` 和 `io.Writer` 接口,`io.Reader` 读取压缩文件流,输出解压缩数据,`io.Writer` 实现输入原始数据,并写入到输入的 `io.Writer`。 想实现压缩数据流,看来需要使用 `io.Writer` 接口了,但是这个输入和输出都是 `io.Writer`,这可为难了,因为我们读取文件获得是 `io.Reader`,http 接口输入也是 `io.Reader`,貌似没有可以直接用的接口,没有办法实现了吗,不会我们自已封装一下,下面是封装的 `lzo` 数据流压缩方法: ```go package lzo import ( "bytes" "io" "github.com/cyberdelia/lzo" ) type Reader struct { r io.Reader rb []byte buff *bytes.Buffer lzo *lzo.Writer err error } func NewReader(r io.Reader) *Reader { z := &Reader{ r: r, rb: make([]byte, 256*1024), buff: bytes.NewBuffer(make([]byte, 0, 256*1024)), } z.lzo, _ = lzo.NewWriterLevel(z.buff, lzo.BestSpeed) return z } func (z *Reader) compress() { if z.err != nil { return } var nr, nw int nr, z.err = z.r.Read(z.rb) if z.err == io.EOF { if err := z.lzo.Close(); err != nil { z.err = err } } if nr > 0 { nw, z.err = z.lzo.Write(z.rb[:nr]) if z.err == nil && nr != nw { z.err = io.ErrShortWrite } } } func (z *Reader) Read(p []byte) (n int, err error) { if z.err != nil { return 0, z.err } if z.buff.Len() <= 0 { z.compress() } n, err = z.buff.Read(p) if err == io.EOF { err = nil } else if err != nil { z.err = err } return } func (z *Reader) Reset(r io.Reader) { z.r = r z.buff.Reset() z.err = nil z.lzo, _ = lzo.NewWriterLevel(z.buff, lzo.BestSpeed) } ``` 这个库会固定消耗 512k 内存,并不是很大,我们需要创建一个读取 buf 和一个压缩缓冲 buf, 都是256k的大小,实际压缩缓冲的 buf 并不需要 256k,毕竟压缩后数据会比原始数据小,考虑空间并不是很大,直接分配 256k 避免运行时分配。 实现原理当 http 从输入的 `io.Reader` (实际就是我们上面封装的 `lzo` 库), 读取数据时,这个库检查压缩缓冲是否为空,为空的情况会从文件读取 256k 数据并压缩输入到压缩缓冲中,然后从压缩缓冲读取数据给 http 的 `io.Reader`,如果压缩缓冲区有数据就直接从压缩缓冲区读取压缩数据。 这并不是线程安全的,并且固定分配 512k 的缓冲,所以也提供了一个 `Reset` 方法,来复用这个对象,避免重复分配内存,但是需要保证一个` lzo` 对象实例只能被一个 Goroutine 访问, 这可以使用 `sync.Pool` 来保证,下面的代码我用另一种方法来保证。 ```go package main import ( "fmt" "os" "path/filepath" "sync" "time" ".../pkg/aliyun_oss" ".../pkg/lzo" ) func main() { var oss *aliyun_oss.AliyunOSS files := os.Args[1:] if len(files) < 1 { fmt.Println("请输入要上传的文件") os.Exit(1) } fmt.Printf("待备份文件数量:%d\n", len(files)) startTime := time.Now() defer func() { fmt.Printf("共耗时:%s\n", time.Now().Sub(startTime).String()) }() var wg sync.WaitGroup n := 4 c := make(chan string) // 压缩日志 wg.Add(n) for i := 0; i < n; i++ { go func() { defer wg.Done() var compress *lzo.Reader for file := range c { r, err := os.Open(file) if err != nil { panic(err) } if compress == nil { compress = lzo.NewReader(r) } else { compress.Reset(r) } name := filepath.Base(file) err = oss.PutObject("tmp/"+name+"1.lzo", compress) r.Close() if err != nil { panic(err) } } }() } for _, file := range files { c <- file } close(c) wg.Wait() } ``` 程序为每个 Goroutine 分配一个固定的 `compress` ,当需要压缩文件的时候判断是创建还是重置,来达到复用的效果。 该程序运行输出: ```shell 待备份文件数量:336 共耗时 18m20.162441931s ``` 实际耗时比优化前提升了 28%, 实际通过 `iostat` 命令分析也发现,资源消耗也有了明显的改善,下面是 `iostat -m -x 5 10000` 命令采集各个阶段数据。 ```shell avg-cpu: %user %nice %system %iowait %steal %idle 15.72 0.00 6.58 74.10 0.00 3.60 Device: rrqm/s wrqm/s r/s w/s rkB/s wkB/s avgrq-sz avgqu-sz await r_await w_await svctm %util vda 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 vdb 3.80 3.40 1374.20 1.20 86484.00 18.40 125.79 121.57 87.24 87.32 1.00 0.73 100.00 avg-cpu: %user %nice %system %iowait %steal %idle 26.69 0.00 8.42 64.27 0.00 0.62 Device: rrqm/s wrqm/s r/s w/s rkB/s wkB/s avgrq-sz avgqu-sz await r_await w_await svctm %util vda 0.00 0.20 426.80 0.80 9084.80 4.00 42.51 2.69 6.29 6.30 1.00 0.63 26.92 vdb 1.80 0.00 1092.60 0.00 72306.40 0.00 132.36 122.06 108.45 108.45 0.00 0.92 100.02 ``` 通过 `iostat` 发现只有 `r_await`, `w_await` 被完全优化,`iowait` 有明显的改善,运行时间更短了,效率更高了,对 io 产生影响的时间也更短了。 ## 优化期间遇到的问题 首先对找到的 `lzo` 算法库进行测试,确保压缩和解压缩没有问题,并且和 `lzop` 命令兼容。 在这期间发现使用压缩的数据比 `lzop` 压缩数据大了很多,之后阅读了源码实现,并没有发现任何问题,尝试调整缓冲区大小,发现对生成的压缩文件大小有明显改善。 这个发现让我也很为难,究竟多大的缓冲区合适呢,只能去看 `lzop` 的实现了,发现 lzop 默认压缩块大小为 256k, 实际 lzo 算法支持的最大块大小就是 256k,所以实现 `lzo` 算法包装是创建的是 256k 的缓冲区的,这个缓冲区的大小就是压缩块的大小,大家使用的时候建议不要调整了。 ## 总结 这个方案上线之后,由原来需要近半分钟上传的,改善到大约只有十秒(Go 语言本身效率也有很大帮助),而且 load 有了明显的改善。 优化前每当运行日志备份,CPU 经常爆表,优化后备份时 CPU 增幅 20%,可以从容应对业务扩展问题了。 测试是在一台空闲的机器上进行的,实际生产服务器本身 `w_await` 会有 20 左右,如果使用固态硬盘,全双工模式,读和写是分离的,那么优化掉 `w_await` 对业务的帮助是非常大的,不会阻塞业务日志写通道了。 当然我们服务器是高速云盘(机械盘),由于机械盘物理特征只能是半双工,要么读、要么写,所以优化掉 `w_await` 确实效率会提升很多,但是依然会对业务服务写有影响。 **转载:** **本文作者: 戚银([thinkeridea](https://blog.thinkeridea.com/))** **本文链接: [https://blog.thinkeridea.com/201906/go/compress_file_io_optimization1.html](https://blog.thinkeridea.com/201906/go/compress_file_io_optimization1.html)** **版权声明: 本博客所有文章除特别声明外,均采用 [CC BY 4.0 CN协议](http://creativecommons.org/licenses/by/4.0/deed.zh) 许可协议。转载请注明出处!**

0 个评论

要回复文章请先登录注册