线程同步操作面试题使用锁的解法

注意: 这篇文章的思路是不正确的,正确的思路请参考 关于线程同步操作的一道面试题

前言

前两天在 V2ex 上看到了一篇博文,《一条经典面试题引发的思考》,感觉很有意思,我就用 Go 把它的答案实现了一遍。

问题描述及一个错误解法

问题如下:

编写一个程序,开启 3 个线程A,B,C,这三个线程的输出分别为 A、B、C,每个线程将自己的 输出在屏幕上打印 10 遍,要求输出的结果必须按顺序显示。如:ABCABCABC….

一个错误解法如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package main

import (
	"fmt"
	"sync"
)

var mu sync.Mutex
var index int
var endSignal = make(chan struct{})

func echoRoutine(routineIndex int, routineName string) {
	for index < 30 {
		mu.Lock()
		if index%3 == routineIndex {
			fmt.Println(index, routineName)
			index++
		}
		mu.Unlock()
	}

	endSignal <- struct{}{}
}

func main() {
	routineNames := []string{"A", "B", "C"}

	for idx, name := range routineNames {
		go echoRoutine(idx, name)
	}

	for _ = range routineNames {
		<-endSignal
	}
	//Output:
	//0 A
	//1 B
	//2 C
	//3 A
	//4 B
	//5 C
	//6 A
	//7 B
	//8 C
	//9 A
	//10 B
	//11 C
	//12 A
	//13 B
	//14 C
	//15 A
	//16 B
	//17 C
	//18 A
	//19 B
	//20 C
	//21 A
	//22 B
	//23 C
	//24 A
	//25 B
	//26 C
	//27 A
	//28 B
	//29 C
	//30 A
	//31 B
}

从上面的输出可以看到,程序还额外输出了两次 A 和 B。首先说结论,这是因为检查条件index < 30 没有加锁。为了理解这个错误,首先我们需要了解一下 Go 的内存模型。

Go 语言的内存模型

首先说什么是 Go 的内存模型,在官方文档中是这样定义的:

The Go memory model specifies the conditions under which reads of a variable in one goroutine can be guaranteed to observe values produced by writes to the same variable in a different goroutine.

Go 语言的内存模型规定了一个 Goroutine 可以看见另外一个 Goroutine 修改同一个变量的值的条件,这类似于 Java 内存模型中 内存可见性 问题。

Happens Before 原则

在 Go 语言中,编译器和处理器会对执行的指令进行重排序。Go 语言会保证的是,在单个 Goroutine 内,重排序后的执行效果和未进行重排序(即在代码中定义的执行顺序)的执行效果是一样。但是在多个 Goroutine 的运行环境下,由于重排序的原因,一个 Goroutine 观察到的执行顺序可能和另外一个 Goroutine 观察到的不一样。例如一个 Goroutine 执行a = 1; b = 2,另一个 Goroutine 可能会在a被赋值之前先观察到b被赋值了。

为了规范读和写的操作,Go 定义了 happens before 原则。对于变量读写操作,我们可以将其称作是事件。事件之间的顺序可以定义为三种,E1 发生在 E2 之前,E1 发生在 E2 之后,E1 和 E2同时发生。

在单个 Goroutine 内,happens before 顺序就是程序执行的顺序,如果下面两个条件都被满足的话,变量v的读取事件r,可以观察到变量v的写入事件w

  1. r没有在w之前发生。
  2. w之后,r之前,没有另外一个变量v的写入事件w'发生。

总的来说,在单个 Goroutine 的环境下,读事件r会观察到最近的一个写事件w

在多个 Goroutine 的运行环境下,如果需要让v的读取事件r观察到其写入事件w。需要满足更严格的条件:

  1. w发生在r之前
  2. 任何针对共享变量v的写入事件都发生在w之前或者r之后。

因为在多个 Goroutine 的运行环境下,读写事件可能并行地发生,所以第一点更严格了一些,要求w必须在r之前发生,两者不能同时发生,且它们之间没有其他的写事件w'。 因此在多个 Goroutine 访问一个共享变量v的时候,我们必须使用同步事件去建立一个 happens before 条件来让读事件观察到目标写事件。

此外,还需要注意的是:

  1. 在变量v发生写事件之前,它被初始化成对应类型的零值。
  2. 大于一个机器字的变量的读写事件,会表现成 未指定顺序 的多次机器字大小的读写操作。

正确答案 V1

了解了 Go 内存模型中的 Happens Before 原则之后,我们接着再来分析上述的错误代码。

  • 在 B Goroutine 输出完28之后,A, B, C 都会再循环了一次。
  • 接着会由 C 先来执行输出操作。由于 C 执行第17行i++的写操作和 A 和 B 执行第13行index < 30读操作是并行发生的,所以 A 和 B 可能读取到的值是29并进入循环。
  • 因为此时 A 和 B 都已经进入循环了,所以会出现多输出一次的情况。
  • A 和 B 进入循环后,由于锁的存在,它们能够获取到正确的index的值,所以最后多输出的结果是30和31。

理解了这个错误之后,我们可以把代码稍微改一下,将index < 30这个比较操作也置于锁的保护中,就能够得到正确的结果了。

package main

import (
	"fmt"
	"sync"
)

var i int
var mu sync.Mutex
var endSignal = make(chan struct{})

func threadPrint(threadNum int, threadName string) {
	for {
		mu.Lock()
		if i >= 30 {
			mu.Unlock()
			break
		}

		if i%3 == threadNum {
			fmt.Printf("%d: %s\n", i, threadName)
			i++
		}
		mu.Unlock()
	}

	endSignal <- struct{}{}
}

func main() {
	names := []string{"A", "B", "C"}

	for idx, name := range names {
		go threadPrint(idx, name)
	}

	for _ = range names {
		<-endSignal
	}
}

正确答案 V2 – 公平锁

上述代码是得到的结果是正确的,但是还存在一些问题。我们想要的执行顺讯是有序的, 但每次解锁的时候,都是 A, B, C 三个 Goroutine 上去抢锁资源。有时候某个 Goroutine 抢到了锁资源,但是其并不符合执行要求(i%3 != threadNum)。它就会将锁释放,然后让大家重新再来抢。

这样的争抢索锁资源造成了时间上的浪费,执行了一些不必要的操作。

为了避免这些浪费,我们可以参考 Java 中的公平锁,让解锁的顺序和上锁的顺序匹配,每次只有一个 Goroutine 获得锁资源,大家不必每次都去争抢锁资源。

package main

import (
	"fmt"
	"log"
	"sync"
)

const endNum = 30

type FairLock struct {
	mu        *sync.Mutex
	cond      *sync.Cond
	isHold    bool
	holdCount int
}

func NewFairLock() sync.Locker {
	mu := new(sync.Mutex)
	cond := sync.NewCond(mu)

	return &FairLock{
		holdCount: 0,
		isHold:    false,
		mu:        mu,
		cond:      cond,
	}
}

func (fl *FairLock) Lock() {
	fl.mu.Lock()
	defer fl.mu.Unlock()

	if !fl.isHold {
		fl.holdCount++
		fl.isHold = true
		return
	}

	fl.holdCount++
	for fl.isHold {
		fl.cond.Wait()
	}
	fl.isHold = true
}

func (fl *FairLock) Unlock() {
	fl.mu.Lock()
	defer fl.mu.Unlock()

	if !fl.isHold {
		log.Fatal("unlock of UnLocked mutex")
	}

	if fl.holdCount > 1 {
		fl.cond.Signal()
	}
	fl.isHold = false
	fl.holdCount--
}

var (
	end = make(chan struct{})
	i   int
)

func threadPrint(threadNum int, threadName string, locker sync.Locker) {
	for i < endNum {
		locker.Lock()
		if i >= endNum {
			locker.Unlock()
			continue
		}
		if i%3 != threadNum {
			locker.Unlock()
			continue
		}

		fmt.Printf("%d: %s\n", i, threadName)
		i += 1
		locker.Unlock()
	}
	end <- struct{}{}
}

func main() {
	mu := NewFairLock()
	names := []string{"A", "B", "C"}

	for idx, name := range names {
		go threadPrint(idx, name, mu)
	}

	for _ = range names {
		<-end
	}
}

注意: 由于可见性的原因,需要在70~72行上锁之后加一个判断,保证i的值是最新的值。

经 V友 @hheedat 的提醒cond.Wait操作需要放在 for 循环中,因为阻塞的 Goroutine 可能会被误唤醒。

Go 的文档中也说明 Wait需要放在for循环中。因为在 Wait 的过程中,cond 相关的锁并没有上锁,所以Wait唤醒后,相关的条件不一定是正确的。

公平锁的一个错误尝试

在之前的代码中,我尝试利用公平锁上锁和解锁的有序性,来让3个 Goroutine 顺序执行并按顺序输出A,B,C。后经V友 hheedat 指出了错误,发现这样的想法是不可行的。因为以下两点是冲突的:

  1. 让 Goroutine 在无法获得锁资源的时候阻塞
  2. 让 Goroutine 按照 A,B,C 的顺序上锁
  • 因为 Goroutine 的启动时间和接收到信号量的时间都是无序的,所以正常情况下无法让 Goroutine 按顺序上锁。
  • 我尝试让 Goroutine 上锁之后使用一个信号通知mainGoroutine,让mainGoroutine 再去通知下一个 Goroutine 去上锁。
  • 但由于 Goroutine 上锁之后是阻塞的,所以使用信号通知mainGoroutine 的方案也是不可行的。

或许还有其他方案,但我认为这样思考的思路是不对的,这个同步问题本来就应该使用信号量来解决,使用锁并尝试按顺序上锁只是在歧路上越走越远

参考链接

2019年03月26日 / 21:56