Sorry, your browser cannot access this site
This page requires browser support (enable) JavaScript
Learn more >

My Blog

从此烟雨落金城,一人撑伞两人行。

Go并发-下

基础知识

并发和并行之间的区别。

  • 并发(concurrency):把任务在不同的时间点交给处理器进行处理。在同一时间点,任务并不会同时运行。
  • 并行(parallelism):把每一个任务分配给每一个处理器独立完成。在同一时间点,任务一定是同时运行。

Go 语言的并发通过 goroutine 特性完成,由 Go 语言的运行时调度完成。

提供 channel 在多个 goroutine 间进行通信。goroutine 和 channel 是 Go 语言秉承的 CSP(Communicating Sequential Process)并发模式的重要实现基础。

goroutine

使用 go 关键字就可以创建 goroutine,将 go 声明放到一个需调用的函数之前。

所有 goroutine 在 main() 函数结束时会一同结束。

channel

Go语言在语言级别提供的 goroutine 间的通信方式。使用 channel 在两个或多个 goroutine 之间传递消息。

一个 channel 只能传递一种类型的值,类型需要在声明 channel 时指定。

死锁、活锁和饥饿

死锁是指两个或两个以上的进程(或线程)在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。

活锁是另一种形式的活跃性问题,该问题尽管不会阻塞线程,但也不能继续执行,因为线程将不断重复同样的操作,而且总会失败。

饥饿是指一个可运行的进程尽管能继续执行,但被调度器无限期地忽视,而不能被调度执行的情况。

并发通信

最常见的并发通信模型:共享数据和消息。

传统的同步 goroutine 的机制,对共享资源加锁。atomic 和 sync 包可以对共享的资源进行加锁操作。

原子函数

以底层的加锁机制来同步访问整型变量和指针。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package main
import (
"fmt"
"runtime"
"sync"
"sync/atomic"
)
var (
counter int64
wg sync.WaitGroup
)
func main() {
wg.Add(2)
go incCounter(1)
go incCounter(2)
wg.Wait() //等待goroutine结束
fmt.Println(counter)
}
func incCounter(id int) {
defer wg.Done()
for count := 0; count < 2; count++ {
atomic.AddInt64(&counter, 1) //安全的对counter加1
runtime.Gosched()
}
}

互斥锁和读写锁

sync 包提供了两种锁类型:sync.Mutex 和 sync.RWMutex。RWMutex 相对友好些,是经典的单写多读模型。

用于在代码上创建一个临界区,保证同一时间只有一个 goroutine 可以执行这个临界代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package main
import (
"fmt"
"runtime"
"sync"
)
var (
counter int64
wg sync.WaitGroup
mutex sync.Mutex
)
func main() {
wg.Add(2)
go incCounter(1)
go incCounter(2)
wg.Wait()
fmt.Println(counter)
}
func incCounter(id int) {
defer wg.Done()
for count := 0; count < 2; count++ {
//同一时刻只允许一个goroutine进入这个临界区
mutex.Lock()
{
value := counter
runtime.Gosched()
value++
counter = value
}
mutex.Unlock() //释放锁,允许其他正在等待的goroutine进入临界区
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
var (
// 逻辑中使用的某个变量
count int
// 与变量对应的使用互斥锁
countGuard sync.RWMutex
)
func GetCount() int {
// 锁定
countGuard.RLock()
// 在函数退出时解除锁定
defer countGuard.RUnlock()
return count
}

等待组

使用等待组进行多个任务的同步,等待组可以保证在并发环境中完成指定数量的任务。

方法名 功能
(wg * WaitGroup) Add(delta int) 等待组的计数器 +1
(wg * WaitGroup) Done() 等待组的计数器 -1
(wg * WaitGroup) Wait() 当等待组计数器不等于 0 时阻塞直到变 0。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package main
import (
"fmt"
"net/http"
"sync"
)
func main() {
// 声明一个等待组
var wg sync.WaitGroup
// 准备一系列的网站地址
var urls = []string{
"http://www.github.com/",
"https://www.qiniu.com/",
"https://www.golangtc.com/",
}
// 遍历这些地址
for _, url := range urls {
// 每一个任务开始时, 将等待组增加1
wg.Add(1)
// 开启一个并发
go func(url string) {
// 使用defer, 表示函数完成时将等待组值减1
defer wg.Done()
// 使用http访问提供的地址
_, err := http.Get(url)
// 访问完成后, 打印地址和可能发生的错误
fmt.Println(url, err)
// 通过参数传递url地址
}(url)
}
// 等待所有的任务完成
wg.Wait()
fmt.Println("over")
}

通道(chan)

可以让一个 goroutine 通过它给另一个 goroutine 发送值信息。

提倡使用通信的方法代替共享内存,当一个资源需要在 goroutine 之间共享时,通道在 goroutine 之间架起了一个管道,并提供了确保同步交换数据的机制。

在任何时候,同时只能有一个 goroutine 访问通道进行发送和获取数据。goroutine 间通过通道就可以通信。

通道声明
1
var 通道变量 chan 通道类型
通道创建
1
通道实例 := make(chan 数据类型)
发送数据
1
2
3
4
5
6
7
// 通道变量 <- 值
// 创建一个空接口通道
ch := make(chan interface{})
// 将0放入通道中
ch <- 0
// 将hello字符串放入通道中
ch <- "hello"
接受数据
1
2
3
4
5
6
7
8
9
// 阻塞接收数据
data := <-ch
// 非阻塞接收数据
data, ok := <-ch
// 忽略接收的数据
<-ch
// 循环接收
for data := range ch {
}

单向通道

只能写入数据的通道类型为chan<-,只能读取数据的通道类型为<-chan

1
2
3
4
5
6
7
ch := make(chan int)
// 声明一个只能写入数据的通道类型, 并赋值为ch
var chSendOnly chan<- int = ch
// 声明一个只能读取数据的通道类型, 并赋值为ch
var chRecvOnly <-chan int = ch
// make 创建通道时,也可以创建一个只写入或只读取的通道
ch := make(<-chan int)

带缓冲的通道

创建缓冲通道
1
2
// 创建一个3个元素缓冲大小的整型通道
ch := make(chan int, 3)
阻塞条件
  • 带缓冲通道被填满时,尝试再次发送数据时发生阻塞。
  • 带缓冲通道为空时,尝试接收数据时发生阻塞。

超时机制

Go语言中提供了 select 关键字,可以同时响应多个通道的操作。每个 case 语句里必须是一个 IO 操作。

在一个 select 语句中,Go语言会按顺序从头至尾评估每一个发送和接收的语句。

如果其中的任意一语句可以继续执行(即没有被阻塞),那么就从那些可以执行的语句中任意选择一条来使用。

没有任意一条语句可以执行:

  • 如果给出了 default 语句,那么就会执行 default 语句,同时程序的执行会从 select 语句后的语句中恢复;
  • 如果没有 default 语句,那么 select 语句将被阻塞,直到至少有一个通信可以进行下去。

select 做的就是:选择处理列出的多个通信情况中的一个。

  • 如果都阻塞了,会等待直到其中一个可以处理
  • 如果多个可以处理,随机选择一个
  • 如果没有通道操作可以处理并且写了 default 语句,它就会执行:default 永远是可运行的(这就是准备好了,可以执行)。

select 语句实现了一种监听模式,通常用在(无限)循环中;在某种情况下,通过 break 语句使循环退出。

1
2
3
4
5
6
7
8
9
select{
case 操作1:
响应操作1
case 操作2:
响应操作2

default:
没有操作情况
}
操 作 语句示例
接收任意数据 case <- ch;
接收变量 case d := <- ch;
发送数据 case ch <- 100;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package main
import (
"fmt"
"time"
)
func main() {
ch := make(chan int)
quit := make(chan bool)
//新开一个协程
go func() {
for {
select {
case num := <-ch:
fmt.Println("num = ", num)
case <-time.After(3 * time.Second):
fmt.Println("超时")
quit <- true
}
}
}() //别忘了()
for i := 0; i < 5; i++ {
ch <- i
time.Sleep(time.Second)
}
<-quit
fmt.Println("程序结束")
}

从已经关闭的通道接收数据或者正在接收数据时,将会接收到通道类型的零值,然后停止阻塞并返回。

多核并行化

通过设置环境变量 GOMAXPROCS 的值来控制使用多少个 CPU 核心。

1
2
3
4
5
6
7
8
9
10
package main
import (
"fmt"
"runtime"
)
func main() {
cpuNum := runtime.NumCPU() //获得当前设备的cpu核心数
fmt.Println("cpu核心数:", cpuNum)
runtime.GOMAXPROCS(cpuNum) //设置需要用到的cpu数量
}

CSP:通信顺序进程

CSP(communicating sequential processes)并发模型。

Go语言并没有完全实现了 CSP 并发模型的所有理论,仅仅是实现了 process 和 channel 这两个概念。

process 就是Go语言中的 goroutine,每个 goroutine 之间是通过 channel 通讯来实现数据共享。

通道(channel)和 map、切片一样,也是由 Go 源码编写而成。为了保证两个 goroutine 并发访问的安全性,通道也需要做一些锁操作,因此通道其实并不比锁高效。

评论