likes
comments
collection
share

并发控制工具无默认实现?Go语言实现

作者站长头像
站长
· 阅读数 38
  1. CountDownLatch:适用于主协程等待所有子协程执行完成的场景。通过等待所有子协程完成后再执行主协程的操作,可以有效地控制并发流程。

  2. CyclicBarrier:适用于多个子协程相互等待的场景,例如子协程之间需要等待彼此都完成某个阶段的任务后再继续执行下一阶段的任务。CyclicBarrier可以让多个协程在栅栏处相互等待,直到所有协程都到达栅栏后一起前进,从而提高并发效率。

  3. Semaphore:适用于限制同时执行的协程数量的场景。通过信号量控制同时执行的协程数量,可以有效地控制并发度,防止资源竞争和过度消耗系统资源。

这三种实现都使用了并发控制工具,可以保证多个协程之间的同步和协作,从而提高程序的并发效率。通过合理地使用这些工具,可以更好地利用多核处理器的性能,实现并发任务的高效执行。

以下是使用Go语言实现减数器(CountDownLatch)的代码示例:

package main

import (
	"fmt"
	"sync"
	"time"
)

// CountDownLatch 用于实现减数器功能
type CountDownLatch struct {
	count int
	mutex sync.Mutex
	cond  *sync.Cond
}

// NewCountDownLatch 创建一个新的减数器,初始值为 count
func NewCountDownLatch(count int) *CountDownLatch {
	latch := &CountDownLatch{
		count: count,
	}
	latch.cond = sync.NewCond(&latch.mutex)
	return latch
}

// Count 返回当前减数器的计数值
func (l *CountDownLatch) Count() int {
	l.mutex.Lock()
	defer l.mutex.Unlock()
	return l.count
}

// Await 等待计数值归零
func (l *CountDownLatch) Await() {
	l.mutex.Lock()
	defer l.mutex.Unlock()
	for l.count > 0 {
		l.cond.Wait()
	}
}

// CountDown 将减数器的计数值减1
func (l *CountDownLatch) CountDown() {
	l.mutex.Lock()
	defer l.mutex.Unlock()
	l.count--
	if l.count == 0 {
		l.cond.Broadcast()
	}
}

func main() {
	latch := NewCountDownLatch(5)

	for i := 0; i < 5; i++ {
		go func(id int) {
			defer latch.CountDown()
			fmt.Printf("Worker %d starting\n", id)
			time.Sleep(time.Second)
			fmt.Printf("Worker %d done\n", id)
		}(i)
	}

	latch.Await()
	fmt.Println("All workers have finished")
}

在这个示例中,我们首先定义了一个CountDownLatch结构体,其中包含一个计数值和一个互斥锁以及一个条件变量。然后通过NewCountDownLatch函数创建一个新的减数器对象,通过Count方法获取当前计数值,通过Await方法等待计数值归零,通过CountDown方法将计数值减一。

main函数中,我们创建了一个减数器对象,并启动了5个子协程执行任务。每个子协程在执行任务结束后调用CountDown方法,当所有子协程执行完成后,主协程调用Await方法等待所有子协程执行完成,然后输出"All workers have finished"。

以下是使用Go语言实现栅栏(CyclicBarrier)的代码示例:

package main

import (
	"fmt"
	"sync"
	"time"
)

// CyclicBarrier 用于实现栅栏功能
type CyclicBarrier struct {
	parties  int
	count    int
	barrier  *sync.Cond
	barrier2 *sync.Cond
}

// NewCyclicBarrier 创建一个新的栅栏,parties 表示需要等待的协程数量
func NewCyclicBarrier(parties int) *CyclicBarrier {
	b := &CyclicBarrier{
		parties: parties,
	}
	b.barrier = sync.NewCond(&sync.Mutex{})
	b.barrier2 = sync.NewCond(&sync.Mutex{})
	return b
}

// Await 等待所有协程到达栅栏
func (b *CyclicBarrier) Await() {
	b.barrier.L.Lock()
	defer b.barrier.L.Unlock()

	b.count++
	if b.count < b.parties {
		b.barrier.Wait()
	} else {
		b.barrier.Broadcast()
		b.barrier2.Broadcast()
		b.count = 0
	}
}

// AwaitAdvance 等待栅栏前进
func (b *CyclicBarrier) AwaitAdvance() {
	b.barrier2.L.Lock()
	defer b.barrier2.L.Unlock()

	b.count++
	if b.count < b.parties {
		b.barrier2.Wait()
	} else {
		b.barrier2.Broadcast()
		b.barrier.Broadcast()
		b.count = 0
	}
}

func main() {
	barrier := NewCyclicBarrier(5)

	for i := 0; i < 5; i++ {
		go func(id int) {
			fmt.Printf("Worker %d starting\n", id)
			time.Sleep(time.Second)
			fmt.Printf("Worker %d done\n", id)
			barrier.Await() // 等待所有协程执行完成
			fmt.Printf("Worker %d continue\n", id)
			time.Sleep(time.Second)
			fmt.Printf("Worker %d done\n", id)
			barrier.AwaitAdvance() // 等待所有协程前进
			fmt.Printf("Worker %d finished\n", id)
		}(i)
	}

	time.Sleep(time.Second * 7) // 等待所有协程执行完成
	fmt.Println("All workers have finished")
}

在这个示例中,我们首先定义了一个CyclicBarrier结构体,其中包含需要等待的协程数量parties、当前已经到达的协程数量count以及两个条件变量barrierbarrier2。通过NewCyclicBarrier函数创建一个新的栅栏对象,通过Await方法等待所有协程到达栅栏,通过AwaitAdvance方法等待所有协程前进。

main函数中,我们创建了一个栅栏对象,并启动了5个子协程执行任务。每个子协程在执行任务结束后调用Await方法等待所有协程执行完成,然后继续执行任务,并调用AwaitAdvance方法等待所有协程前进,最后输出"All workers have finished"。

以下是使用Go语言实现信号量(Semaphore)的代码示例:

package main

import (
	"fmt"
	"sync"
	"time"
)

// Semaphore 信号量实现
type Semaphore struct {
	count int
	cond  *sync.Cond
}

// NewSemaphore 创建一个新的信号量,初始值为 count
func NewSemaphore(count int) *Semaphore {
	sem := &Semaphore{
		count: count,
	}
	sem.cond = sync.NewCond(&sync.Mutex{})
	return sem
}

// Acquire 获取一个信号量
func (s *Semaphore) Acquire() {
	s.cond.L.Lock()
	defer s.cond.L.Unlock()

	for s.count <= 0 {
		s.cond.Wait()
	}
	s.count--
}

// Release 释放一个信号量
func (s *Semaphore) Release() {
	s.cond.L.Lock()
	defer s.cond.L.Unlock()

	s.count++
	s.cond.Signal()
}

func main() {
	sem := NewSemaphore(2) // 创建一个初始值为2的信号量

	for i := 0; i < 5; i++ {
		go func(id int) {
			sem.Acquire() // 获取信号量
			fmt.Printf("Worker %d starting\n", id)
			time.Sleep(time.Second)
			fmt.Printf("Worker %d done\n", id)
			sem.Release() // 释放信号量
		}(i)
	}

	time.Sleep(time.Second * 3) // 等待所有协程执行完成
	fmt.Println("All workers have finished")
}

在这个示例中,我们首先定义了一个Semaphore结构体,其中包含一个计数值count和一个条件变量cond。通过NewSemaphore函数创建一个新的信号量对象,通过Acquire方法获取一个信号量,通过Release方法释放一个信号量。

main函数中,我们创建了一个初始值为2的信号量对象,并启动了5个子协程执行任务。每个子协程在执行任务前先调用Acquire方法获取一个信号量,执行完任务后调用Release方法释放一个信号量。这样可以保证同时只有2个子协程可以执行任务,其他子协程需要等待。

转载自:https://juejin.cn/post/7373507871588089868
评论
请登录