带缓冲的写并且读取固定大小byte数组的对象设计

目前有这样一个需求,需要多个goroutine来写。多个goroutine来读。但是写的时候必须组装成固定大小的byte数组后才能被读取,否则就等待下一次写,知道足够大。 直到关闭对象后,将剩余的byte数组都放入队列。 目前设计如下,感觉存在一些问题,请坛子里的牛人给写建议,如何设计更优雅呢? ``` package blob import ( "sync" "errors" "sync/atomic" ) var ErrClosedBuffer = errors.New("io: read/write on closed buffer") var ErrBufferNotFull = errors.New("bufio: buffer not full") type BufferWriterReader struct { wMu sync.Mutex // Serializes Write operations buf []byte n, c int //n 每个byte数组的大小,c 一共剩余的队列长度 wCh chan []byte size int done uint32 once sync.Once } //从缓冲区中读取一个数组,该数组长度可能会小于设定的缓冲长度 func (p *BufferWriterReader) ReadBytes() (buf []byte, err error) { //如果缓冲已关闭,队列已读完 if atomic.LoadUint32(&p.done) == 1 && p.c == 0 { return nil, ErrClosedBuffer } if p.c > 0 { err = nil buf = <-p.wCh p.c = p.c - 1 } else { return nil, ErrBufferNotFull } return } func (p *BufferWriterReader) Write(b []byte) (n int, err error) { if atomic.LoadUint32(&p.done) == 1 { return 0, ErrClosedBuffer } p.wMu.Lock() defer p.wMu.Unlock() //需要将p.buf填满到指定长度才能放到channel中等待读取 for once := true; once || len(b) > 0; once = false { //缓冲区剩余字节 l := p.size - p.n i := 0 if l >= len(b) { i = copy(p.buf[p.n:], b) b = b[:0] } else { i = copy(p.buf[p.n:], b) b = b[l+1:] } p.n += i if p.n == p.size { p.wCh <- p.buf p.n = 0 p.buf = make([]byte, p.size) p.c ++ } if len(b) > 0 { once = true } } return n, nil } func (b *BufferWriterReader) Close() error { b.once.Do(func() { //调用关闭方法后,将p.buf放入到channel中 atomic.AddUint32(&b.done, 1) if b.n > 0 { b.wCh <- b.buf b.n = 0 b.buf = make([]byte, b.size) b.c ++ } close(b.wCh) }) return nil } func NewBufferWriterReaderSize(rSize int, wSize int) *BufferWriterReader { return &BufferWriterReader{ wMu: sync.Mutex{}, buf: make([]byte, rSize), wCh: make(chan []byte, wSize), done: 0, n: 0, c: 0, size: rSize, } } ```
已邀请:

lichao2018

赞同来自: buscoop

```
type pipeWriter struct {
w *io.PipeWriter
buf *bufio.Writer
mu sync.Mutex
}

func (w *pipeWriter) Write(p []byte) (int, error) {
w.mu.Lock()
defer w.mu.Unlock()

return w.buf.Write(p)
}

func (w *pipeWriter) Close() error {
w.mu.Lock()
defer w.mu.Unlock()

w.buf.Flush()
return w.w.Close()
}

// Pipe ...
func Pipe(size int) (io.WriteCloser, io.ReadCloser) {
r, w := io.Pipe()
return &pipeWriter{w: w, buf: bufio.NewWriterSize(w, size)}, r
}
```

viewer

赞同来自: buscoop

收藏先
用一个bufio.Writer加一个channel就可以实现,close逻辑用你的就行。

``` go
package main

import (
"bufio"
"sync"
)

type channelWriter struct {
ch chan []byte
}

func (c *channelWriter) Write(b []byte) (int, error) {
c.ch <- b
return len(b), nil
}

func (c *channelWriter) Read() []byte {
return <-c.ch
}

type BufferWriterReader struct {
mutex sync.Mutex
chanw *channelWriter
bufw *bufio.Writer
}

func NewBufferWriterReader(size int) *BufferWriterReader {
chanw := &channelWriter{
ch: make(chan []byte, 10),
}
bufw := bufio.NewWriterSize(chanw, size)
return &BufferWriterReader{
chanw: chanw,
bufw: bufw,
}
}

func (b *BufferWriterReader) Read() ([]byte, error) {
return b.chanw.Read(), nil
}

func (b *BufferWriterReader) Write(p []byte) (int, error) {
b.mutex.Lock()
defer b.mutex.Unlock()
return b.bufw.Write(p)
}

func (b *BufferWriterReader) Close() error {
b.mutex.Lock()
defer b.mutex.Unlock()
b.bufw.Flush()
return nil
}
```
```go
func (c *channelWriter) Write(b []byte) (int, error) {
c.ch <- b
return len(b), nil
}
```
这一块代码是有问题的,如果写入一个大于最大长度的数据,还是会全部写入。应该只写入最大能接受的值,c.ch <- b[:maxsize] ,然后return max_size,nil这样才可以。
另外还有一个问题,这里应该拷贝一下[]byte,不然数据会被履盖。因为这里可能是内部的缓冲buffer

要回复问题请先登录注册