Golang 并发编程实战——协程、管道、select用法
在阅读本文前,我希望你有一定的Go语言基础,以及一部分关于协程的使用经验。
本文旨在帮助你使用高级并发技巧,其主要包含了以下几个部分:goroutine
的基本用法;使用chan
来实现多个goroutine
之间的通信;使用select
关键字来处理超时等。
术语 | 解析 |
---|---|
goroutine | 指协程,比线程要更轻量级 |
chan /channel | 指管道,多用于多个 goroutine 之间通信 |
一个简单的例子
func boring(msg string) {
for i := 0; ; i++ {
fmt.Println(msg, i)
time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
}
}
func main() {
go boring("boring!")
fmt.Println("I'm listening")
time.Sleep(2 * time.Second)
fmt.Println("You're boring. I'm leaving")
}
I'm listening
boring! 0
boring! 1
boring! 2
boring! 3
boring! 4
boring! 5
You're boring. I'm leaving
上述这段代码有两个部分,boring
方法负责向控制台输出当前的循环次数,main
方法的第一行为这个方法开启了一个协程,也就是说,main
方法不会等待boring
方法执行完毕。main
方法在输出I'm listening
后,进入为期2秒的睡眠,随后唤醒结束main函数。由于main函数结束会带来整个程序的结束,所以开启的boring
协程也会结束。
不过,上述的例子只是一个简单的演示。实际上,协程之间、协程与主进程之间是需要通信的,这能够帮助我们完成更为复杂的应用。
Go 管道的用法
一个简单的使用方法如下
func boring(msg string, c chan string) {
for i := 0; ; i++ {
// 发送信息给管道 (hannel / chan)
// 同时,它也在等待管道的消费者消费完成
c <- fmt.Sprintf("%s %d", msg, i)
time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
}
}
func main() {
c := make(chan string) // 初始化一个管道
go boring("boring!", c)
for i := 0; i < 5; i++ {
// `<-c` 等待 `boring` 方法给它发送值,如果一直没有收到,那么会被阻塞在这一步
fmt.Printf("You say: %q\n", <-c)
}
fmt.Println("You're boring. I'm leaving")
}
You say: "boring! 0"
You say: "boring! 1"
You say: "boring! 2"
You say: "boring! 3"
You say: "boring! 4"
You're boring. I'm leaving
上述方法简单说就是boring
方法在给管道c
发送数据,并且等待另一头,也就是main
方法来消费。由于管道中只能够存在一个数据,所以main
方法和boring
方法在某些程度上是交替运行的。但实际上不完全是,以main
方法来说,接受到管道的数据后可以直接进行下一步,而不需要继续等待。
【知识点】Chan的概念
在Go语言中,通道是goroutine
与另一个goroutine
通信的媒介,并且这种通信是无锁的。换句话说,通道是一种允许一个goroutine
将数据发送到另一个goroutine
的技术。默认情况下,通道是双向的,这意味着goroutine可以通过同一通道发送或接收数据,如下图所示:
在Go语言中,除了
chan string
这样的写法能够使用读写功能双向管道外,还可以创建出单向管道,如<-chan string
只能从管道中读取数据,而chan<- string
只能够向管道中写入数据。
【案例讲解】两个线程输出数据
通过两个管道实现
// `boring` 是一个返回管道的方法,该管道用于和 `boring` 方法通信
// `<-chan string` 意味着只能够从管道里面接受 String 数据,而不能够向该管道发送数据
func boring(msg string) <-chan string {
c := make(chan string)
// 现在在 boring 方法中开启协程,并在这个协程中向管道发送数据
go func() {
for i := 0; i < 10; i++ {
c <- fmt.Sprintf("%s %d", msg, i)
time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
}
close(c) // 要记得关闭协程
}()
return c
}
func main() {
joe := boring("Joe")
ahn := boring("Ahn")
// 必须要按照顺序输出 joe 和 ahn
for i := 0; i < 10; i++ {
fmt.Println(<-joe)
fmt.Println(<-ahn)
}
fmt.Println("You're both boring. I'm leaving")
}
这段代码会让代码按照boring("Joe")
、boring("Ahn")
这样交替输出。虽然说是能够交替输出数据,但这个本质上不是通过线程之间的通信实现,为此下面会进行一点改造。
合并管道
// `boring` 是一个返回管道的方法,该管道用于和 `boring` 方法通信
// `<-chan string` 意味着只能够从管道里面接受 String 数据,而不能够向该管道发送数据
func boring(msg string) <-chan string {
c := make(chan string)
go func() {
for i := 0; i < 10; i++ {
c <- fmt.Sprintf("%s %d", msg, i)
time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
}
close(c)
}()
return c
}
func fanIn(cs ...<-chan string) <-chan string {
c := make(chan string)
for _, ci := range cs { // spawn channel based on the number of input channel
go func(cv <-chan string) {
for {
c <- <-cv // 把 cs 里面的每个管道都接到主管道 c 上
}
}(ci)
}
return c
}
func main() {
c := fanIn(boring("Joe"), boring("Ahn"))
for i := 0; i < 20; i++ {
fmt.Println(<-c)
}
fmt.Println("You're both boring. I'm leaving")
}
现在我们可以从两个方法内的协程获取到数据,虽然不能够保证交替输出数据(在这里是随机的),下面,我们使用管道来让多个进程之间开始通信。
协程间通信
type Message struct {
str string // 真正要传输的数据
wait chan bool //
}
func fanIn(inputs ...<-chan Message) <-chan Message {
c := make(chan Message)
for i := range inputs {
input := inputs[i]
go func() {
for {
c <- <-input
}
}()
}
return c
}
// `boring` 是一个返回管道的方法,该管道用于和 `boring` 方法通信
func boring(msg string) <-chan Message {
c := make(chan Message)
waitForIt := make(chan bool)
go func() {
for i := 0; ; i++ {
c <- Message{
str: fmt.Sprintf("%s %d", msg, i),
wait: waitForIt, // 将管道注入到返回值中,用于协程之间的通信
}
time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
// 协程需要在这里等到接收到信息才能够继续执行后面的逻辑
<-waitForIt
}
}()
return c
}
func main() {
// merge 2 channels into 1 channel
c := fanIn(boring("Joe"), boring("Ahn"))
for i := 0; i < 5; i++ {
msg1 := <-c // 等到从管道中接受数据
fmt.Println(msg1.str)
msg2 := <-c
fmt.Println(msg2.str)
// 由于 boring 协程中需要等待 wait 信号才能继续执行,所以这一步能够保证两个协程都能够输出一次数据
msg1.wait <- true // main 协程允许 boring 协程继续执行任务
msg2.wait <- true // main 协程允许 boring 协程继续执行任务
}
fmt.Println("You're both boring. I'm leaving")
}
【案例讲解】设定超时等待时间
一个简单实现
// `boring` 是一个返回管道的方法,该管道用于和 `boring` 方法通信
func boring(msg string) <-chan string {
c := make(chan string)
go func() {
for i := 0; ; i++ {
c <- fmt.Sprintf("%s %d", msg, i)
time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)
}
}()
return c
}
func main() {
c := boring("Joe")
// timeout 的类型为:`<-chan Time`
timeout := time.After(5 * time.Second) // 这个方法会在 5 秒钟向 timeout 管道写入数据
for {
select {
case s := <-c:
fmt.Println(s)
case <-timeout:
fmt.Println("You talk too much.")
return
}
}
}
通过select能够保证在时间到达之后,执行case 2来结束程序。如果刚好二者一起到达,那么会随机执行一个case,在这里case最多可能会执行一次,但不一定来得及输出结果。
【知识点】select 解析
select 是 Go 中的一个控制结构,类似于用于通信的 switch 语句。每个 case 必须是一个通信操作,要么是发送要么是接收。 select 随机执行一个可运行的 case。如果没有 case 可运行,那么会执行 default 里的操作,如果没有 default,那么它将阻塞,直到有 case 可运行。一个默认的子句应该总是可运行的。
【实战】模拟Google搜索服务
Web页面中,搜索是一个很常见的功能,多数情况下,我们会使用一个微服务来搭建一个搜索服务,如ElasticSearch
就是一个单独的服务。在这里,我们不会真的模拟一个ES来处理,反之,我们用一个随机延时的函数来代替它。由于搜索的时间不能够保证,有时候会很快,但有时候也会慢,不管是因为搜索本身就需要时间还是由于IO的耗时。
在这个案例中,我们将会循序渐进来告诉你如何更好的利用goroutine
和chan
来处理这个问题。除此以外,这里还使用了函数式编程
技巧,如果你对这个还不太熟悉,可以先了解一些相关的知识再来继续阅读。
Google搜索1.0
type Result string
type Search func(query string) Result
var (
Web = fakeSearch("web")
Image = fakeSearch("image")
Video = fakeSearch("video")
)
func fakeSearch(kind string) Search {
return func(query string) Result {
time.Sleep(100 * time.Millisecond)
return Result(fmt.Sprintf("%s result for %q\n", kind, query))
}
}
// 它会依次调用Web,Image和Video,并将它们附加到结果中返回
func Google(query string) (results []Result) {
results = append(results, Web(query))
results = append(results, Image(query))
results = append(results, Video(query))
}
func main() {
rand.Seed(time.Now().UnixNano())
start := time.Now()
results := Google("golang")
elapsed := time.Since(start)
fmt.Println(results)
fmt.Println(elapsed)
}
[
web result for "golang"
image result for "golang"
video result for "golang"
]
331.1909ms
现在,我们可以从Google
方法中获取到结果,但是这一步还远远不够,我们希望调用搜索服务有一个时间上线,如果超时,那么相关的结果就不要了,返回现在已经有的数据。但在此之前,我们还发现Google
方法中,三个查询是顺序调用的,只有前者返回了结果,才能够执行后面的逻辑,这也是为什么返回的时间这么长的原因。基于此,我们首先要将搜索的结果更改为并发执行的。
并发搜索
type Result string
type Search func(query string) Result
var (
Web = fakeSearch("web")
Image = fakeSearch("image")
Video = fakeSearch("video")
)
func fakeSearch(kind string) Search {
return func(query string) Result {
time.Sleep(100 * time.Millisecond)
return Result(fmt.Sprintf("%s result for %q\n", kind, query))
}
}
func Google(query string) []Result {
c := make(chan Result)
// 搜索的结果都会返回到管道 c 中
go func() {
c <- Web(query)
}()
go func() {
c <- Image(query)
}()
go func() {
c <- Video(query)
}()
var results []Result
for i := 0; i < 3; i++ {
results = append(results, <-c)
}
return results
}
func main() {
rand.Seed(time.Now().UnixNano())
start := time.Now()
results := Google("golang")
elapsed := time.Since(start)
fmt.Println(results)
fmt.Println(elapsed)
}
[
image result for "golang"
web result for "golang"
video result for "golang"
]
109.5769ms
可以看到,现在的搜索结果只需要100+ms
的时间了,这说明三次搜索的确是并发执行的。
更进一步:加上超时时间
type Result string
type Search func(query string) Result
var (
Web = fakeSearch("web")
Image = fakeSearch("image")
Video = fakeSearch("video")
)
func fakeSearch(kind string) Search {
return func(query string) Result {
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond) // 将搜索时间更改为随机值
return Result(fmt.Sprintf("%s result for %q\n", kind, query))
}
}
func Google(query string) []Result {
c := make(chan Result)
go func() {
c <- Web(query)
}()
go func() {
c <- Image(query)
}()
go func() {
c <- Video(query)
}()
var results []Result
// 在这里,timeout 会在 50 毫秒后接收到管道传来的信息
timeout := time.After(50 * time.Millisecond)
for i := 0; i < 3; i++ {
select {
case r := <-c:
results = append(results, r)
case <-timeout: // 在 timeout 接收到信息后,将会结束搜索,并将结果直接返回
fmt.Println("timeout")
return results
}
}
return results
}
func main() {
rand.Seed(time.Now().UnixNano())
start := time.Now()
results := Google("golang")
elapsed := time.Since(start)
fmt.Println(results)
fmt.Println(elapsed)
}
结果一:部分逻辑超时
timeout
[video result for "golang"]
61.2052ms
结果二:所有逻辑都没有超时
[
web result for "golang"
image result for "golang"
video result for "golang"
]
28.1629ms
结语
至此,我已经告诉你一些高级并发编程技巧,但高阶技巧远不止这些,希望这些技巧能给你一些帮助,并带来一些思考。
转载自:https://juejin.cn/post/6993131351294050341