技术实践——教你用100行写一个 go 的协程池 (任务池)!!!

点击这里,查看封装 GetCap() 方法等重要内容

实现
Talk is cheap. Show me the code.

任务的定义
任务要包含需要执行的函数、以及函数要传的参数, 因为参数类型、个数不确定, 这里使用可变参数和空接口的形式

type Task struct {
    Handler func(v ...interface{})
    Params  []interface{}
}

任务池的定义
任务池的定义包括了池的容量 capacity、当前运行的 worker(goroutine)数量 runningWorkers、任务队列(channel)taskC、关闭任务池的 channel closeC 以及任务池的状态 state(运行中或已关闭, 用于安全关闭任务池)

type Pool struct {
    capacity       uint64
    runningWorkers uint64
    state          int64
    taskC          chan *Task
    closeC         chan bool
}

任务池的构造函数:

var ErrInvalidPoolCap = errors.New("invalid pool cap")

const (
    RUNNING = 1
    STOPED = 0
)

func NewPool(capacity uint64) (*Pool, error) {
    if capacity <= 0 {
        return nil, ErrInvalidPoolCap
    }
    return &Pool{
        capacity: capacity,
        state:    RUNNING,
        // 初始化任务队列, 队列长度为容量
        taskC:    make(chan *Task, capacity),
        closeC:   make(chan bool),
    }, nil
}

启动 worker

新建 run() 方法作为启动 worker 的方法:

func (p *Pool) run() {
    p.runningWorkers++ // 运行中的任务加一

    go func() {
        defer func() {
            p.runningWorkers-- // worker 结束, 运行中的任务减一
        }()

        for {
            select { // 阻塞等待任务、结束信号到来
            case task, ok := <-p.taskC: // 从 channel 中消费任务
                if !ok { // 如果 channel 被关闭, 结束 worker 运行
                    return
                }
                // 执行任务
                task.Handler(task.Params...)
            case <-p.closeC: // 如果收到关闭信号, 结束 worker 运行
                return
            }
        }
    }()
}

上述代码中, runningWorkers 的加减直接使用了自增运算, 但是考虑到启动多个 worker 时, runningWorkers 就会有数据竞争, 所以我们使用 sync.atomic 包来保证 runningWorkers 的自增操作是原子的。

对 runningWorkers 的操作进行封装:

func (p *Pool) incRunning() { // runningWorkers + 1
    atomic.AddUint64(&p.runningWorkers, 1)
}

func (p *Pool) decRunning() { // runningWorkers - 1
    atomic.AddUint64(&p.runningWorkers, ^uint64(0))
}

func (p *Pool) GetRunningWorkers() uint64 {
    return atomic.LoadUint64(&p.runningWorkers)
}

打铁乘热, 对于 capacity 的操作也考虑数据竞争, 封装 GetCap() 方法:


关键字:网络协议 Java Go

0 个评论

要回复文章请先登录注册