前两天看到了这个帖子,https://tanronggui.xyz/t/547045#reply53 感觉其中描述的问题很有意思,我就用 Go 把它提到的解决方案都实现了一遍,文章链接:一条面试题引发的关于 Go 语言中锁的思考,还请大家多多指教!
前两天在 V2EX 上看到了一篇博文,《一条经典面试题引发的思考》,感觉很有意思,我就用 Go 把它的答案实现了一遍。
问题如下:
编写一个程序,开启 3 个线程 A,B,C,这三个线程的输出分别为 A、B、C,每个线程将自己的 输出在屏幕上打印 10 遍,要求输出的结果必须按顺序显示。如:ABCABCABC....
一个错误解法如下:
1 package main
2
3 import (
4 "fmt"
5 "sync"
6 )
7
8 var mu sync.Mutex
9 var index int
10 var endSignal = make(chan struct{})
11
12 func echoRoutine(routineIndex int, routineName string) {
13 for index < 30 {
14 mu.Lock()
15 if index%3 == routineIndex {
16 fmt.Println(index, routineName)
17 index++
18 }
19 mu.Unlock()
20 }
21
22 endSignal <- struct{}{}
23 }
24
25 func main() {
26 routineNames := []string{"A", "B", "C"}
27
28 for idx, name := range routineNames {
29 go echoRoutine(idx, name)
30 }
31
32 for _ = range routineNames {
33 <-endSignal
34 }
35 //Output:
36 //0 A
37 //1 B
38 //2 C
39 //3 A
40 //4 B
41 //5 C
42 //6 A
43 //7 B
44 //8 C
45 //9 A
46 //10 B
47 //11 C
48 //12 A
49 //13 B
50 //14 C
51 //15 A
52 //16 B
53 //17 C
54 //18 A
55 //19 B
56 //20 C
57 //21 A
58 //22 B
59 //23 C
60 //24 A
61 //25 B
62 //26 C
63 //27 A
64 //28 B
65 //29 C
66 //30 A
67 //31 B
68 }
从上面的输出可以看到,程序还额外输出了两次 A 和 B。首先说结论,这是因为检查条件index < 30
没有加锁。为了理解这个错误,首先我们需要了解一下 Go 的内存模型。
首先说什么是 Go 的内存模型,在官方文档中是这样定义的:
The Go memory model specifies the conditions under which reads of a variable in one goroutine can be guaranteed to observe values produced by writes to the same variable in a different goroutine.
Go 语言的内存模型规定了一个 Goroutine 可以看见另外一个 Goroutine 修改同一个变量的值的条件,这类似于 Java 内存模型中 内存可见性 问题。
在 Go 语言中,编译器和处理器会对执行的指令进行重排序。Go 语言会保证的是,在单个 Goroutine 内,重排序后的执行效果和未进行重排序(即在代码中定义的执行顺序)的执行效果是一样。但是在多个 Goroutine 的运行环境下,由于重排序的原因,一个 Goroutine 观察到的执行顺序可能和另外一个 Goroutine 观察到的不一样。例如一个 Goroutine 执行a = 1; b = 2
,另一个 Goroutine 可能会在a
被赋值之前先观察到b
被赋值了。
为了规范读和写的操作,Go 定义了 happens before 原则。对于变量读写操作,我们可以将其称作是事件。事件之间的顺序可以定义为三种,E1 发生在 E2 之前,E1 发生在 E2 之后,E1 和 E2 同时发生。
在单个 Goroutine 内,happens before 顺序就是程序执行的顺序,如果下面两个条件都被满足的话,变量v
的读取事件r
,可以观察到变量v
的写入事件w
。
r
没有在w
之前发生。w
之后,r
之前,没有另外一个变量v
的写入事件w'
发生。总的来说,在单个 Goroutine 的环境下,读事件r
会观察到最近的一个写事件w
。
在多个 Goroutine 的运行环境下,如果需要让v
的读取事件r
观察到其写入事件w
。需要满足更严格的条件:
w
发生在r
之前v
的写入事件都发生在w
之前或者r
之后。因为在多个 Goroutine 的运行环境下,读写事件可能并行地发生,所以第一点更严格了一些,要求w
必须在r
之前发生,两者不能同时发生,且它们之间没有其他的写事件w'
。
因此在多个 Goroutine 访问一个共享变量v
的时候,我们必须使用同步事件去建立一个 happens before 条件来让读事件观察到目标写事件。
此外,还需要注意的是:
v
发生写事件之前,它被初始化成对应类型的零值。了解了 Go 内存模型中的 Happens Before 原则之后,我们接着再来分析上述的错误代码。
i++
的写操作和 A 和 B 执行第 13 行index < 30
读操作是并行发生的,所以 A 和 B 可能读取到的值是 29 并进入循环。index
的值,所以最后多输出的结果是 30 和 31。理解了这个错误之后,我们可以把代码稍微改一下,将index < 30
这个比较操作也置于锁的保护中,就能够得到正确的结果了。
package main
import (
"fmt"
"sync"
)
var i int
var mu sync.Mutex
var endSignal = make(chan struct{})
func threadPrint(threadNum int, threadName string) {
for {
mu.Lock()
if i >= 30 {
mu.Unlock()
break
}
if i%3 == threadNum {
fmt.Printf("%d: %s\n", i, threadName)
i++
}
mu.Unlock()
}
endSignal <- struct{}{}
}
func main() {
names := []string{"A", "B", "C"}
for idx, name := range names {
go threadPrint(idx, name)
}
for _ = range names {
<-endSignal
}
}
上述代码是得到的结果是正确的,但是还存在一些问题。我们想要的执行顺讯是有序的,
但每次解锁的时候,都是 A, B, C 三个 Goroutine 上去抢锁资源。有时候某个 Goroutine 抢到了锁资源,但是其并不符合执行要求(i%3 != threadNum
)。它就会将锁释放,然后让大家重新再来抢。
这样的争抢索锁资源造成了时间上的浪费,执行了一些不必要的操作。
为了避免这些浪费,我们可以参考 Java 中的公平锁,让解锁的顺序和上锁的顺序匹配,每次只有一个 Goroutine 获得锁资源,大家不必每次都去争抢锁资源。
package main
import (
"fmt"
"log"
"sync"
)
type FailLock struct {
mu *sync.Mutex
cond *sync.Cond
holdCount int
}
func NewFailLock() sync.Locker {
mu := new(sync.Mutex)
cond := sync.NewCond(mu)
return &FailLock{
holdCount: 0,
mu: mu,
cond: cond,
}
}
func (fl *FailLock) Lock() {
fl.mu.Lock()
defer fl.mu.Unlock()
fl.holdCount++
if fl.holdCount == 1 {
return
}
fl.cond.Wait()
}
func (fl *FailLock) Unlock() {
fl.mu.Lock()
defer fl.mu.Unlock()
if fl.holdCount == 0 {
log.Fatal("unlock of UnLocked mutex")
}
fl.holdCount--
if fl.holdCount != 0 {
fl.cond.Signal()
}
}
var (
end = make(chan struct{})
i int
)
func threadPrint(threadNum int, threadName string, mu sync.Locker) {
for i < 30 {
mu.Lock()
if i >= 30 {
mu.Unlock()
continue
}
if i < 3 && i%3 != threadNum {
mu.Unlock()
continue
}
fmt.Printf("%d: %s\n", i, threadName)
i += 1
mu.Unlock()
}
end <- struct{}{}
}
func main() {
mu := NewFailLock()
names := []string{"A", "B", "C"}
for idx, name := range names {
go threadPrint(idx, name, mu)
}
for _ = range names {
<-end
}
}
上述代码中需要注意两点:
A,B,C
这样的顺序,所以需要在64~67
行加一个判断,程序刚开始执行时如果获得的锁不对,就不执行任何操作,重新获得锁。60~63
行上锁之后加一个判断,保证i
的值是最新的值。除了自己手动加锁外,我们也可以使用 Go 的 atomic
包中提供的原子操作来完成上述功能。
每个 Goroutine 原子性地获得i
的值,如果符合i % 3 == threadNum
的条件,则执行操作,否则作自旋。代码如下:
package main
import (
"fmt"
"sync/atomic"
)
var (
end = make(chan struct{})
i int32
)
func threadPrint(threadNum int32, threadName string) {
for {
v := atomic.LoadInt32((*int32)(&i))
if v >= 30 {
break
}
if v%3 == threadNum {
fmt.Printf("%d: %s\n", i, threadName)
atomic.AddInt32((*int32)(&i), int32(1))
} else {
continue
}
}
end <- struct{}{}
}
func main() {
names := []string{"A", "B", "C"}
for idx, name := range names {
go threadPrint(int32(idx), name)
}
for _ = range names {
<-end
}
}
这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。
V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。
V2EX is a community of developers, designers and creative people.