理解golang调度之三:并发

简介

当我在解决一个问题尤其是新问题的时候,我开始不会去考虑并发(concurrency)是否合适。我首先会去找一系列的解决方式然后确保它有效。然后在可读性和技术方案评估之后,我会开始去考虑并发是否实际合理。有些时候并发的好处是显而易见的,但是有时候并不是很明显。
第一篇文章,我解释了OS调度器的相关内容,我觉得这部分对于你写多线程代码很重要。第二篇里,我讲解了一些Go调度器的一些内容,这部分对于你理解和写go的并发代码很有帮助。在这篇文章里,我会在OS和Go调度器层面让你去深层次的理解并发到底是什么。
这部分内容的目标是:

你的工作负载(workloads)使用并发是否合适,为此提供一些指导建议
不同工作负载的含义,并针对其作出相应的工程方面的决策。

什么是并发

并发的含义就是无序的执行。给你一系列的指令,去找到一个方式可以无序执行而且和有序执行产生同样的结果。这个问题在你面前,显而易见的是无序执行会增加一些足够的性能增益在计算了复杂性成本之后,但是你可能会觉得无序执行是不可能的甚至是没有意义的。
你也要清楚一点,并发和并行是不一样的。并行是在相同时间内同时执行两个或两个以上的指令,这和并发的概念不一样。
图3.1

图3.1里,你看到主机上有两个逻辑处理器。每个都有他们单独的OS线程(M)依附于一个独立的硬件线程(Core)。你可以看到2个Goroutine(G1和G2) 正在并行在各自的OS/硬件线程上面同时执行它们的指令。在每个逻辑处理器里,有3个Goroutines以轮转的方式共享OS线程。这些Goroutines正在以无序的方式并发地执行它们的指令,并且在OS线程上共享时间片。
这里有一个问题。有些时候利用并发而不采用并行实际上会降低你的吞吐量,有趣的是,有时候利用并发同时加上并行处理也不会为你带来你理想中的性能增益。

工作负载(workloads)

你是如何知道无序执行(并发)是可行的呢?了解你所处理问题的工作负载(workload)是一个起点。有两种类型的工作负载在并发的时候要考虑到。

  • CPU密集(CPU-Bound):这种工作负载情况不会有Goroutines自动切换到waiting状态的情况,也不会有自动从waiting状态切到其他状态的情况。这种情况发生在进行持续计算的时候。线程计算Pi值就是CPU-Bound。
  • IO密集(IO-Bound):这种工作负载会导致Goroutines自动进入等待状态。这种工作发生在持续地请求网络资源、或者是进行系统调用、或者是等待事件发生的情况。一个Goroutines需要读文件就是IO-Bound。我把同步事件(mutexes,atomic)类似导致Goroutine等待的情况归到此类。

cpu-bound的工作负载,你需要并行去使用并发。一个单独的OS/硬件线程处理多个Goroutines效率很低,因为Goroutines在这个工作负载里不会主动进入或者是离开等待状态。Goroutines数多于OS/硬件线程数的时候会降低工作负载的执行速度,因为从OS线程换上或者是换下Goroutines会有延迟(切换的时间)。上下文切换会在workload里创建出“一切都停止”事件,因为在切换的时候你的所有workload都不会执行。
在IO-Bound的workloads里,你不需要并行去使用并发。一个单独OS/硬件线程可以有效率地处理多个Goroutines,因为Goroutines作为它自己workload的一部分可以自动进入或者离开等待状态(waiting)。Goroutines数量多于OS/硬件线程数可以加速workload的执行,因为Goroutines在OS线程上切换不会创建“一切都停止”事件。你的workload会自然停止并且这会让一个不同的Goroutine去有效率地使用相同的OS/硬件线程,而不是让OS/硬件线程空闲下来。
你如何知道每个硬件线程设置多少个Goroutines会有最好的吞吐量呢?太少的Goroutines你会有更多空闲时间。太多Goroutines你会有更多上下文切换延迟。这件事情你需要考虑,但是这超出了
本篇文章讲述的范围。
现在,我们需要看一些代码来巩固你去判断什么时候workload可以利用并发,以及什么时候需要利用并行什么时候不需要并行。

整数累加

不需要太复杂的代码,就看一下下面的add函数。它计算了一堆整数的和。
L1

1
2
3
4
5
6
7
36 func add(numbers []int) int {
37 var v int
38 for _, n := range numbers {
39 v += n
40 }
41 return v
42 }

在L1的36行,声明了add方法,他接受一个int型的slice,然后返回它们的和。37定义了一个变量v去做数字累加。38行函数遍历这些整数,39行把当前数加上去。最后41行返回它们的和。
Question: add是否适合无序执行?我相信答案肯定是yes。整数集可以被分解成更小的lists,并且这些lists可以并行去处理。一旦所有lists都各自加完,这一系列lists的和可以加到一起,得到上面代码里一样的结果。
但是,另一个问题来了。我们应该分多少个lists去分别单独处理才能得到最好的吞吐量呢?为了回答这个问题,你需要知道add方法运行到底是哪种workload。add方法处理的是CPU-Bound类型的workload因为这是一个纯数学计算的方法,它不会导致goroutines进入自动等待状态。这意味着每个OS/硬件线程一个Goroutine即可获得理想的吞吐量。
下面的L2是add方法的并发版本。
注意:你有多种方式去写add的并发版本,不必去纠结代码本身。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
44 func addConcurrent(goroutines int, numbers []int) int {
45 var v int64
46 totalNumbers := len(numbers)
47 lastGoroutine := goroutines - 1
48 stride := totalNumbers / goroutines
49
50 var wg sync.WaitGroup
51 wg.Add(goroutines)
52
53 for g := 0; g < goroutines; g++ {
54 go func(g int) {
55 start := g * stride
56 end := start + stride
57 if g == lastGoroutine {
58 end = totalNumbers
59 }
60
61 var lv int
62 for _, n := range numbers[start:end] {
63 lv += n
64 }
65
66 atomic.AddInt64(&v, int64(lv))
67 wg.Done()
68 }(g)
69 }
70
71 wg.Wait()
72
73 return int(v)
74 }

在L2里面,addConcurrent方法是add方法的并发版本。这里有很多代码因此我只讲解重要的代码行
Line 48:每个Goroutine会有它单独的一个小的list去处理。list的size由整数集的size去除以Goroutines的数量得到。
Line 53:创建goroutines线程池去处理加数操作。
Line 57-59:最后一个goroutines会处理剩下的最后一个list,它可能比其他list的size要大。
Line 66:所有lists算出来的sum,加到一起得到最后的一个sum。

并发版本比有序版本更复杂,这种复杂度是否值得呢?回答这个问题最好的方式就是写一个benchmark。这里我用了一个一千万个数大小整数集,并且关掉了垃圾回收。这里对add和addConcurrent进行了对比。
L3

1
2
3
4
5
6
7
8
9
10
11
func BenchmarkSequential(b *testing.B) {
for i := 0; i < b.N; i++ {
add(numbers)
}
}

func BenchmarkConcurrent(b *testing.B) {
for i := 0; i < b.N; i++ {
addConcurrent(runtime.NumCPU(), numbers)
}
}

L3展示了benchmark函数。下面是当Goroutines只有一个单独的OS/硬件线程能用的情况。有序版本使用1个Goroutine然后并发版本使用runtime.NumCPU数,我的机器上是8。这个例子下面,并发版本没有使用并行去做并发。
L4

1
2
3
4
5
6
7
8
9
10
11
12
10 Million Numbers using 8 goroutines with 1 core
2.9 GHz Intel 4 Core i7
Concurrency WITHOUT Parallelism
-----------------------------------------------------------------------------
$ GOGC=off go test -cpu 1 -run none -bench . -benchtime 3s
goos: darwin
goarch: amd64
pkg: github.com/ardanlabs/gotraining/topics/go/testing/benchmarks/cpu-bound
BenchmarkSequential 1000 5720764 ns/op : ~10% Faster
BenchmarkConcurrent 1000 6387344 ns/op
BenchmarkSequentialAgain 1000 5614666 ns/op : ~13% Faster
BenchmarkConcurrentAgain 1000 6482612 ns/op

注意:在你的本机上跑BenchMark很复杂。有很多因素会导致你的benchmarks不够精确。你的机器尽可能的处于空闲状态这样可以去跑一段时间benchmark,以确保自己看到的结果和上面的大体一致。使用测试工具跑两遍benchmark能够得到更一致的结果。
L4给出的benchmark表明,在仅有一个单独OS/硬件线程时候有序版本比并发版本大约要快%10–%13。这在我们的意料之中,因为并发版本需要在一个单独的OS线程上频繁进行上下文切换(context switches)以及处理Goroutines。
下面是每个Goroutines有一个单独的OS/硬件线程的情况下的结果。有序版本用一个Goroutine然后并发版本使用runtime.NumCPU,在我本机上是8个。这种情况下利用了并行去处理并发。
L5

1
2
3
4
5
6
7
8
9
10
11
12
10 Million Numbers using 8 goroutines with 8 cores
2.9 GHz Intel 4 Core i7
Concurrency WITH Parallelism
-----------------------------------------------------------------------------
$ GOGC=off go test -cpu 8 -run none -bench . -benchtime 3s
goos: darwin
goarch: amd64
pkg: github.com/ardanlabs/gotraining/topics/go/testing/benchmarks/cpu-bound
BenchmarkSequential-8 1000 5910799 ns/op
BenchmarkConcurrent-8 2000 3362643 ns/op : ~43% Faster
BenchmarkSequentialAgain-8 1000 5933444 ns/op
BenchmarkConcurrentAgain-8 2000 3477253 ns/op : ~41% Faster

L5中的benchmark表明了,每个Goroutines使用一个OS/硬件线程的时候并发版本比有序版本要快大约41%–43%。这是我们期望中的事情,因为所有的Goroutines现在都在并行执行,8个Goroutines现在都在同一时间并发执行。

排序

需要明白,不是所有的CPU-bound的workloads都适合并发处理。当把工作拆解或者是把结果合并需要花费很大代价的时候这种说法是正确的。这种情况我们可以看一个算法的例子:冒泡排序。看一下下Go实现的冒泡排序。
L6

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
01 package main
02
03 import "fmt"
04
05 func bubbleSort(numbers []int) {
06 n := len(numbers)
07 for i := 0; i < n; i++ {
08 if !sweep(numbers, i) {
09 return
10 }
11 }
12 }
13
14 func sweep(numbers []int, currentPass int) bool {
15 var idx int
16 idxNext := idx + 1
17 n := len(numbers)
18 var swap bool
19
20 for idxNext < (n - currentPass) {
21 a := numbers[idx]
22 b := numbers[idxNext]
23 if a > b {
24 numbers[idx] = b
25 numbers[idxNext] = a
26 swap = true
27 }
28 idx++
29 idxNext = idx + 1
30 }
31 return swap
32 }
33
34 func main() {
35 org := []int{1, 3, 2, 4, 8, 6, 7, 2, 3, 0}
36 fmt.Println(org)
37
38 bubbleSort(org)
39 fmt.Println(org)
40 }

在L6里,给出了Go版本的冒泡排序。排序算法遍历每个值并在整数集上进行数据交替。根据初始顺序不同,排序可能需要多次的遍历。
Question: bubbleSort的workload适合无序执行吗?答案肯定是no。整数集可以分解成更小的lists并且这些lists可以并发地排序。但是所有并发工作完成之后,并没有一个有效的方式再去把这些小的lists排序到一起。这里是一个并发版本的冒泡排序。
L8

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
01 func bubbleSortConcurrent(goroutines int, numbers []int) {
02 totalNumbers := len(numbers)
03 lastGoroutine := goroutines - 1
04 stride := totalNumbers / goroutines
05
06 var wg sync.WaitGroup
07 wg.Add(goroutines)
08
09 for g := 0; g < goroutines; g++ {
10 go func(g int) {
11 start := g * stride
12 end := start + stride
13 if g == lastGoroutine {
14 end = totalNumbers
15 }
16
17 bubbleSort(numbers[start:end])
18 wg.Done()
19 }(g)
20 }
21
22 wg.Wait()
23
24 // Ugh, we have to sort the entire list again.
25 bubbleSort(numbers)
26 }

L8中,bubbleSortConcurrent方法是bubbleSort的并发版本。它使用多个Goroutines去并发地排序整个整数集的一部分。结果你得到的是各自的排序的list。结果你最终在25行还是要整个list做一次排序。
因为冒泡排序的本质就是遍历整个list。25行调用bubbleSort直接否定了任何并发的潜在收益。冒泡排序里,使用并发并没有性能上的增益。

读取文件

我们给出了2个CPU-Bound类型的workloads,那么IO-Bound类型的workload情况是什么样的?当Goroutines自动进入或者是离开waiting状态,情况会有什么不同么?看一个IO-bound类型的workload,它的工作内容是读取文件并查找文本。
第一个版本是一个有序版本的find方法
L10

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
42 func find(topic string, docs []string) int {
43 var found int
44 for _, doc := range docs {
45 items, err := read(doc)
46 if err != nil {
47 continue
48 }
49 for _, item := range items {
50 if strings.Contains(item.Description, topic) {
51 found++
52 }
53 }
54 }
55 return found
56 }

在L10里面,你看到一个有序版本的find函数。line 43定义了一个found变量去存topic在文档里的出现次数。line 44,对所有文档进行遍历,并且在45行上使用read方法对每个doc进行读取。最后从49–53行,使用strings包的Contains方法去检查topic是否在读取到的items里面。如果发现,found变量就对应加一。
这里是find调用的read方法的实现。
L11

1
2
3
4
5
6
7
8
33 func read(doc string) ([]item, error) {
34 time.Sleep(time.Millisecond) // Simulate blocking disk read.
35 var d document
36 if err := xml.Unmarshal([]byte(file), &d); err != nil {
37 return nil, err
38 }
39 return d.Channel.Items, nil
40 }

read方法以一个time.Sleep方法开始。这个里模拟了真实从硬盘读取文档的系统调用所产生的延迟。设置这个延迟对我们精确地测试有序版本和并发版本find方法的性能差异十分重要。然后在35–39行,测试的xml文档存储在fine的全局变量里,它被反序列化成一个要去处理的struct。最后返回了一个items的集合。
下面是一个并发版本代码。
注意:有多种方式去写并发版本代码,不要纠结于这个代码本身实现。
L12

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
58 func findConcurrent(goroutines int, topic string, docs []string) int {
59 var found int64
60
61 ch := make(chan string, len(docs))
62 for _, doc := range docs {
63 ch <- doc
64 }
65 close(ch)
66
67 var wg sync.WaitGroup
68 wg.Add(goroutines)
69
70 for g := 0; g < goroutines; g++ {
71 go func() {
72 var lFound int64
73 for doc := range ch {
74 items, err := read(doc)
75 if err != nil {
76 continue
77 }
78 for _, item := range items {
79 if strings.Contains(item.Description, topic) {
80 lFound++
81 }
82 }
83 }
84 atomic.AddInt64(&found, lFound)
85 wg.Done()
86 }()
87 }
88
89 wg.Wait()
90
91 return int(found)
92 }

L12是find方法的并发版本。并发版本有30行代码,而非并发版本代码只有13行。我的目标是处理未知数量的documents时候控制Goroutines的数量。这里我选择在池化模式里使用一个channel去给池子里的goroutines喂数据。
这部分代码比较多,我只讲解重要部分
Line 61-64: 创建一个channel去处理所有的documents。
Line 65 关闭这个channel,来让池子里的goroutines在所有documents处理完成后能自动停止。
Line 70:创建一个goroutines线程池
Line 73–83:每一个池子里的goroutine从channel接受一个document,读取到内存然后检查内容是否有topic。匹配的话,lfound就加一个。
Line 84:把每个单独goroutines跑出来的数加到一起。
并发版本确实比有序版本代码更加复杂,这个复杂性是否值得?验证的最好方式就是再次写一个benchmark。我用了1000个documents的集合,并且关闭了垃圾回收。一个是顺序版本find,一个是并发版本findConcurrent
L13

1
2
3
4
5
6
7
8
9
10
11
func BenchmarkSequential(b *testing.B) {
for i := 0; i < b.N; i++ {
find("test", docs)
}
}

func BenchmarkConcurrent(b *testing.B) {
for i := 0; i < b.N; i++ {
findConcurrent(runtime.NumCPU(), "test", docs)
}
}

L13给出了benchmark。下面是当所有goroutines只有一个OS/硬件线程的时候。顺序代码使用1个goroutines,而并发版本是runtime.NumCPU的数,在我本机上是8。这种情况下,我们没用并行去做并发。
L14

1
2
3
4
5
6
7
8
9
10
11
12
10 Thousand Documents using 8 goroutines with 1 core
2.9 GHz Intel 4 Core i7
Concurrency WITHOUT Parallelism
-----------------------------------------------------------------------------
$ GOGC=off go test -cpu 1 -run none -bench . -benchtime 3s
goos: darwin
goarch: amd64
pkg: github.com/ardanlabs/gotraining/topics/go/testing/benchmarks/io-bound
BenchmarkSequential 3 1483458120 ns/op
BenchmarkConcurrent 20 188941855 ns/op : ~87% Faster
BenchmarkSequentialAgain 2 1502682536 ns/op
BenchmarkConcurrentAgain 20 184037843 ns/op : ~88% Faster

L14里面表明了,在只有一个单独OS/硬件线程的时候,并发版本大概要比顺序版本代码快87%–%88。这是我们预料到的因为每个Goroutines都能有效的共享这一个OS/硬件线程。在read调用的时候每个goroutines能够自动进行上下文切换,这样OS/硬件线程会一直有事情做。
下面是使用并行去做并发处理。
L15

1
2
3
4
5
6
7
8
9
10
11
12
10 Thousand Documents using 8 goroutines with 1 core
2.9 GHz Intel 4 Core i7
Concurrency WITH Parallelism
-----------------------------------------------------------------------------
$ GOGC=off go test -run none -bench . -benchtime 3s
goos: darwin
goarch: amd64
pkg: github.com/ardanlabs/gotraining/topics/go/testing/benchmarks/io-bound
BenchmarkSequential-8 3 1490947198 ns/op
BenchmarkConcurrent-8 20 187382200 ns/op : ~88% Faster
BenchmarkSequentialAgain-8 3 1416126029 ns/op
BenchmarkConcurrentAgain-8 20 185965460 ns/op : ~87% Faster

L15的benchmark结果说明,额外的OS/硬件线程并没有提供更好的性能。

结论

这篇文章的目的就是让你知道什么时候你的workload适合使用并发。考虑到不同的场景,我给出了不同的例子。
你可以清楚的看到IO-Bound类型的workload并不需要使用并行处理去获得性能的大幅增加,这正好跟CPU-Bound类型的工作截然相反。像类似冒泡算法这种,使用并发其实会增加代码复杂度,而且不会有任何性能增益。所以,一定要确定你的workload是否适合使用并发场景,这是很重要的事情。

原文链接:www.ardanlabs.com/blog/2018/1…

-------------本文结束 感谢阅读-------------