线程同步操作面试题使用锁的解法
注意: 这篇文章的思路是不正确的,正确的思路请参考 关于线程同步操作的一道面试题
前言
前两天在 V2ex 上看到了一篇博文,《一条经典面试题引发的思考》,感觉很有意思,我就用 Go 把它的答案实现了一遍。
问题描述及一个错误解法
问题如下:
编写一个程序,开启 3 个线程A,B,C,这三个线程的输出分别为 A、B、C,每个线程将自己的 输出在屏幕上打印 10 遍,要求输出的结果必须按顺序显示。如:ABCABCABC….
一个错误解法如下:
|
|
从上面的输出可以看到,程序还额外输出了两次 A 和 B。首先说结论,这是因为检查条件index < 30
没有加锁。为了理解这个错误,首先我们需要了解一下 Go 的内存模型。
Go 语言的内存模型
首先说什么是 Go 的内存模型,在官方文档中是这样定义的:
The Go memory model specifies the conditions under which reads of a variable in one goroutine can be guaranteed to observe values produced by writes to the same variable in a different goroutine.
Go 语言的内存模型规定了一个 Goroutine 可以看见另外一个 Goroutine 修改同一个变量的值的条件,这类似于 Java 内存模型中 内存可见性 问题。
Happens Before 原则
在 Go 语言中,编译器和处理器会对执行的指令进行重排序。Go 语言会保证的是,在单个 Goroutine 内,重排序后的执行效果和未进行重排序(即在代码中定义的执行顺序)的执行效果是一样。但是在多个 Goroutine 的运行环境下,由于重排序的原因,一个 Goroutine 观察到的执行顺序可能和另外一个 Goroutine 观察到的不一样。例如一个 Goroutine 执行a = 1; b = 2
,另一个 Goroutine 可能会在a
被赋值之前先观察到b
被赋值了。
为了规范读和写的操作,Go 定义了 happens before 原则。对于变量读写操作,我们可以将其称作是事件。事件之间的顺序可以定义为三种,E1 发生在 E2 之前,E1 发生在 E2 之后,E1 和 E2同时发生。
在单个 Goroutine 内,happens before 顺序就是程序执行的顺序,如果下面两个条件都被满足的话,变量v
的读取事件r
,可以观察到变量v
的写入事件w
。
r
没有在w
之前发生。- 在
w
之后,r
之前,没有另外一个变量v
的写入事件w'
发生。
总的来说,在单个 Goroutine 的环境下,读事件r
会观察到最近的一个写事件w
。
在多个 Goroutine 的运行环境下,如果需要让v
的读取事件r
观察到其写入事件w
。需要满足更严格的条件:
w
发生在r
之前- 任何针对共享变量
v
的写入事件都发生在w
之前或者r
之后。
因为在多个 Goroutine 的运行环境下,读写事件可能并行地发生,所以第一点更严格了一些,要求w
必须在r
之前发生,两者不能同时发生,且它们之间没有其他的写事件w'
。
因此在多个 Goroutine 访问一个共享变量v
的时候,我们必须使用同步事件去建立一个 happens before 条件来让读事件观察到目标写事件。
此外,还需要注意的是:
- 在变量
v
发生写事件之前,它被初始化成对应类型的零值。 - 大于一个机器字的变量的读写事件,会表现成 未指定顺序 的多次机器字大小的读写操作。
正确答案 V1
了解了 Go 内存模型中的 Happens Before 原则之后,我们接着再来分析上述的错误代码。
- 在 B Goroutine 输出完28之后,A, B, C 都会再循环了一次。
- 接着会由 C 先来执行输出操作。由于 C 执行第17行
i++
的写操作和 A 和 B 执行第13行index < 30
读操作是并行发生的,所以 A 和 B 可能读取到的值是29并进入循环。 - 因为此时 A 和 B 都已经进入循环了,所以会出现多输出一次的情况。
- A 和 B 进入循环后,由于锁的存在,它们能够获取到正确的
index
的值,所以最后多输出的结果是30和31。
理解了这个错误之后,我们可以把代码稍微改一下,将index < 30
这个比较操作也置于锁的保护中,就能够得到正确的结果了。
package main
import (
"fmt"
"sync"
)
var i int
var mu sync.Mutex
var endSignal = make(chan struct{})
func threadPrint(threadNum int, threadName string) {
for {
mu.Lock()
if i >= 30 {
mu.Unlock()
break
}
if i%3 == threadNum {
fmt.Printf("%d: %s\n", i, threadName)
i++
}
mu.Unlock()
}
endSignal <- struct{}{}
}
func main() {
names := []string{"A", "B", "C"}
for idx, name := range names {
go threadPrint(idx, name)
}
for _ = range names {
<-endSignal
}
}
正确答案 V2 – 公平锁
上述代码是得到的结果是正确的,但是还存在一些问题。我们想要的执行顺讯是有序的,
但每次解锁的时候,都是 A, B, C 三个 Goroutine 上去抢锁资源。有时候某个 Goroutine 抢到了锁资源,但是其并不符合执行要求(i%3 != threadNum
)。它就会将锁释放,然后让大家重新再来抢。
这样的争抢索锁资源造成了时间上的浪费,执行了一些不必要的操作。
为了避免这些浪费,我们可以参考 Java 中的公平锁,让解锁的顺序和上锁的顺序匹配,每次只有一个 Goroutine 获得锁资源,大家不必每次都去争抢锁资源。
package main
import (
"fmt"
"log"
"sync"
)
const endNum = 30
type FairLock struct {
mu *sync.Mutex
cond *sync.Cond
isHold bool
holdCount int
}
func NewFairLock() sync.Locker {
mu := new(sync.Mutex)
cond := sync.NewCond(mu)
return &FairLock{
holdCount: 0,
isHold: false,
mu: mu,
cond: cond,
}
}
func (fl *FairLock) Lock() {
fl.mu.Lock()
defer fl.mu.Unlock()
if !fl.isHold {
fl.holdCount++
fl.isHold = true
return
}
fl.holdCount++
for fl.isHold {
fl.cond.Wait()
}
fl.isHold = true
}
func (fl *FairLock) Unlock() {
fl.mu.Lock()
defer fl.mu.Unlock()
if !fl.isHold {
log.Fatal("unlock of UnLocked mutex")
}
if fl.holdCount > 1 {
fl.cond.Signal()
}
fl.isHold = false
fl.holdCount--
}
var (
end = make(chan struct{})
i int
)
func threadPrint(threadNum int, threadName string, locker sync.Locker) {
for i < endNum {
locker.Lock()
if i >= endNum {
locker.Unlock()
continue
}
if i%3 != threadNum {
locker.Unlock()
continue
}
fmt.Printf("%d: %s\n", i, threadName)
i += 1
locker.Unlock()
}
end <- struct{}{}
}
func main() {
mu := NewFairLock()
names := []string{"A", "B", "C"}
for idx, name := range names {
go threadPrint(idx, name, mu)
}
for _ = range names {
<-end
}
}
注意: 由于可见性的原因,需要在70~72
行上锁之后加一个判断,保证i
的值是最新的值。
经 V友 @hheedat 的提醒,cond.Wait
操作需要放在 for
循环中,因为阻塞的 Goroutine 可能会被误唤醒。
Go 的文档中也说明 Wait
需要放在for
循环中。因为在 Wait
的过程中,cond
相关的锁并没有上锁,所以Wait
唤醒后,相关的条件不一定是正确的。
公平锁的一个错误尝试
在之前的代码中,我尝试利用公平锁上锁和解锁的有序性,来让3个 Goroutine 顺序执行并按顺序输出A,B,C
。后经V友 hheedat 指出了错误,发现这样的想法是不可行的。因为以下两点是冲突的:
- 让 Goroutine 在无法获得锁资源的时候阻塞
- 让 Goroutine 按照
A,B,C
的顺序上锁
- 因为 Goroutine 的启动时间和接收到信号量的时间都是无序的,所以正常情况下无法让 Goroutine 按顺序上锁。
- 我尝试让 Goroutine 上锁之后使用一个信号通知
main
Goroutine,让main
Goroutine 再去通知下一个 Goroutine 去上锁。 - 但由于 Goroutine 上锁之后是阻塞的,所以使用信号通知
main
Goroutine 的方案也是不可行的。
或许还有其他方案,但我认为这样思考的思路是不对的,这个同步问题本来就应该使用信号量来解决,使用锁并尝试按顺序上锁只是在歧路上越走越远。
参考链接
- 一条经典面试题引发的思考
- Memory barrier – Wikipedia
- 什么是Java中的公平锁?
- The Go Memory Model
- GoLang内存模型
- go reentrant lock(可重入锁) 简单实现
- Go Concurrency Patterns: Pipelines and cancellation
- Fan-In Wikipedia
- V友 Mark3K的评论
- 一条面试题引发的思考 Go 版本