【翻译文】sync.RWMutex - 解决并发读写问题

sync.RWMutex - 解决并发读写问题

当多个线程访问共享数据时,会出现并发读写问题(reader-writer problems)。有两种访问数据的线程类型:

  • 读线程 reader:只进行数据读取
  • 写线程 writer:进行数据修改

当 writer 获取到数据的访问权限后,其他任何线程(reader 或 writer)都无权限访问此数据。这种约束亦存在于现实中,比如,当 writer 在修改数据无法保证原子性时(如数据库),此时读取未完成的修改必须被阻塞,以防止加载脏数据(译者注:数据库中的脏读)。还有许多诸如此类的核心问题,例如:

  • writer 不能无限等待
  • reader 不能无限等待
  • 不允许线程出现无限等待

多读/单写互斥锁(如sync.RWMutex)的具体实现解决了一种并发读写问题。接下来,让我们看下在 Go 语言中是如何实现的,同时它提供了哪些的数据可靠性保证机制。

作为额外的工作,我们将深入研究分析竞态情况下的互斥锁。

用法

在深入研究实现细节之前,我们先看看sync.RWMutex的使用实例。下面的程序使用读写互斥锁来保护临界区--sleep()。为了更好的展示整个过程,临界区部分计算了当前正在执行的 reader 和 writer 的数量(源码)。

package main

import (
    "fmt"
    "math/rand"
    "strings"
    "sync"
    "time"
)

func init() {
    rand.Seed(time.Now().Unix())
}

func sleep() {
    time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
}

func reader(c chan int, m *sync.RWMutex, wg *sync.WaitGroup) {
    sleep()
    m.RLock()
    c <- 1
    sleep()
    c <- -1
    m.RUnlock()
    wg.Done()
}

func writer(c chan int, m *sync.RWMutex, wg *sync.WaitGroup) {
    sleep()
    m.Lock()
    c <- 1
    sleep()
    c <- -1
    m.Unlock()
    wg.Done()
}

func main() {
    var m sync.RWMutex
    var rs, ws int
    rsCh := make(chan int)
    wsCh := make(chan int)
    go func() {
        for {
            select {
            case n := <-rsCh:
                rs += n
            case n := <-wsCh:
                ws += n
            }
            fmt.Printf("%s%s\n", strings.Repeat("R", rs),
                    strings.Repeat("W", ws))
        }
    }()
    wg := sync.WaitGroup{}
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go reader(rsCh, &m, &wg)
    }
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go writer(wsCh, &m, &wg)
    }
    wg.Wait()
}

play.golang.org 加载的程序环境是确定的(比如开始时间),所以rand.Seed(time.Now().Unix())总是返回相同的数值,此时程序的执行结果可能总是相同的。为了避免这种情况,可通过修改不同的随机种子值或者在自己的机器上执行程序。

程序执行结果:

W

R
RR
RRR
RRRR
RRRRR
RRRR
RRR
RRRR
RRR
RR
R

W

R
RR
RRR
RRRR
RRR
RR
R

W

译者注:不同机器上运行的结果会有所不同 每次执行完一组 goroutine(reader 和 writer)的临界区代码后,都会打印新的一行。很显然,RWMutex 允许至少一个 reader(一个或多个 reader)存在而 writer 同时只能存在一个。

同样重要且将进一步讨论的是:writer 调用到Lock()时,将会使新的 reader/writer 被阻塞。当存在 reader 加了 RLock 时,writer 会等待这一组 reader 完成正在执行的任务,当这一组任务完成后,writer 将开始执行。从输出可以很明显的看到,每一行的 R 都会递减一个,直到没有 R 之后将打印一个 W。

...
RRRRR
RRRR
RRR
RR
R

W
...

一旦 writer 结束,之前被阻塞的 reader 将恢复执行,然后下一个 writer 也将开始启动。值得一提的是,如果一个 writer 完成,并且有 reader 和 writer 都在等待,那么首个 reader 将解除阻塞,然后才轮到 writer。这种交替执行的方式使得 writer 需等待当前这组 reader 完成,所以无论 reader 还是 writer 都不会有无限等待的情况。

实现

注意,本文针对的RWMutex实现(Go commit: 718d6c58)在 Go 不同版本中可能随时有修改。 RWMutex为 reader 提供两个方法(RLockRUnlock)、也为 writer 提供了两个方法(LockUnlock

读锁 RLock

为了简洁起见,我们先跳过源码中竞态检测相关部分(它们将被...代替)。

func (rw *RWMutex) RLock() {
    ...
    if atomic.AddInt32(&rw.readerCount, 1) < 0 {    
        runtime_SemacquireMutex(&rw.ReadeSem, false)
    }
    ...
}

readerCount字段是int32类型的值,表示待处理的 reader 数量(正在读取数据或被 writer 阻塞)。这基本上是已调用 RLock 函数,但尚未调用 RUnlock 函数的 reader 数量。

atomic.AddInt32等价于如下原子性表达:

*addr += delta
return *addr

addr*int32类型变量,deltaint32类型。因为此操作具有原子性,所以累加delta操作不会影响其他线程(更多详见Fetch-and-add)。

如果没有 writer,则readerCount总是会大于或等于 0(译者注:因为 writer 会把 readerCount 置为负数,通过 Lock 函数的 atomic.AddInt32(&rw.readerCount, -rwmutexMaxreaders),此时 reader 是一种运行速度很快的非阻塞方式,因为只需要调用atomic.AddInt32

信号量 Semaphore

信号量是 Edsger Dijkstra 发明的数据结构,在解决多种同步问题时很有用。其本质是一个整数,并关联两个操作:

  • 申请acquire(也称为 waitdecrementP 操作)
  • 释放release(也称 signalincrementV 操作)

acquire操作将信号量减 1,如果结果值为负则线程阻塞,且直到其他线程进行了信号量累加为正数才能恢复。如结果为正数,线程则继续执行。

release操作将信号量加 1,如存在被阻塞的线程,此时他们中的一个线程将解除阻塞。

Go 运行时提供的runtime_SemacquireMutexruntime_Semrelease函数可用来实现sync.RWMutex互斥锁。

锁 Lock

实现源码:

func (rw *RWMutex) Lock() {
    ...
    rw.w.Lock()
    r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxreader) + rwmutexMaxreader
    if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {     
        runtime_SemacquireMutex(&rw.writerSem, false)
    }
    ...
}

writer 通过Lock方法获取共享数据的独占权限。首先,它会申请阻塞其他写操作的互斥锁(rw.w.Lock()),此互斥锁在Unlock函数的最后才会进行解锁。下一步,将readerCount减去rwmutexMaxreader(值为 1 左移 30 位, 1<<30)使其为负数。当readerCount变为负数时,Rlock 将阻塞接下来的所有读请求。

再回过头来看下Rlock()函数中逻辑:

if atomic.AddInt32(&rw.readerCount, 1) < 0 {
    // A writer is pending, wait for it.    
    runtime_SemacquireMutex(&rw.SeadeSem, false)
}

后续的 reader 将会被阻塞,那么已运行的 reader 会怎样呢?readerWait字段用来记录当前 reader 执行的数量。writer 被信号量writerSem阻塞,直到最后一个 reader 在使用后面讨论的RUnlock方法解锁后会把writerSem加 1,此时信号量将变成 0,writer被解除阻塞(译者注:RUnlock 函数中的runtime_Semrelease(&rw.writerSem, false)

如果没有有效的 reader,那么 writer 将继续其执行。

最大 reader 数 rwmutexMaxreader

rwmutex.go中定义的常量:

const rwmutexMaxreader = 1 << 30

那么,其用途是什么,以及1<<30表示什么意义呢?

readerCount字段是int32类型,其范围为:

[-1 << 31, (1 << 31) — 1] or [-2147483648, 2147483647]

RWMutext使用此字段来计算挂起的 reader 和 writer 的标记(置为负数)。在Lock方法中:

r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxreader) + rwmutexMaxreader

Lock会将readerCount字段减去1<<30,当readerCount负值时表示 writer 调用了Lock正等待被处理,atomic.AddInt32(&rw.readerCount, -rwmutexMaxreaders) + rwmutexMaxreaders这个操作既让readerCount变为负数又使r存储回了 readerCount。rwmutexMaxreaders也可以限制被挂起 reader 的数量。如果有rwmutexMaxreader个或更多挂起的 reader,那么readerCount将是非负值,此时将导致整个机制的崩溃。所以,reader 实际的限制数是:rwmutexMaxreader - 1,此值1073741823超过了10亿

解读锁 RUnlock

实现源码:

func (rw *RWMutex) RUnlock() {
    ...
    if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
        if r+1 == 0 || r+1 == -rwmutexMaxreader {
            race.Enable()
            thrSw("sync: RUnlock of unlocked RWMutex")
        }
        // A writer is pending.
        if atomic.AddInt32(&rw.readerWait, -1) == 0 {
            // The last reader unblocks the writer.       
            runtime_Semrelease(&rw.WriteSem, false)
        }
    }
    ...
}

每次调用此方法将使readerCount减 1(RLock 方法中增加 1)。如果减完后readerCount值为负,则表示当前存在 writer 正在等待或运行。这是因为在Lock()方法中readerCount减去了rwmutexMaxreader。然后,当检查到将完成的 reader 数量(readerWait 数值)最终为 0 时,则表示 writer 可以最终申请信号量。(译者注:r < 0时,存在两个分支,当走 r+1 == 0 的分支时,表示 readerCount 此时为 0 即没有 RLock,所以 throw 了。当走下面那个分支时,r < 0则是因为存在 writer 把 readerCount 置为了负数在等待 reader 结束,那么当最后一个 reader 解锁时需要将 WriteSem 信号量加 1,唤醒 writer)

解锁 Unlock

实现源码:

func (rw *RWMutex) Unlock() {
    ...
    r := atomic.AddInt32(&rw.readerCount, rwmutexMaxreader)
    if r >= rwmutexMaxreader {
        race.Enable()
        throw("sync: Unlock of unlocked RWMutex")
    }
    for i := 0; i < int(r); i++ {
        runtime_Semrelease(&rw.readerSem, false)
    }
    rw.w.Unlock()
    ...
}

解锁被 writer 持有的互斥锁时,首先通过atomic.AddInt32readerCount加上rwmutexMaxreader,这时readerCount将变成非负值。如readerCount比 0 大,则表示存在 reader 正在等待 writer 执行完成,此时应唤醒这些等待的 reader。之后写锁将被释放,从而允许其他 writer 为了写入而锁定互斥锁。(译者注:如果还存在挂起的 reader,则在 writer 解锁之前需要通过信号量 readerSem 唤醒这些 reader 执行)

如果 reader 或 writer 尝试解锁未锁定的互斥锁时,调用UnlockRunlock方法将抛出错误(示例源码)。

m := sync.RWMutex{}
m.Unlock()

输出:

fatal error: sync: Unlock of unlocked RWMutex
...

递归读锁定 Recursive read locking

文档描述:

如果一个 reader goroutine 持有了读锁,而此时另一个 writer goroutine 调用Lock申请加写锁,此后在最初的读锁被释放前其他 goroutine 不能获取到读锁。特定情况下,这能防止递归读锁,这种策略保证了锁的可用性,Lock的调用会阻止其他新的 reader 来获得锁。

RWMutex 的工作方式是,如果有一个 writer 调用了 Lock,则所有调用 RLock 都将被锁定,无论是否已经获得了读锁定(示例源码): 示例代码:

package main

import (
    "fmt"
    "sync"
    "time"
)

var m sync.RWMutex

func f(n int) int {
    if n < 1 {
        return 0
    }
    fmt.Println("RLock")
    m.RLock()
    defer func() {
        fmt.Println("RUnlock")
        m.RUnlock()
    }()
    time.Sleep(100 * time.Millisecond)
    return f(n-1) + n
}

func main() {
    done := make(chan int)
    go func() {
        time.Sleep(200 * time.Millisecond)
        fmt.Println("Lock")
        m.Lock()
        fmt.Println("Unlock")
        m.Unlock()
        done <- 1
    }()
    f(4)
    <-done
}

输出:

RLock
RLock
RLock
Lock
RLock
fatal error: all goroutines are asleep - deadlock!

译者注(至下一节以前均为译者注):为什么会发送死锁呢?原作者用递归函数在 defer 里面解锁,那么在加第三层读锁的时候,还没有读锁解锁。这时,readCount 是 3,此时正好加了一个 Lock 写锁,由于 readCount 是 3

if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
         runtime_Semacquire(&rw.writerSem)
        }

由上可知,此时 writer 需要等待所有进行中的 reader 完成,此时又调用了 RLock,

if atomic.AddInt32(&rw.readerCount, 1) < 0 {
    // A writer is pending, wait for it.
    runtime_Semacquire(&rw.readerSem)
}

由于在第四个 RLock 前,加了 Lock 操作,使得 readerCount 为负数。所以就造成了死锁,即 reader 在等待 readerSem,writer 在等待 writerSem*

复制锁 Copying locks

go tool vet可以检测锁是否被复制了,因为复制锁会导致死锁。更多关于此问题可参考之前的文章:Detect locks passed by value in Go

性能 Performance

之前有人发现,在 CPU 核数增多时 RWMutex 的性能会有下降,详见:sync: RWMutex scales poorly with CPU count

争用 Contention

Go 版本 ≥ 1.8 之后,支持分析争用的互斥锁(runtime: Profile goroutines holding contended mutexes.)。我们来看下如何做:

package main

import (
    "net/http"
    _ "net/http/pprof"
    "runtime"
    "sync"
    "time"
)

func main() {
    var mu sync.Mutex
    runtime.SetMutexProfileFraction(5)
    for i := 0; i < 10; i++ {
        go func() {
            for {
                mu.Lock()
                time.Sleep(100 * time.Millisecond)
                mu.Unlock()
            }
        }()
    }
    http.ListenAndServe(":8888", nil)
}
> go build mutexcontention.go
> ./mutexcontention

mutexcontention程序运行时,执行 pprof:

> go tool pprof mutexcontention http://localhost:8888/debug/pprof/mutex?debug=1
Fetching profile over HTTP from http://localhost:8888/debug/pprof/mutex?debug=1
Saved profile in /Users/mlowicki/pprof/pprof.mutexcontention.contentions.delay.003.pb.gz
File: mutexcontention
Type: delay
Entering interactive mode (type "help" for commands, "o" for options)
(pprof) list main
Total: 57.28s
ROUTINE main.main.func1 in .../src/github.com/mlowicki/mutexcontention/mutexcontention.go
0     57.28s (flat, cum)   100% of Total
.          .     14:   for i := 0; i < 10; i++ {
.          .     15:           go func() {
.          .     16:                   for {
.          .     17:                           mu.Lock()
.          .     18:                           time.Sleep(100 * time.Millisecond)
.     57.28s     19:                           mu.Unlock()
.          .     20:                   }
.          .     21:           }()
.          .     22:   }
.          .     23:
.          .     24:   http.ListenAndServe(":8888", nil)

注意,为什么这里耗时 57.28s,且指向了mu.Unlock()呢?

当 goroutine 调用Lock而阻塞时,会记录当前发生的准确时间--叫做acquiretime。当另一个 groutine 解锁,至少存在一个 goroutine 在等待获得锁,则其中一个解除阻塞并调用其mutexevent函数。该mutexevent函数通过检查SetMutexProfileFraction设置的速率来决定此事件应被保留还是丢弃。此事件包含整个等待的时间(当前时间 - 获得时间)。从上面的例子可以看出,所有阻塞在特定互斥锁的 goroutines 的总等待时间会被收集和展示。

在 Go 1.11(sync: enable profiling of RWMutex)中将增加读锁(Rlock 和 RUnlock)的争用。

资料 Resources

备注:

  • critical section:临界区
  • Mutual exclusion,缩写 Mutex:互斥锁
已邀请:

astaxie - 创造、获取、分享、传播和应用Go

赞同来自: haoc7 marks_gui

这是我们GoCN翻译小组的第一篇文章,我们后续会努力每周发布两篇

marks_gui

赞同来自:

感谢 gocn

kevin - 杭州云柚科技

赞同来自:

Cool!

要回复问题请先登录注册