原创分享 22 Go 常见的并发模式和并发模型

happy_brother · 2021年01月21日 · 120 次阅读

Go 并发模型

传统的编程语言 C++ Java Python 等,他们的并发逻辑多事基于操作系统的线程。并发执行单元(线程)之间的通信利用的就是操作系统提供的线程或进程间通信的原语。如:共享内存、信号、管道、消息队列、套接字等。在这些通信原语中,使用最广泛的就是共享内存。

如果你使用过这种共享内存的并发模型,其实是难用的和容易发生错误的,特别是在大型或复杂的业务场景中。

Go 语言从程序设计当初,就将解决上面传统并发模型问题作为目标,并在新并发模型设计中借鉴注明的 CSP(Communicationing Sequential Processes-通信顺序进程) 并发模型。

CSP 模型目的在于简化并发程序的编写,让并发程序的编写顺序与编写顺序程序一样简单。

生产者 —》输出数据 — 输入/输出原语 —》输出数据

为了实现 CSP 模型,GO 语言引入了 Channel.Goroutine 可以读写 channel 中的数据,通过 channel 将 goroutine 组合连接在一起。

Go 语言中 CSP 虽然是主流并发模型,但是还是支持共享内存并发模型。主要是在 sync 包中的互斥锁、读写锁、条件变量、原子操作等。那么我们该如何选择呢?

第一种:创建模式

通常会使用下面的方式:

type Worker struct {

}

func Do(f func()) chan Worker {
    w:= make(chan Worker)

    go func() {
        f()
        w<-Worker{}
    }()

    return w
}

func main() {
    c:=Do(func() {
        fmt.Print("到下班时间了...")
    })

    <-c
}

Do 函数内部创建了一个 gorutine 并且返回了一个 channel 类型的变量。Do 函数创建的新 goroutine 与调用的 Do 函数的 goroutine 之间通过一个 channel 联系了起来,2 个 goroutine 可以通过 channel 进行通讯。Do 函数的实现因为 channel 在 Go 语言中是一等公民,channel 可以像变量一样初始化、传递和赋值。上面的例子 Do 返回了一个变量,这个变量就是通道,实现了主 goroutine 和子 goroutine 的通信。

第二种: 退出模式

##### a) 分离模式 分离模式使用最广泛的是 goroutine 退出模式。所谓分离模式就是创建它的 goroutine 不需要关心她的退出,这类 goroutine 启动后与其创建者彻底分离,其生命周期与其执行的主函数相关,函数返回即 goroutine 退出。 场景 1:一次性任务

// $GOROOT/src/net/dial.go

func (d *Dialer) DialContext(ctx context.Context, network, address string) (Conn, error) {
    ... ...
        if oldCancel := d.Cancel; oldCancel != nil {
                subCtx, cancel := context.WithCancel(ctx)
                defer cancel()
                go func() {
                        select {
                        case <-oldCancel:
                                cancel()
                        case <-subCtx.Done():
                        }
                }()
                ctx = subCtx
        }
    ... ...
}

在 DialContext 方法中创建了一个 goroutine,用来监听量个 channel 是否有数据,一旦有数据,处理后即退出。

场景 2 常驻后台执行一些特定任务,比如常用 for{…}或 for{select{…}}形式,还可以用定时器或事件驱动执行。下面是 Go 给每个 P 内置的 GC goroutine 就是这种场景的。

// $GOROOT/src/runtime/mgc.go
func gcBgMarkStartWorkers() {
        // Background marking is performed by per-P G's. Ensure that
        // each P has a background GC G.
        for _, p := range allp {
                if p.gcBgMarkWorker == 0 {
                        go gcBgMarkWorker(p) // 每个P创建一个goroutine,以运行gcBgMarkWorker
                        notetsleepg(&work.bgMarkReady, -1)
                        noteclear(&work.bgMarkReady)
                }
        }
}

func gcBgMarkWorker(_p_ *p) {
    gp := getg()
    ... ...
    for { 
            // 处理GC事宜
        ... ...
    }
}

##### b) join 模式 在线程模型中,父线程可以通过 pthread join 来等待子线程结束并获取子线程的结束状态。在 Go 中,我们有时候也有这种需求:goroutine 的创建者需要等待新 goroutine 的结果。

 type Worker struct {

}

func Do(f func()) chan Worker {
    w:= make(chan Worker)

    go func() {
        f()
        w<-Worker{}
    }()

    return w
}

func main() {
    c:=Do(func() {
        fmt.Print("到下班时间了...")
    })

    <-c
}

我们还是看刚刚上面的这个例子,Do 函数使用典型的 goroutine 的创建模式创建了一个 groutine,main 的 goroutine 作为创建通过 Do 函数返回的 channel 与新 goroutine 建立关系,这个 channel 得用途就是在文革 goroutine 之间建立退出时间的 “信号” 通信机制。main goroutine 在创建完新 goroutine 后就在该 channel 上阻塞等待了,知道新的 goroutine 退出前向该 channel 发送了一个” 信号”。

运行代码,结果如下:

到下班时间了... Process finished with exit code 0

获取 goroutine 的退出状态

如果新 goroutine 的创建者不仅仅要等待 goroutine 的退出,还要知道结束状态,我们可以通过自定义类型的 channel 来实现这样的需求。

func add(a,b int) int{
    return a+b
}

func Do(f func(a,b int) int,a,b int) chan int{
    c:=make(chan int)

    go func() {
        r:=f(a,b)
        c<-r
    }()

    return c
}

func main() {
    c:=Do(add,1,5)
    fmt.Println(<-c)
}

运行结果是 6

等待多个 goroutine 退出

func add(a,b int) int{
    return a+b
}

func Do(f func(a,b int) int,a,b,n int) chan int{
    c:=make(chan int)

    var wg sync.WaitGroup
    for i:=0;i<n;i++{
        wg.Add(1)
        go func() {
            r:=f(a,b)
            fmt.Println(r)
            wg.Done()
        }()
    }

    go func() {
        wg.Wait()
        c<-100
    }()

    go func() {

    }()

    return c
}

func main() {
    c:=Do(add,1,5,5)
    fmt.Println(<-c)
}

运行结果
6
6
6
6
6
100

##### c) notify-wait 模式 前面的场景中,goroutine 的创建者都是在被动地等待新 goroutine 的退出。有些场景,goroutine 的创建者需要主动通知那些新 goroutine 退出。

通知并等待一个 goroutine 的退出

func add(a, b int) int {
    return a + b
}

func Do(f func(a, b int) int, a, b int) chan int {
    quit := make(chan int)

    go func() {
        var job chan string
        for {

            select {
            case x := <-job:
                f(a, b)
                fmt.Println(x)
            case y := <-quit:
                quit <- y
            }
        }
    }()

    return quit
}

func main() {
    c := Do(add, 1, 5)
    fmt.Println("开始干活")
    time.Sleep(1 * time.Second)
    c <- 0
    timer := time.NewTimer(time.Second * 10)
    defer timer.Stop()
    select {
    case status := <-c:
        fmt.Println(status)
    case <-timer.C:
        fmt.Println("等待...")
    }
}

执行代码结果如下 开始干活 0

通知并等待多个 goroutine 退出

下面是通知并等待多个 goroutine 退出的场景。Go 语言的 channel 有一个特性,那就是当使用 close 函数关闭 channel 时,所有阻塞到该 channel 上的 goroutine 都会得到通知。

func worker(x int)  {
    time.Sleep(time.Second * time.Duration(x))
}

func Do(f func(a int), n int) chan int {
    quit := make(chan int)
    job:=make(chan int)
    var wg sync.WaitGroup
    for i:=0;i<n;i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            name := fmt.Sprintf("worker-%d",i)
            for {
                j,ok:=<-job
                if !ok{
                    fmt.Println(name,"done")
                    return
                }
                worker(j)
            }
        }(i)
    }

    go func() {
        <-quit
        close(job)
        wg.Wait()
        quit<-200
    }()


    return quit
}

func main() {
    quit:=Do(worker,5)
    fmt.Println("func Work...")
    quit<-1
    timer := time.NewTimer(time.Second * 10)
    defer timer.Stop()
    select {
    case status := <-quit:
        fmt.Println(status)
    case <-timer.C:
        fmt.Println("等待...")
    }
}

运行结果
func Work...
worker-1 done
worker-2 done
worker-3 done
worker-4 done
worker-0 done
200

更多原创文章干货分享,请关注公众号
  • 加微信实战群请加微信(注明:实战群):gocnio
暂无回复。
需要 登录 后方可回复, 如果你还没有账号请点击这里 注册