新手问题 channel连接池的问题

songtianyi · 2017年02月16日 · 最后由 Rg 回复于 2017年02月17日 · 402 次阅读
package pool

import (
    "errors"
    "fmt"
    "net"
    "sync"
)

// channelPool implements the Pool interface based on buffered channels.
type channelPool struct {
    // storage for our net.Conn connections
    mu    sync.Mutex
    conns chan net.Conn

    // net.Conn generator
    factory Factory
}

// Factory is a function to create new connections.
type Factory func() (net.Conn, error)

// NewChannelPool returns a new pool based on buffered channels with an initial
// capacity and maximum capacity. Factory is used when initial capacity is
// greater than zero to fill the pool. A zero initialCap doesn't fill the Pool
// until a new Get() is called. During a Get(), If there is no new connection
// available in the pool, a new connection will be created via the Factory()
// method.
func NewChannelPool(initialCap, maxCap int, factory Factory) (Pool, error) {
    if initialCap < 0 || maxCap <= 0 || initialCap > maxCap {
        return nil, errors.New("invalid capacity settings")
    }

    c := &channelPool{
        conns:   make(chan net.Conn, maxCap),
        factory: factory,
    }

    // create initial connections, if something goes wrong,
    // just close the pool error out.
    for i := 0; i < initialCap; i++ {
        conn, err := factory()
        if err != nil {
            c.Close()
            return nil, fmt.Errorf("factory is not able to fill the pool: %s", err)
        }
        c.conns <- conn
    }

    return c, nil
}

func (c *channelPool) getConns() chan net.Conn {
    c.mu.Lock()
    conns := c.conns
    c.mu.Unlock()
    return conns
}

// Get implements the Pool interfaces Get() method. If there is no new
// connection available in the pool, a new connection will be created via the
// Factory() method.
func (c *channelPool) Get() (net.Conn, error) {
    conns := c.getConns()
    if conns == nil {
        return nil, ErrClosed
    }

    // wrap our connections with out custom net.Conn implementation (wrapConn
    // method) that puts the connection back to the pool if it's closed.
    select {
    case conn := <-conns:
        if conn == nil {
            return nil, ErrClosed
        }

        return c.wrapConn(conn), nil
    default:
        conn, err := c.factory()
        if err != nil {
            return nil, err
        }

        return c.wrapConn(conn), nil
    }
}

// put puts the connection back to the pool. If the pool is full or closed,
// conn is simply closed. A nil conn will be rejected.
func (c *channelPool) put(conn net.Conn) error {
    if conn == nil {
        return errors.New("connection is nil. rejecting")
    }

    c.mu.Lock()
    defer c.mu.Unlock()

    if c.conns == nil {
        // pool is closed, close passed connection
        return conn.Close()
    }

    // put the resource back into the pool. If the pool is full, this will
    // block and the default case will be executed.
    select {
    case c.conns <- conn:
        return nil
    default:
        // pool is full, close passed connection
        return conn.Close()
    }
}

func (c *channelPool) Close() {
    c.mu.Lock()
    conns := c.conns
    c.conns = nil
    c.factory = nil
    c.mu.Unlock()

    if conns == nil {
        return
    }

    close(conns)
    for conn := range conns {
        conn.Close()
    }
}

func (c *channelPool) Len() int { return len(c.getConns()) }

channel 是线程安全的,为什么要用加锁的方式去实现? channel.go

更多原创文章干货分享,请关注公众号
  • 加微信实战群请加微信(注明:实战群):gocnio

没必要加锁的吧,你可以试试,去掉锁,用这个包写一下并发高的服务,编译加 -race 看看会不会报数据竞争

已经关闭的 c.conns 发送 conn 的现象如果不加锁,可能会出现向已经关闭的c.conns 发送conn的现象;

假设此时不加锁,一个Gorutine A运行到put函数的select这里,另外一个 Gorutine B 执行了Close函数,关闭了c.conns chan, runtime切换至Gorutine B继续执行, 在

select {
    case c.conns <- conn:
        return nil
    default:
        // pool is full, close passed connection
        return conn.Close()
    }

里面会引起panic

需要 登录 后方可回复, 如果你还没有账号请点击这里 注册