新手问题 理解Golang并发编程

dasheng · 2017年07月29日 · 224 次阅读

点击查看原文章

concurrency vs parallelism

并发和并行是彼此相关的两个概念,并不能完全等价。在程序中,并发强调的是独立执行的程序的组合;并行强调的是同时执行计算任务 [1]。
计算机核心的数量决定了并行计算的能力,大多数人类作为 “单核” 动物 (老顽童小龙女除外),可以说自己在并发某些任务,如我在听歌写代码,但是不能说这两件事在并行,参考下图: concurrency vs parallelism Golang 的并发模型源于 Communicating Sequential Processes (CSP),通过提供 goroutine 和 channel 来实现并发编程模式。

Goroutine

Goroutine 由 Go 运行时创建和管理,是用于调度 CPU 资源的 “最小单元”,和 OS 的线程相比更轻量 [2]:

  • 内存消耗更低只需 2kB 初始栈空间,而线程初始要 1Mb 的空间;
  • 由 golang 的运行时环境创建和销毁,更加廉价,不支持手动管理;
  • 切换效率更高等。 Goroutine 和线程的关系如下图所示: goroutine vs thread

我们可以轻松地创建成百上千的 goroutine,而不会降低程序的执行效率。
通过 goroutine 可以让一个函数和其他的函数并行执行。可以在函数调用前面加上go关键字,方便地创建一个 goroutine。
main 函数本身也是一个 goroutine[3]。
举例如下:

package main

import "fmt"

func main() {
    fmt.Println("begin main goroutine")
    go hello()
    fmt.Println("end main goroutine")
}

func hello() {
    fmt.Println("begin hello goroutine")
}

输出:

begin main goroutine
end main goroutine

上面的例子中,并不会输出begin hello goroutine,这是因为,通过使用 goroutine,我们不需要等待函数调用的返回结果,而会接着执行下面的代码。
可以在go hello()后面添加:

time.Sleep(1 * time.Second)

就可以正常输出begin hello goroutine

channel

Go 提供了一种机制能够使 goroutine 之间进行通信和同步,它就是 channel。
channel 是一种类型,关键字chan和 channel 传输内容的类型共同定义了某一 channel。
定义方式为:var c chan string = make(chan string),也可以简写为:var c = make(chan string)c := make(chan string)

通过左箭头<-操作符操作 channel 变量:

  • c <- "ping"向 channel 发送一个值为 “ping” 的字符串,
  • msg := <- c接收 channel 中的一个值,并赋給 msg。
package main

import (
    "fmt"
    "strconv"
    "time"
)

func main() {
    c := make(chan string)
    go ping(c)
    go print(c)
    var input string
    fmt.Scanln(&input)
}

func ping(c chan string) {
    for i := 0; ; i++ {
        c <- strconv.Itoa(i)
    }
}

func print(c chan string) {
    for {
        <-c
        fmt.Println("reveving: " + <-c)
        time.Sleep(1 * time.Second)
    }
}

输出:

reveving: 1
reveving: 3
reveving: 5
reveving: 7
reveving: 9
    ...

按功能,可以将 channel 分为只发送或只接收 channel,通过修改函数签名的 channel 形参类型来指定 channel 的 “方向”:

  • 只允许发送: func ping(c chan<- string)
  • 只允许接收: func print(c <-chan string)
  • 任何对只发送 channel 的接收操作和只接收 channel 的发送操作都会产生编译错误。
  • 不指定方向的 channel 被称作 “双向” channel,可以将 “双向” channel 最为参数,传递给接收单向 channel 的函数,反之,则不行。

unbuffered channel

非缓冲 channel,也就是缓冲池大小为 0 的 channel 或者同步 channel,上面的例子都是非缓冲 channel,定义方式为:

  • ch := make(chan int)
  • ch := make(chan int, 0)

非缓冲 channel 在同步读时,如果 channel 的 sendq 中有就绪的 goroutine,那么就取出(copy)数据并释放发送方 goroutine;如果没有就绪的 goroutine,那么将接收方 goroutine 挂起。
非缓冲 channel 在同步写时,如果 channel 的 recvq 中有就绪的 goroutine,那么就取出(copy)数据到接收方 goroutine,并使其就绪;如果没有,那么将发送发 goroutine 挂起。

buffered channel

缓冲 channel 只能容纳固定量的数据,当缓冲池满之后,发送发被阻塞,直到数据被接收释放缓冲池,定义如下:

  • ch := make(chan int) 缓冲 channel 可以用来限制吞吐量,例子如下:
package main

import (
    "fmt"
    "time"
)

// Request struct
type Request struct {
}

var sem = make(chan int, 5)     // Create a buffered channel witch capacity of 5

func main() {
    queue := make(chan *Request)
    go start(queue)
    go serve(queue)
    var input string
    fmt.Scanln(&input)
}

func start(queue chan *Request) {
    for {
        queue <- &Request{}
    }
}

func serve(queue chan *Request) {
    for req := range queue {
        sem <- 1       // Put on signal to channel
        go handle(req) // Don't wait for handle to finish.
    }
}

func handle(r *Request) {
    process(r) // May take a long time.
    <-sem      // Done; enable next request to run.
}

func process(r *Request) {
    fmt.Println("process")
    time.Sleep(4 * time.Second)
}

每隔 4 秒钟,输出:

process
process
process
process
process

select

针对于 channel,Golang 提供了一个类似switch的功能,即select,使用如下:

  1. select选择第一个就绪的 channel 进行处理
  2. 如果有多个就绪的 channel,则随机选择一个 channel 进行处理
  3. 如果没有就绪的 channel,则等待直到某一 channel 就绪
  4. 如果有default,则在 3 情形中不会等待,而是立即执行 default 中的代码
package main

import (
    "fmt"
    "time"
)

func main() {
    ch1 := make(chan int)
    ch2 := make(chan int)
    go ping(ch1)
    go pong(ch2)
    go print(ch1, ch2)
    var input string
    fmt.Scanln(&input)
}

func ping(ch chan int) {
    time.Sleep(2 * time.Second)
    ch<-1
}

func pong(ch chan int) {
    time.Sleep(3 * time.Second)
    ch<-2
}

func print(ch1, ch2 chan int) {
    select {
    case msg := <-ch1:
        fmt.Println(msg)
    case msg := <-ch2:
        fmt.Println(msg)
    }
}

两秒钟之后,输出:1
在 select 语句中添加下面代码:

default:
    fmt.Println("nothing received.")

输出: nothing received.

总结

Golang 将线程抽象出来成为轻量级的 goroutine,开发者不再需要过多地关注 OS 层面的逻辑,终于能够从并发编程中解放出来。
channel 作为 goroutine 通信的媒介,安全高效的实现了 goroutine 之间的通信和共享内存。
用 Effetive go 中的一句话来总结 [4]: > Do not communicate by sharing memory; instead, share memory by communicating.

Reference

[1] https://blog.golang.org/concurrency-is-not-parallelism
[2] http://blog.nindalf.com/how-goroutines-work/
[3] https://www.golang-book.com/books/intro/10
[4] https://golang.org/doc/effective_go.html

更多原创文章干货分享,请关注公众号
  • 加微信实战群请加微信(注明:实战群):gocnio
暂无回复。
需要 登录 后方可回复, 如果你还没有账号请点击这里 注册