关于线程同步操作的一道面试题

前言

前两天在 V2EX 上发现了一道好玩的面试题,兴致冲冲地写了答案出来,并发到了 V2EX 上。结果发现自己实现的思路完全是错误的,遂把上篇文章推翻,重新写一篇。

问题描述及解析

问题描述

编写一个程序,开启 3 个线程A,B,C,这三个线程的输出分别为 A、B、C,每个线程将自己的输出在屏幕上打印 10 遍,要求输出的结果必须按顺序显示。如:ABCABCABC….

为了能够踩到更多的坑,我把上述问题升级了一下,线程数量和打印次数不是一个固定的值,具体如下:

  • 编写一个程序,开启 N 个线程A,B,C…,这N个线程的输出分别为 A、B、C…,每个线程将自己的输出在屏幕上打印 M 遍,要求输出的结果必须按顺序显示。如:ABC…ABC…ABC…
  • 其中 N <= 1000, M <= 1000

注意:

  • 输出要在各自的线程中输出,不能在主线程中输出

题目解析

引用自 V友 @hjc4869

这个问题本质上是在实现同步,而不是互斥。同步强调的是做事的先后顺序而不是多线程访问资源的冲突。虽然二者是包含关系并且可以互相实现,但是如果这里第一眼看到题就只是想到用 lock/mutex 而不是 semaphore/channel 去实现,那么显然是对后者的应用不熟,面试要挂的

利用 Channel 做信号量的解法

使用信号量的话,这个题的解题思路就很简单:

  • 创建 N 个Goroutine 执行输出操作。
  • 每个 Goroutine 的具体操作可用以下伪代码来表示:
def echo(threadNum, Upstream, Downstream):
  for i in range(M):
    wait Upstream  // 等待上游的信号
    print(threadNum)
    signal Downstream // 给下游发送信号

其中UpstreamDownstream 都表示信号量,A Goroutine 的 DownstreamB Goroutine 的 Upstream,依此类推,所有 Goroutine 会形成一个链式的关系。

可以使用下图来表示:

具体代码如下:

package main

import (
	"fmt"
	"log"
)

var (
	N = 5
	M = 2
)

func main() {
	var wait, sig, firstWait, lastSig chan struct{}

	wait = make(chan struct{})
	firstWait = wait

	for i := 0; i < N; i++ {
		sig = make(chan struct{})
		lastSig = sig
		go echo(i, wait, sig)
		wait = sig
	}

	for i := 0; i < M; i++ {
		firstWait <- struct{}{}
		<-lastSig
	}
	close(firstWait)

	_, ok := <-lastSig
	if ok {
		log.Fatalln("Channel not closed")
	}
	// Out
	// 0: A
	// 1: B
	// 2: C
	// 3: D
	// 4: E
	// 0: A
	// 1: B
	// 2: C
	// 3: D
	// 4: E
	// 0: A
	// 1: B
	// 2: C
	// 3: D
	// 4: E
	// Close A
	// Close B
	// Close C
	// Close D
	// Close E
}

func echo(threadNum int, wait chan struct{}, sig chan struct{}) {
	threadName := string('A' + threadNum)

	for _ = range wait {
		fmt.Printf("%d: %s\n", threadNum, threadName)
		sig <- struct{}{}
	}

	close(sig)
	// 这句是我打印出来为了确认所有的 Goroutine 已经关闭了,实际不需要
	fmt.Println("Close", threadName) 
}

FanIn 的方式

根据题目要求来看,在主线程中输出结果,有些不符合要求,但有个答案的实现很有意思,我就也放上来了

经V友 @Mark3K 的补充,还可以使用多个 channel 执行扇入(Fan In)操作,避免使用锁。

首先说一下扇入的定义,Go blog 中是这样描述的:

A function can read from multiple inputs and proceed until all are closed by multiplexing the input channels onto a single channel that’s closed when all the inputs are closed. This is called fan-in.

通过将多个输入 channel 多路复用到单个处理 channel 的方式,一个函数能够从多个输入 channel 中读取数据并处理。当所有的输出 channel 都关闭的时候,单个处理 channel 也会关闭。这就叫做扇入。

在维基百科中描述的逻辑门的扇入如下(大家也可以参考这个来理解 Go 中的扇入):

Fan-in is the number of inputs a logic gate can handle. For instance the fan-in for the AND gate shown in the figure is 3. Physical logic gates with a large fan-in tend to be slower than those with a small fan-in. This is because the complexity of the input circuitry increases the input capacitance of the device. Using logic gates with higher fan-in will help reducing the depth of a logic circuit.

逻辑门中的扇入定义: 一个逻辑门将多个输入处理成一个输出,它能够处理的输入数量就叫做扇入。

理解了扇入的概念后,上述问题的答案也呼之欲出了。我们可以为A,B,C,...这N个 Goroutine 创建N个 channel。然后通过一个 FanIn 函数将N个 channel 的输出输入到一个 channel 中。具体代码如下:

package main

import "fmt"

var (
	N = 5
	M = 5
)

func gen(v string, times int) <-chan string {
	ch := make(chan string)
	go func() {
		defer close(ch)
		for i := 0; i < times; i++ {
			ch <- v
		}
	}()
	return ch
}

func fanIn(times int, inputs []<-chan string) <-chan string {
	ch := make(chan string)
	go func() {
		defer close(ch)
		for i := 0; i < times; i++ {
			for _, input := range inputs {
				v := <-input
				ch <- v
			}
		}
	}()
	return ch
}

func main() {
	times := M
	inputs := make([]<-chan string, 0, N)
	for i := 0; i < N; i++ {
		threadName := string('A' + i)
		inputs = append(inputs, gen(threadName, times))
	}
	for char := range fanIn(times, inputs) {
		fmt.Println(char)
	}
}

使用锁的解决方案

使用锁并不是这个问题的正确解决思路,甚至会将人的思维带入歧途,所以不推荐大家使用锁解决。

这是我之前写的旧文,线程同步操作面试题使用锁的解法,个人觉得某些地方还有一些参考价值,请大家酌情阅读。

使用 AtomicInteger 的解决方案

使用AtomicInteger并不是一种好的解决方案(因为这个解法让 CPU 持续空转),但是通过这个解法我们可以了解到 Go 调度器阻塞的一个陷阱,所以也把这种解法放了上来,感兴趣的朋友可以查看我的另一篇文章 Go 调度器的一个无法执行陷阱

2019年04月13日 / 17:27