likes
comments
collection
share

go 并发编程基础知识

作者站长头像
站长
· 阅读数 25

Sync

Once

Once 包裹的函数只会执行一次,哪怕重复调用,也是执行一次

var once sync.Once

func loadData() {
  time.Sleep(3 * time.Second)
  fmt.Println("loading data ...")		// 只会执行一次
}

func getData() {
  once.Do(loadData)
}

func main() {
  var wg sync.WaitGroup
  wg.Add(2)
  go func() {
    getData()
    wg.Done()
  }()
  go func() {
    getData()
    wg.Done()
  }()
  wg.Wait()
}

WaitGroup

使用 WaitGroup 等待 goroutine

  1. 声明一个 wg
  2. wg.Add(1) 监控多少个 goroutine 执行结束
  3. 每个 goroutine 执行结束,需要调用 wg.Done()
  4. wg.Wait() 等待所有 goroutine 执行结束
func main() {
  var wg sync.WaitGroup
  // wg.Add(100)  // 或者在外面写明需要监控 100 个 goroutine,这样就不用在循环中写了
  for i := 0; i < 100; i++ {
    wg.Add(1)     // 每个循环都要监控
    go func(i int) {
      fmt.Println(i)
      wg.Done()
    }(i)
  }
  wg.Wait()
  fmt.Println("all done")
}

互斥锁

多个 goroutine 操作同一个变量,会出现数据竞争,导致结果不确定

为了解决这个问题,可以使用锁

  1. 声明一个锁 lock sync.Mutex
  2. 在操作变量之前,调用 lock.Lock() 加锁
  3. 在操作变量之后,调用 lock.Unlock() 解锁

需要注意的是:

  1. 加锁和解锁必须成对出现,否则会出现死锁
  2. 锁不要复制,声明一个全局的锁,然后在每个 goroutine 中使用
  3. 同一把锁不能重复加或者重复解
var total int
var wg sync.WaitGroup
var lock sync.Mutex

func add() {
  defer wg.Done()
  for i := 0; i < 10000000; i++ {
    lock.Lock()
    total += 1
    lock.Unlock()
  }
}
func sub() {
  defer wg.Done()
  for i := 0; i < 10000000; i++ {
    lock.Lock()
    total -= 1
    lock.Unlock()
  }
}
func main() {
  wg.Add(2)
  go add()
  go sub()
  wg.Wait()
  fmt.Println(total)
}

在做简单的加减时,可以使用一个原子函数 atomic 包提供的 AddInt32 方法

AddInt32 接收的是一个指针,在做简单加减时,可以使用这个方法

var wg sync.WaitGroup
var total int32

func add() {
  defer wg.Done()
  for i := 0; i < 10000000; i++ {
    atomic.AddInt32(&total, 1)
  }
}
func sub() {
  defer wg.Done()
  for i := 0; i < 10000000; i++ {
    atomic.AddInt32(&total, -1)
  }
}
func main() {
  wg.Add(2)
  go add()
  go sub()
  wg.Wait()
  fmt.Println(total)
}

TryLock

会尝试加锁,如果加锁失败,返会返回 false,加锁成功,返回 true

var lock sync.Mutex

func main() {
  ok := lock.TryLock()		// 加锁成功,返回 true
  fmt.Println(ok)

  ok = lock.TryLock()			// 加锁失败,返回 false
  fmt.Println(ok)

  lock.Lock()							// 无法再次加锁
  lock.Unlock()
}

读写锁

读写锁:读写之间是串行,读和读之间是并行

  1. 写锁:在写的的时候,防止别人写和读,只有等写锁释放了,别人才能读和写
  2. 读锁:不防止别人读写
var wg sync.WaitGroup
var rwlock sync.RWMutex   // 读写锁

func main() {
  wg.Add(6)
  // 写 goroutine
  go func() {
    time.Sleep(3 * time.Second)
    defer wg.Done()
    rwlock.Lock()     // 加写锁,写锁会防止别的写锁和读锁获取
    defer rwlock.Unlock()
    fmt.Println("get write lock")
    time.Sleep(5 * time.Second)
  }()

  for i := 0; i < 5; i++ {
    // 读 goroutine
    go func() {
      defer wg.Done()
      for {
        time.Sleep(500 * time.Microsecond)
        rwlock.RLock()  // 加读锁,读锁不会防止别的读锁获取
        fmt.Println("get read lock")
        rwlock.RUnlock()
      }
    }()
  }
  wg.Wait()
}

Cond

Cond 会阻塞当前的 goroutine,直到收到 signal 或者 broadcast 信号

var lock sync.Mutex

func main() {
  ready := false
  var lock sync.Mutex
  cond := sync.NewCond(&lock)		// 声明一个 cond,传入一个锁
  go func() {
    fmt.Println("goroutine 1 start")
    for {
      lock.Lock()
      if ready {
        lock.Unlock()
        break
      }
      cond.Wait()				// 被阻塞
      lock.Unlock()
    }
    fmt.Println("goroutine 1 done")
  }()

  time.Sleep(2 * time.Second)
  ready = true
  cond.Signal()    // 唤醒,只能唤醒一个 goroutine
  // cond.Broadcast() // 唤醒所有 goroutine
  time.Sleep(10 * time.Second)
}

Pool

适用于频繁的创建和释放某一个结构体,因为频繁的创建和释放会损耗一些性能

使用对象池,是为了在释放的时候不会真正的把内存给释放掉,把它还给对象池,下次在用的时候直接从对象池中获取,就不用重新创建了

type MyStruct struct {
  Name string
  Age  int
}

func main() {
  pool := sync.Pool{
    // 新的对象池中,没有对象会调用 New 方法
    New: func() interface{} {
      return &MyStruct{} // 需要用指针的形式,sync.Pool 会管理和重复使用对象,而不是每次都新建对象
    },
  }

  for i := 0; i < 10; i++ {
    ms := pool.Get().(*MyStruct)            // 从对象池中获取对象,需要断言
    fmt.Printf("after pool get: %v\n", *ms) // 从对象池中拿到的对象不是初始值,而是上一次对象的值,所以需要自己做一些初始化的事情
    ms.Name = fmt.Sprintf("Name: %d", i)
    ms.Age = i
    fmt.Println(*ms)
    pool.Put(ms) // 将对象还给对象池
  }
}

Map

普通的 map 是并发不安全的,使用时需要用到锁

所以 go 提供了一个并发安全的 mapsync.Map

func main() {
  mm := sync.Map{}		// 声明一个 map

  for i := 0; i < 10; i++ {
    go func(i int) {
      mm.Store(i, i)		// 向 map 中存入数据
    }(i)
  }
  time.Sleep(2 * time.Second)
  v, ok := mm.Load(1)		// 从 map 中获取数据
  if ok {
    fmt.Println(v)
  }
  // 遍历 map
  mm.Range(func(key, value any) bool {
    fmt.Println(key, value)
    return true		// 返回 true 继续遍历,返回 false 停止遍历
  })
}

goroutine 通信

不要通过共享内存来通信,而应该通过通信来共享内存

go 是通过 channel 来实现通信的,channel 又分为 无缓冲有缓冲

有缓冲:size 大于 0

func main() {
  var msg chan string = make(chan string, 1)  // 有缓冲 channel
  msg <- "uccs"
  data := <-msg
  fmt.Println(data)
}

无缓冲:size 等于 0

func main() {
  var msg chan string = make(chan string, 0)  // 无缓冲 channel
  msg <- "uccs"   // 会阻塞
  data := <-msg
  fmt.Println(data)
}

无缓冲的 channel 正常使用需要使用 goroutine

func main() {
  var msg chan string = make(chan string, 1)

  go func(msg chan string) {  // 使用 goroutine,go 有一种 happen-before 的机制,可以保障
    data := <-msg
    fmt.Println(data)
  }(msg)

  msg <- "ucc2s"
  time.Sleep(10 * time.Second)
}

关闭 channel

channel 在使用完毕后,需要关闭,否则会出现死锁

func main() {
  var msg chan string = make(chan string, 1)

  go func(msg chan string) {
    for data := range msg {
      fmt.Println(data)
    }
    fmt.Println("done")  // 这一行就不会打印
  }(msg)

  msg <- "uccs"
  msg <- "uccs2"
  close(msg)  // 如果没有使用 close 关闭 channel

  time.Sleep(10 * time.Second)
}

关闭一个已经关闭的 channel 会 panic

ch := make(chan int, 10)
close(ch)
close(ch)		// close to closed channel

从一个已经关闭的 channel 中读取数据,会读取到 channel0 值,比如 int 类型的 channel,会读取到 0

ch := make(chan int, 10)
close(ch)
num, ok := <-ch
fmt.Println(num)		// 0
if ok {		// false
  fmt.Println(num)
}

如果 close 之后,还有数据没有读完,可以继续读取,直到读取完毕,再读取就会读取到 0

ch := make(chan int, 10)
ch <- 1
close(ch)
num, ok := <-ch
if ok {		// true
  fmt.Println(num)		// 1
}
num, ok = <-ch
if ok {		// false
  fmt.Println(num)
}

往一个已经关闭的 channel 中写入数据,会 panic

ch := make(chan int, 10)
close(ch)
ch <- 1		// send on closed channel

单向 channel

channel 可以指定为单向的,只能发送或者只能接收

  • send-onlychan<- string
  • receive-only<-chan string
c := make(chan string, 3) // 双向 channel,可读可写
var sand chan<- string = c  // 单向 channel,只能写
var read <-chan string = c  // 单向 channel,只能读

例子:

  1. 定义一个生产者 product 函数,它的参数是 send-onlychannel
  2. 定义一个消费者 consumer 函数,它的参数是 receive-onlychannel
  3. 定义一个双向的 channel,传递给 productconsumer,会自动转换成单向 channel
func product(ch chan<- int) {    // 会自动将双向 channel 转换成单向的 channel
  for i := 0; i < 10; i++ {
    ch <- i
  }
  close(ch)
}
func consumer(ch <-chan int) {    // 会自动将双向 channel 转换成单向的 channel
  for i := 0; i < 10; i++ {
    data := <-ch
    fmt.Println(data)
  }
}
func main() {
  c := make(chan int)   // 双向 channel
  go product(c)
  go consumer(c)
  time.Sleep(10 * time.Second)
}

例子

交替打印:01ab23cd45ef67gh89ij1011kl1213mn1415op1617qr1819st2021uv2223wx2425yz2627

var number, letter = make(chan bool), make(chan bool)

func printNumber() {
  var i = 0
  for {
    <-number
    fmt.Printf("%d%d", i, i+1)
    i += 2
    letter <- true
  }
}
func printLetter() {
  var i = 0
  var str = "abcdefghijklmnopqrstuvwxyz"
  for {
    <-letter
    if i >= len(str) {
      return
    }
    fmt.Print(str[i : i+2])
    i += 2
    number <- true
  }
}
func main() {
  go printNumber()
  go printLetter()

  number <- true
  time.Sleep(10 * time.Second)
}

用 channel 实现锁

先读在写 buffer 无所谓,只要 buffer 大于 0 就行,一开始先写一条数据在里面

func main() {
  ch := make(chan struct{}, 10000)		// buffer 大小无所谓
  ch <- struct{}{}		// 先写入一条数据
  var wg sync.WaitGroup

  for i := 0; i < 20000; i++ {
    wg.Add(1)
    go func() {
      defer wg.Done()
      <-ch		// 先读
      counter++
      ch <- struct{}{}	// 在写
    }()
  }
  wg.Wait()
  fmt.Println(counter)
}

先写在读,buffer 设置为 1,实现锁的效果

func main() {
  ch := make(chan struct{}, 1)		// buffer 设置为 1
  var wg sync.WaitGroup

  for i := 0; i < 20000; i++ {
    wg.Add(1)
    go func() {
      defer wg.Done()
      ch <- struct{}{}		// 先读
      counter++
      <-ch		// 再写
    }()
  }
  wg.Wait()
  fmt.Println(counter)
}

用 channel 实现 WaitGroup

func main() {
  ch := make(chan struct{}, 1)
  for i := 0; i < 2; i++ {
    go func(i int) {
      defer func() {
        ch <- struct{}{}
      }()
      time.Sleep(2 * time.Second)
      fmt.Printf("goroutine %d done\n", i)
    }(i)
  }
  for i := 1; i < 2; i++ {
    <-ch
  }
  fmt.Println("all done")
}

用 channel 限流

func main() {
  ch := make(chan struct{}, 1)		// buffer 为 1,只能同时执行一个 goroutine
  var wg sync.WaitGroup
  for i := 0; i < 20; i++ {
    wg.Add(1)
    ch <- struct{}{}	// 写入
    go func(i int) {
      defer func() {
        wg.Done()
        <-ch		// 读取
      }()
      time.Sleep(2 * time.Second)
      fmt.Printf("goroutine %d done\n", i)
    }(i)
  }
  wg.Wait()
  fmt.Printf("all done")
}

select

当代码运行到 select 时,只要有一个 case 能够执行,就会执行;如果都是阻塞状态,有 default 就会执行 default,如果没有,就会阻塞,直到有一个 case 可以执行

func g1(ch chan struct{}) {
  time.Sleep(1 * time.Second)
  ch <- struct{}{}
}
func g2(ch chan struct{}) {
  time.Sleep(2 * time.Second)
  ch <- struct{}{}
}

func main() {
  g1ch := make(chan struct{}, 1)
  g2ch := make(chan struct{}, 1)

  go g1(g1ch)
  go g2(g2ch)
  /*
    如果有 default 分支,会立马执行 default 分支,不会等待任何 case
    如果没有 default 分支,会阻塞,等待上面的 case,直到有一个 case 可以执行
  */
  select {
  case <-g1ch:
    fmt.Println("g1 done")
  case <-g2ch:
    fmt.Println("g2 done")
  default:
    fmt.Println("timeout")
  }
}

那如何我们需要实现等待一段时间后,如果还没有 case 可以执行,就执行 default 分支呢?

使用 for 循环加上 time.NewTimer 实现

func g1(ch chan struct{}) {
  time.Sleep(1 * time.Second)
  ch <- struct{}{}
}
func g2(ch chan struct{}) {
  time.Sleep(2 * time.Second)
  ch <- struct{}{}
}

func main() {
  g1ch := make(chan struct{}, 1)
  g2ch := make(chan struct{}, 1)

  go g1(g1ch)
  go g2(g2ch)

  // 等待 500 毫秒,如果还没有 case 可以执行,就执行 default 分支
  timer := time.NewTimer(500 * time.Millisecond)

  for {
    select {
    case <-g1ch:
      fmt.Println("g1 done")
      return
    case <-g2ch:
      fmt.Println("g2 done")
      return
    case <-timer.C:   // 或者使用 time.After(500*time.Millisecond)
      fmt.Println("timeout")
      return
    }
  }
}

channel 赋值给 nil 后,case 语句会阻塞

func main() {
  ch1 := make(chan int, 10)
  ch2 := make(chan int, 10)
  var wg sync.WaitGroup

  wg.Add(1)
  go func() {
    defer wg.Done()
    defer close(ch1)
    for i := 0; i < 10; i++ {
      ch1 <- i
      time.Sleep(1 * time.Second)
    }
  }()

  wg.Add(1)
  go func() {
    defer wg.Done()
    defer close(ch2)
    for i := 0; i < 10; i++ {
      ch2 <- i
      time.Sleep(500 * time.Millisecond)
    }
  }()

  timeout := time.After(7 * time.Second)
loop:
  for {
    select {
    case num, ok := <-ch1:
      if !ok {
        ch1 = nil		// 将 ch1 赋值为 nil,case 语句会阻塞
        break
      }
      fmt.Printf("receive data from ch1: %d\n", num)
    case num, ok := <-ch2:
      if !ok {
        ch2 = nil	// 将 ch2 赋值为 nil,case 语句会阻塞
        break
      }
      fmt.Printf("receive data from ch2: %d\n", num)
    case <-timeout:
      fmt.Println("no data, timeout")
      break loop
    }
    // 跳出循环需要判断 ch1 和 ch2 是否为 nil,如果不为 nil,说明还有数据没有读取完毕
    if ch1 == nil && ch2 == nil {
      break
    }
  }

  wg.Wait()
}

不能在 select 语句中使用相同的 unbuffer channel 进行读写操作,会出现死锁:all goroutines are asleep - deadlock!

func main() {
  ch := make(chan int)
  select {
  case x := <-ch:
    fmt.Println(x)
    // 处理来自 ch 的数据
  case ch <- 1:
    // 向 ch 中写入数据 y
  }
}

Context

通过 chan 可以实现 goroutine 的监控,在 6s 后退出 cupInfo 的执行

var wg sync.WaitGroup

func cupInfo(ch chan struct{}) {
  defer wg.Done()
  for {
    select {
    case <-ch:
      fmt.Println("退出监控")
      return
    default:
      time.Sleep(2 * time.Second)
      fmt.Println("cpu 的信息")
    }
  }
}

func main() {
  ch := make(chan struct{})
  wg.Add(1)
  go cupInfo(ch)
  time.Sleep(6 * time.Second)
  ch <- struct{}{}
  wg.Wait()
  fmt.Println("监控完成")
}

WithCancel

这样写已经很优雅的,但是这种需求太常见了,所以 go 还给我们提供了 context

context 内部有个 channel,使用 context.WithCancel 声明一个 ctxcancel,将 ctx 传入 cupInfo 函数内部

通过监听 ctx.Done 来退出 cupInfo 的执行,当满足条件后,我们会调用 cancelctx.Done 就能收到信号执行相关的操作

var wg sync.WaitGroup

func cupInfo(ctx context.Context) {
  defer wg.Done()
  for {
    select {
    case <-ctx.Done():
      fmt.Println("退出监控")
      return
    default:
      time.Sleep(2 * time.Second)
      fmt.Println("cpu 的信息")
    }
  }
}

func main() {
  ctx, cancel := context.WithCancel(context.Background())
  wg.Add(1)
  go cupInfo(ctx)
  time.Sleep(6 * time.Second)
  cancel()
  wg.Wait()
  fmt.Println("监控完成")
}

通过 ctx 派生出的 ctx2,你监控的是 ctx2.Done,但是你调用 ctxcancel 也能够取消 ctx2

ctx, cancel := context.WithCancel(context.Background())
ctx2, _ := context.WithCancel(ctx)

select{
case <-ctx2.Done():   // 这边能够接收到 ctx1 的 cancel
  fmt.Println("退出监控")
  return
  default:
    time.Sleep(2 * time.Second)
    fmt.Println("cpu 的信息")
}

cancel()    // ctx1 的 cancel

WithTimeout

一段时间后,自动调用 cancel,退出 cupInfo 的执行

WithTimeout 类似的一个是 WithDeadline 在指定的时间超时

var wg sync.WaitGroup

func cupInfo(ctx context.Context) {
  defer wg.Done()
  for {
    select {
    case <-ctx.Done():    // cancel 后这边能够接受到信号
      fmt.Println("退出监控")
      return
    default:
      time.Sleep(2 * time.Second)
      fmt.Println("cpu 的信息")
    }
  }
}

func main() {
  ctx, _ := context.WithTimeout(context.Background(), 6*time.Second)   // 6s 后自动调用 cancel
  wg.Add(1)
  go cupInfo(ctx)
  wg.Wait()
  fmt.Println("监控完成")
}

WithValue

WithValue 可以传递一些值,比如 traceId,在 cupInfo 中可以通过 ctx.Value("traceId") 获取到

var wg sync.WaitGroup

func cupInfo(ctx context.Context) {
  fmt.Println(ctx.Value("traceId"))     // 这里可以拿到 traceId: 1234
  defer wg.Done()
  for {
    select {
    case <-ctx.Done():    // 还是能够接收到 cancel 信号
      fmt.Println("退出监控")
      return
    default:
      time.Sleep(2 * time.Second)
      fmt.Println("cpu 的信息")
    }
  }
}

func main() {
  ctx, _ := context.WithTimeout(context.Background(), 6*time.Second)
  valCtx := context.WithValue(ctx, "traceId", "1234")   // 向 context 中写入 traceId: 1234
  wg.Add(1)
  go cupInfo(valCtx)
  wg.Wait()
  fmt.Println("监控完成")
}