likes
comments
collection
share

并发编程(七) - Map: 并发安全的哈希表

作者站长头像
站长
· 阅读数 9

问题引入

map 是我们最常使用的数据结构,用于快速查找键值对。然而,在 go 中内置的 map 并不是并发安全的。并发写还会导致 panic。

func main() {
    m := map[int]int{}
    wg := sync.WaitGroup{}
    for i := 0; i < 1000; i++ {
       wg.Add(1)
       go func(i int) {
          defer wg.Done()
          m[i] = i
          _ = m[i]
          m[i] = i
       }(i)
    }
    wg.Wait()
} 

// fatal error: concurrent map writes

并发安全的 map

map 加锁

为了保证 map 的并发安全,则需要使用并发原语控制对 map 的读写顺序。对于读多写少的场景使用 RWMutex,否则可以考虑使用 Mutex。下面是一个使用 Mutex 的例子。

type syncMap[K comparable, V any] struct {
    m map[K]V
    sync.Mutex
}

func newMap[K comparable, V any]() *syncMap[K, V] {
    return &syncMap[K, V]{
       m: make(map[K]V),
    }
}

func (m *syncMap[K, V]) get(k K) (V, bool) {
    m.Lock()
    defer m.Unlock()
    v, ok := m.m[k]
    return v, ok
}

func (m *syncMap[K, V]) set(k K, v V) {
    m.Lock()
    defer m.Unlock()
    m.m[k] = v
}

func (m *syncMap[K, V]) del(k K) {
    m.Lock()
    defer m.Unlock()
    delete(m.m, k)
}
func (m *syncMap[K, V]) len(k K) int {
    m.Lock()
    defer m.Unlock()
    return len(m.m)
}

锁优化

使用 Mutex 可以保证 map 的并发安全,在高并发的场景下使用 Mutex 会导致大量抢锁而导致性能下降。常见的锁优化包括减少锁占用的时间,降低锁的粒度。在上面的实现中,已经无法减少锁占用的时间。可以减少锁的粒度,最常见的方式就是使用分片锁。将 map 分为几个片,每个片对应一个 Mutex,从而降低抢同一把锁的 goroutine 数量。github 上有对应的实现。

默认将整个 map 分为 32 个 shardMap,在读写时使用 sharding 函数将 key 映射到对应的 shardMap 上。对于每个 shardMap,读写时使用 Mutex 保证并发安全。这样做可以将锁的粒度降低,由原来的 n 个 goroutine 共同抢锁,变为 n/32 个 goroutine 共同抢锁,从而减少了阻塞的 goroutine 数量。

var SHARD_COUNT = 32

type concurrentMap[K comparable, V any] struct {
    shards   []*concurrentMapShard[K, V]
    sharding func(key K) uint32
}

type concurrentMapShard[K comparable, V any] struct {
    sync.Mutex
    items map[K]V
}

func (m *concurrentMap[K, V]) get(k K) (V, bool) {
    shard := m.getShard(k)
    shard.Lock()
    defer shard.Unlock()
    v, ok := shard.items[k]
    return v, ok
}

func (m *concurrentMap[K, V]) getShard(k K) *concurrentMapShard[K, V] {
    return m.shards[uint32(m.sharding(k)%uint32(SHARD_COUNT))]
}

func (m *concurrentMap[K, V]) set(k K, v V) {
    shard := m.getShard(k)
    shard.Lock()
    defer shard.Unlock()
    shard.items[k] = v
}

func (m *concurrentMap[K, V]) del(k K) {
    shard := m.getShard(k)
    shard.Lock()
    defer shard.Unlock()
    delete(shard.items, k)
}

func nweCurrentMap[K comparable, V any](sharding func(key K) uint32) *concurrentMap[K, V] {
    m := &concurrentMap[K, V]{
       sharding: sharding,
       shards:   make([]*concurrentMapShard[K, V], SHARD_COUNT),
    }
    for i := 0; i < SHARD_COUNT; i++ {
       m.shards[i] = &concurrentMapShard[K, V]{
          items: make(map[K]V),
       }
    }
    return m
}

使用基准测试,看下两者差异。1000 个 goroutine 并发读写下,性能快了 3 倍。

 //goos: darwin
//goarch: arm64
//pkg: demo/c1
//BenchmarkLocks/Map-12           1000000000               0.1587 ns/op
//BenchmarkLocks/ShardMap-12      1000000000               0.05581 ns/op

func BenchmarkMap(b *testing.B) {
    m := newMap[int, int]()
    wg := sync.WaitGroup{}
    for i := 0; i < 1000; i++ {
       wg.Add(1)
       go func(i int) {
          defer wg.Done()
          for j := 0; j < i; j++ {
             m.set(i, i)
             m.get(i)
             m.set(i, i)
          }
       }(i)
    }
    wg.Wait()
}

func BenchmarkShardMap(b *testing.B) {
    m := nweCurrentMap[int, int](func(key int) uint32 {
       return uint32(key % SHARD_COUNT)
    })
    wg := sync.WaitGroup{}
    for i := 0; i < 1000; i++ {
       wg.Add(1)
       go func(i int) {
          defer wg.Done()
          for j := 0; j < i; j++ {
             m.set(i, i)
             m.get(i)
             m.set(i, i)
          }
       }(i)
    }
    wg.Wait()
}

func BenchmarkLocks(b *testing.B) {
    b.Run("Map", BenchmarkMap)
    b.Run("ShardMap", BenchmarkShardMap)
}

官方的 sync.map

在 go 的 sync 包中,实现了一个并发安全的 map。在下面两种场景性能最佳。

  • 对于存储的 key,一次写入,后续均为读。即只有 key 的写入和读取,并没有删除和修改。
  • 多个 goroutine 读写/更新不相交的 key。

使用

sync.map 提供了以下的 api,函数名都比较清晰,调用很方便。

type Map
func (m *Map) CompareAndDelete(key, old any) (deleted bool)
func (m *Map) CompareAndSwap(key, old, new any) bool
func (m *Map) Delete(key any)
func (m *Map) Load(key any) (value any, ok bool)
func (m *Map) LoadAndDelete(key any) (value any, loaded bool)
func (m *Map) LoadOrStore(key, value any) (actual any, loaded bool)
func (m *Map) Range(f func(key, value any) bool)
func (m *Map) Store(key, value any)
func (m *Map) Swap(key, value any) (previous any, loaded bool)
func main() {
    m := sync.Map{}
    // set
m.Store(1, 1)
    // get
v, ok := m.Load(1)
    fmt.Println(v, ok)
    // 遍历
m.Range(func(key, value any) bool {
       fmt.Printf("key: %d, value: %d\n", key, value)
       return true
    })
    // del
m.Delete(1)
}

实现

sync.map 的实现也比较有意思。在上节中,我们说到为了尽可能减少并发场景下带来的锁开销,可以使用减少锁持有的时间和降低锁粒度的方式解决。相较于 Mutex, 使用 RWMutex 可以保证读场景下可以同时获取资源「降低锁的粒度?」。如何针对并发场景下的 map 这一数据结构能够尽可能不用锁呢?

在对于内置的 map,出于性能考虑没有进行原子访问控制,并发写的情况下会直接 panic。但是并发读是没有任何问题的。因此,在实现 sync.map 时可以定义两个 map。 readMap:使用原子操作处理读请求,同时会尝试使用 cas 处理一部分更新操作,dirtyMap: 用于处理插入和删除操作,需要持有 mutext。最后一个问题就是保证两个 map 的一致性。下面看一下 sync.map 的实现:

type Map struct {
    // 互斥锁
    mu Mutex
    // 所有的操作经过 read, 由于使用原子操作,可以保证并发安全
read atomic.Pointer[readOnly]
    // 对于 read 无法处理的场景,为了保证并发安全需要先持有 mu, 然后操作 ditry
dirty map[any]*entry
    // 将 dirty 升级为 read
misses int
}

// 不可变结构体,使用原子操作 store 到 Map.read 中
type readOnly struct {
    m       map[any]*entry
    amended bool // 如果 dirty 包含了 read 不包含的 key 则为 true
}

type entry struct {
p atomic.Pointer[any]
}

可以看到 Map 的结构体设计的比较巧妙,首先 read 是一个可以原子操作的指针,这就保证了 dirty 升级为 read 是安全的。其次对于 read 和 dirty 并没有存储具体的值,而是存储的值的指针,保证值的操作都是原子的,且 read 和 ditry 看到的都是同一个 value。基于这种设计,写场景:如果 key 存在 read 中,则可以直接通过 CAS 操作更新,如果不存在,则需要加锁处理 dirty。读场景:如果 read 中存在,可以直接返回,如果不存在,则需要判断是否需要加锁走 dirty。可以看到整个过程中,都是优先使用原子操作,最后在使用 mutex。

Store
func (m *Map) Store(key, value any) {
    _, _ = m.Swap(key, value)
}

func (m *Map) Swap(key, value any) (previous any, loaded bool) {
    // 原子 load
    read := m.loadReadOnly()
    if e, ok := read.m[key]; ok {
       // key 存在 read 中,直接使用 cas
       if v, ok := e.trySwap(&value); ok {
          if v == nil {
             return nil, false
          }
          return *v, true
       }
    }
    // key 不在 read 中,或者 key 被删除,加锁
    m.mu.Lock()
    // 二次读,防止 ditry 升级为 read
    read = m.loadReadOnly()
    if e, ok := read.m[key]; ok {
       if e.unexpungeLocked() {
          // The entry was previously expunged, which implies that there is a
 // non-nil dirty map and this entry is not in it.
m.dirty[key] = e
       }
       if v := e.swapLocked(&value); v != nil {
          loaded = true
          previous = *v
       }
    } else if e, ok := m.dirty[key]; ok { // ditry 中已经存在,执行 cas
       if v := e.swapLocked(&value); v != nil {
          loaded = true
          previous = *v
       }
    } else {
       if !read.amended { // 标记 ditry 和 read 是否一致
          // We're adding the first new key to the dirty map.
 // Make sure it is allocated and mark the read-only map as incomplete.
m.dirtyLocked()
          m.read.Store(&readOnly{m: read.m, amended: true})
       }
       m.dirty[key] = newEntry(value)
    }
    m.mu.Unlock()
    return previous, loaded
}
Load
func (m *Map) Load(key any) (value any, ok bool) {
    read := m.loadReadOnly()
    e, ok := read.m[key]
    if !ok && read.amended {
       m.mu.Lock()
read = m.loadReadOnly()
       e, ok = read.m[key]
       if !ok && read.amended {
          e, ok = m.dirty[key]
          // 是否要将 dirty 转为 read
m.missLocked()
       }
       m.mu.Unlock()
    }
    if !ok {
       return nil, false
    }
    return e.load()
}
Delete
func (m *Map) Delete(key any) {
    m.LoadAndDelete(key)
}

func (m *Map) LoadAndDelete(key any) (value any, loaded bool) {
    read := m.loadReadOnly()
    e, ok := read.m[key]
    if !ok && read.amended {
       m.mu.Lock()
       read = m.loadReadOnly()
       e, ok = read.m[key]
       if !ok && read.amended {
          e, ok = m.dirty[key]
          delete(m.dirty, key)
          // 是否要将 dirty 转为 read
m.missLocked()
       }
       m.mu.Unlock()
    }
    if ok {
       return e.delete()
    }
    return nil, false
}

性能

使用基准测试,比较一下三种 map 的性能。可以看到,在 1000 goroutine 下并发读写 map 时,sync.map 性能是最好的。当然,具体的性能其实取决于使用场景。

 //BenchmarkLocks/Map-12           1000000000               0.1601 ns/op
//BenchmarkLocks/ShardMap-12      1000000000               0.05068 ns/op
//BenchmarkLocks/SyncMap-12       1000000000               0.02733 ns/op
func BenchmarkSyncMap(b *testing.B) {
    cm := sync.Map{}
    wg := sync.WaitGroup{}
    for i := 0; i < 1000; i++ {
       wg.Add(1)
       go func(i int) {
          defer wg.Done()
          for j := 0; j < i; j++ {
             cm.Store(i, i)
             _, _ = cm.Load(i)
             cm.Store(i, i)
             cm.Delete(i)
          }
       }(i)
    }
    wg.Wait()
}

func BenchmarkLocks(b *testing.B) {
    b.Run("Map", BenchmarkMap)
    b.Run("ShardMap", BenchmarkShardMap)
    b.Run("SyncMap", BenchmarkSyncMap)
}

总结

本文介绍了并发安全 map 的三种实现方式。针对这三种实现方式并没有绝对的好,绝对的坏。我们需要根据自己的业务场景进行选型。当然,大家在使用的时候最的场景还是写少读多的场景。那就可以考虑在 shardMap 或者 sync.map。可以基于自己的场景,写基准测试对比一下两者的性能。通过他们的实现方式,可以大概清楚对写入操作少,读操作特别多,以及不同的 goroutine 之间处理的 key 是正交的场景可以优先考虑 sync.map。