使用gRPC
的一个问题是,它的默认最大消息大小默认被设置为4MB
,那么当数据量太大时该怎么办?
可以通过在创建Server
的时候,配置相关的参数来扩大限制最小消息大小的值。
s := grpc.NewServer(grpc.MaxRecvMsgSize(size), grpc.MaxSendMsgSize(size))
MaxRecvMsgSize
和MaxSendMsgSize
分别设置服务器可以接收的最大消息大小和可以发送的最大消息大小(以字节为单位)。不设置的话默认都是4MB
。
虽然可以配置,但这种行为是一种滑坡谬误,可能会导致不断修改增加服务端客户端最大消息大小,而且每次请求不一定都需要全部的数据,会导致很多性能和资源上的浪费。
自然地将数据分成更小的块并使用gRPC
流方法(stream
)对其进行流式传输是一个不错的选择。
首先一个proto
,是一个返回流式消息类型的rpc service
。
syntax = "proto3";
package pb;
service Chunker {
rpc Chunker(Empty) returns (stream Chunk) {}
}
message Empty{}
message Chunk {
bytes chunk = 1;
}
实现server
的Chunker
逻辑。流式消息的大小设置为64KB
。
const chunkSize = 64 * 1024
type chunkerSrv []byte
func (c chunkerSrv) Chunker(_ *pb.Empty, srv pb.Chunker_ChunkerServer) error {
chunk := &pb.Chunk{}
n := len(c)
for cur := 0; cur < n; cur += chunkSize {
if cur+chunkSize > n {
chunk.Chunk = c[cur:n]
} else {
chunk.Chunk = c[cur : cur+chunkSize]
}
if err := srv.Send(chunk); err != nil {
return err
}
}
return nil
}
然后把gRPC
服务端运行起来,使用随机填充128M
的数据来方便测试。
func main() {
listen, err := net.Listen("tcp", ":8888")
if err != nil {
log.Fatal(err)
}
s := grpc.NewServer()
blob := make([]byte, 128*1024*1024) // 128M
rand.Read(blob)
pb.RegisterChunkerServer(s, chunkerSrv(blob))
log.Println("serving on localhost:8888")
log.Fatal(s.Serve(listen))
}
编写个客户端请求一下。
func main() {
conn, err := grpc.Dial("localhost:8888", grpc.WithInsecure())
if err != nil {
log.Fatal(err)
}
client := pb.NewChunkerClient(conn)
stream, err := client.Chunker(context.Background(), &pb.Empty{})
if err != nil {
log.Fatal(err)
}
var blob []byte
for {
c, err := stream.Recv()
if err != nil {
if err == io.EOF {
log.Printf("Transfer of %d bytes successful", len(blob))
// Transfer of 134217728 bytes successful
return
}
log.Fatal(err)
}
blob = append(blob, c.Chunk...)
}
使用分块传输编码,数据分解成一系列数据块,并以一个或多个块发送,这样客户端自己再拼接成完整的数据,无论多少数据都可以不用修改配置。
在数据量大的情况,不是每次都需要请求全量的数据。基于之上可以借鉴http
的range
协议来分片的取获取资源。同样的在Chunker
的proto
基础上修改,在请求的时候能传入零个(代表全部获取)或多个Range
来分片获取资源。
syntax = "proto3";
package pb;
service RangeChunker {
rpc Range(Res) returns (stream Chunk) {}
}
message Res {
repeated Range r = 1;
}
message Range {
int32 start = 1;
int32 stop = 2;
}
message Chunk {
bytes chunk = 1;
}
服务端的实现主要是Range
的解析,这里实现和http
的range
类似,使用0-99
代表前 100 字节而不是0-100
,并简化了很多,比如只保留了stop
设置-1
时代表最后一个字节,其他的负数操作都没有实现。需要的话可以自行修改rangeLimit
。
const chunkSize = 64 * 1024
type chunkerSrv []byte
func (c chunkerSrv) Range(r *pb.Res, srv pb.RangeChunker_RangeServer) error {
chunk := &pb.Chunk{}
ranges := c.parseRanges(r)
for _, rr := range ranges {
start, stop := rr[0], rr[1]
for cur := start; cur < stop; cur += chunkSize {
if cur+chunkSize > stop {
chunk.Chunk = c[cur:stop]
} else {
chunk.Chunk = c[cur : cur+chunkSize]
}
if err := srv.Send(chunk); err != nil {
return err
}
}
}
return nil
}
func (c chunkerSrv) parseRanges(r *pb.Res) [][2]int {
n := len(c)
ranges := [][2]int{}
rs := r.GetR()
if len(rs) == 0 {
return [][2]int{[2]int{0, n}}
}
for _, rr := range rs {
start, stop := rangeLimit(rr, n)
if start == -1 {
return nil
}
ranges = append(ranges, [2]int{start, stop})
}
return ranges
}
func rangeLimit(r *pb.Range, llen int) (int, int) {
start, stop := int(r.Start), int(r.Stop)+1
if stop > llen || stop == 0 {
stop = llen
}
if start < 0 || stop < 0 || start >= stop {
return -1, -1
}
return start, stop
}
客户端请求也很简单。
stream, err := client.Range(context.Background(), &pb.Res{
R: []*pb.Range{
{0, 99},
{100, 199},
{200, -1},
},
})
这样我们就可以只请求资源的某个部分,基于此之上还可以并行请求,断点续传等。
> 原文链接:https://blog.keyboardman.me/2018/08/06/large-messages-with-grpc/