带缓冲的写并且读取固定大小byte数组的对象设计

目前有这样一个需求,需要多个goroutine来写。多个goroutine来读。但是写的时候必须组装成固定大小的byte数组后才能被读取,否则就等待下一次写,知道足够大。

直到关闭对象后,将剩余的byte数组都放入队列。

目前设计如下,感觉存在一些问题,请坛子里的牛人给写建议,如何设计更优雅呢?

package blob

import (
    "sync"
    "errors"
    "sync/atomic"
)

var ErrClosedBuffer = errors.New("io: read/write on closed buffer")
var ErrBufferNotFull = errors.New("bufio: buffer not full")

type BufferWriterReader struct {
    wMu  sync.Mutex // Serializes Write operations
    buf  []byte
    n, c int //n 每个byte数组的大小,c 一共剩余的队列长度
    wCh  chan []byte

    size int
    done uint32
    once sync.Once
}

//从缓冲区中读取一个数组,该数组长度可能会小于设定的缓冲长度
func (p *BufferWriterReader) ReadBytes() (buf []byte, err error) {
    //如果缓冲已关闭,队列已读完
    if atomic.LoadUint32(&p.done) == 1 && p.c == 0 {
        return nil, ErrClosedBuffer
    }
    if p.c > 0 {
        err = nil
        buf = <-p.wCh
        p.c = p.c - 1
    } else {
        return nil, ErrBufferNotFull
    }
    return
}

func (p *BufferWriterReader) Write(b []byte) (n int, err error) {

    if atomic.LoadUint32(&p.done) == 1 {
        return 0, ErrClosedBuffer
    }
    p.wMu.Lock()
    defer p.wMu.Unlock()
    //需要将p.buf填满到指定长度才能放到channel中等待读取
    for once := true; once || len(b) > 0; once = false {
        //缓冲区剩余字节
        l := p.size - p.n
        i := 0
        if l >= len(b) {
            i = copy(p.buf[p.n:], b)
            b = b[:0]
        } else {
            i = copy(p.buf[p.n:], b)
            b = b[l+1:]
        }

        p.n += i

        if p.n == p.size {
            p.wCh <- p.buf
            p.n = 0
            p.buf = make([]byte, p.size)
            p.c ++
        }

        if len(b) > 0 {
            once = true
        }
    }
    return n, nil
}

func (b *BufferWriterReader) Close() error {

    b.once.Do(func() {
        //调用关闭方法后,将p.buf放入到channel中
        atomic.AddUint32(&b.done, 1)
        if  b.n > 0 {
            b.wCh <- b.buf
            b.n = 0
            b.buf = make([]byte, b.size)
            b.c ++
        }
        close(b.wCh)
    })

    return nil

}

func NewBufferWriterReaderSize(rSize int, wSize int) *BufferWriterReader {
    return &BufferWriterReader{
        wMu:  sync.Mutex{},
        buf:  make([]byte, rSize),
        wCh:  make(chan []byte, wSize),
        done: 0,
        n:    0,
        c:    0,
        size: rSize,
    }
}
已邀请:

lichao2018

赞同来自: buscoop

type pipeWriter struct {
    w   *io.PipeWriter
    buf *bufio.Writer
    mu  sync.Mutex
}

func (w *pipeWriter) Write(p []byte) (int, error) {
    w.mu.Lock()
    defer w.mu.Unlock()

    return w.buf.Write(p)
}

func (w *pipeWriter) Close() error {
    w.mu.Lock()
    defer w.mu.Unlock()

    w.buf.Flush()
    return w.w.Close()
}

// Pipe ...
func Pipe(size int) (io.WriteCloser, io.ReadCloser) {
    r, w := io.Pipe()
    return &pipeWriter{w: w, buf: bufio.NewWriterSize(w, size)}, r
}

viewer

赞同来自: buscoop

收藏先

icexin

赞同来自:

用一个bufio.Writer加一个channel就可以实现,close逻辑用你的就行。

package main

import (
    "bufio"
    "sync"
)

type channelWriter struct {
    ch chan []byte
}

func (c *channelWriter) Write(b []byte) (int, error) {
    c.ch <- b
    return len(b), nil
}

func (c *channelWriter) Read() []byte {
    return <-c.ch
}

type BufferWriterReader struct {
    mutex sync.Mutex
    chanw *channelWriter
    bufw  *bufio.Writer
}

func NewBufferWriterReader(size int) *BufferWriterReader {
    chanw := &channelWriter{
        ch: make(chan []byte, 10),
    }
    bufw := bufio.NewWriterSize(chanw, size)
    return &BufferWriterReader{
        chanw: chanw,
        bufw:  bufw,
    }
}

func (b *BufferWriterReader) Read() ([]byte, error) {
    return b.chanw.Read(), nil
}

func (b *BufferWriterReader) Write(p []byte) (int, error) {
    b.mutex.Lock()
    defer b.mutex.Unlock()
    return b.bufw.Write(p)
}

func (b *BufferWriterReader) Close() error {
    b.mutex.Lock()
    defer b.mutex.Unlock()
    b.bufw.Flush()
    return nil
}

mysll

赞同来自:

func (c *channelWriter) Write(b []byte) (int, error) {
    c.ch <- b
    return len(b), nil
}

这一块代码是有问题的,如果写入一个大于最大长度的数据,还是会全部写入。应该只写入最大能接受的值,c.ch <- b[:maxsize] ,然后return max_size,nil这样才可以。 另外还有一个问题,这里应该拷贝一下[]byte,不然数据会被履盖。因为这里可能是内部的缓冲buffer

要回复问题请先登录注册