likes
comments
collection
share

5. 精品:golang map 源码的逐句解读

作者站长头像
站长
· 阅读数 96

前言

注:本文依据 go 版本 1.20.7 源码进行讲解,源码位置:src/runtime/map.go、 代码中保留了部分官方英文注释,并对每一行代码进行了详细注释;特别的地方会重点进行强调和讲解。

1.map 底层数据结构

map 类型的内部实现相对比较复杂,Go 运行时使用一张哈希表来实现抽象的 map 类型。其中最核心的结构体为 runtime.hmap,hamp 中 buckets unsafe.Pointer指针最终指向了 bmap 结构体,该结构体用于存储真正的key,value ,本小节将重点介绍这两个重要结构体内的参数作用,是理解后续源码的关键。

我们先通过一张图,从整体上了解一下结构体之间的指向关系,然后对其进行详细的分析与讲解:

5. 精品:golang map 源码的逐句解读

1.1 hmap 数据结构

从图中我们可以看到,hmap 结构体用于表示 map(它是 hashmap 的缩写),hmap 类型是 map 类型的头部结构(header),它存储了后续 map 类型操作所需的所有信息。下边展示了源码中 hmap 的详细结构。

// A header for a Go map.
type hmap struct {
	// Note: the format of the hmap is also encoded in cmd/compile/internal/reflectdata/reflect.go.
	// Make sure this stays in sync with the compiler's definition.
    
	count     int // map 的元素数量。调用 len(map) 的时候返回此值
	flags     uint8 // map 当前状态的标志位,目前定义了 4 中状态:iterator/oldIterator/hashWriting/sameSizeGrow
	B         uint8  // 当前哈希表持有的 buckets(存储桶) 的数量(2^B 是 bucket 的数量,bucket 数组的长度)
	noverflow uint16 // 溢出桶的大致数量,溢出桶较少时为精确值
	hash0     uint32 // 哈希种子,计算 key 的哈希的时候会传入哈希函数

	buckets    unsafe.Pointer // 指针指向 2^B Buckets 数组。当 count==0 时,可能为 nil
    // 仅仅在扩容期间不为 nil,非等量扩容期间为 buckets 的一半大小,等量扩容与 buckets 等大
	oldbuckets unsafe.Pointer // previous bucket array of half the size, non-nil only when growing
	
    nevacuate  uintptr    // 迁移进度计数器,小于此下标的 buckets 都已迁移完成    // progress counter for evacuation (buckets less than this have been evacuated)

	extra *mapextra // optional fields 可选字段,用于防止 k-v 不是指针情况下,溢出桶被 GC
}

const(
    // flags
    // map 迭代器允许并行,在迭代过程中,有可能使用 buckets 或 oldbuckets
	iterator     = 1 // 有迭代器可能正在使用 buckets;  there may be an iterator using buckets
	oldIterator  = 2 // 有迭代器可能正在使用 oldbuckets; there may be an iterator using oldbuckets
	hashWriting  = 4 // 有协程在写 map,用于限制并发读写;a goroutine is writing to the map
	sameSizeGrow = 8 // 等量扩容;the current map growth is to a new map of the same size
)

// mapextra holds fields that are not present on all maps.
type mapextra struct {
    // 源码注释
	// If both key and elem do not contain pointers and are inline, then we mark bucket
	// type as containing no pointers. This avoids scanning such maps.
	// However, bmap.overflow is a pointer. In order to keep overflow buckets
	// alive, we store pointers to all overflow buckets in hmap.extra.overflow and hmap.extra.oldoverflow.
	// overflow and oldoverflow are only used if key and elem do not contain pointers.
	// overflow contains overflow buckets for hmap.buckets.
	// oldoverflow contains overflow buckets for hmap.oldbuckets.
	// The indirection allows to store a pointer to the slice in hiter.
    
	overflow    *[]*bmap // 包含 hmap.buckets 的溢出桶
	oldoverflow *[]*bmap // 包含 hmap.oldbuckets 的溢出桶

	// nextOverflow holds a pointer to a free overflow bucket.
    // nextOverflow 持有指向空闲溢出桶的指针(map 初始化的时候,预分配出来的,用于提升 map 后续创建溢出桶的速度)。
	nextOverflow *bmap
}
  • flags 为 hmap 的标志位,共有四位 1111 表示了 map 的四种状态
    • iterator:有迭代器可能正在使用 buckets
    • oldIterator:有迭代器可能正在使用 oldbuckets;
    • hashWriting:有协程在写 map,用于限制并发读写
    • sameSizeGrow:map 正在进行等量扩容
  • mapextra 字段

mapextra 字段设计思路很有意思,这里值得详细介绍一下,后续源码解析中也有讲解。 首先你得提前了解一下 golang 的 GC 机制,方便理解该字段的内容。垃圾回收机制会回收那些没有被引用到可回收的内存。

当 GC 扫描到 hmap 结构体的变量时,不仅需要扫描 hmap 内部的字段,还需要扫描整个 buckets 数组,当 map 存储大量 k-v 时,将会耗费大量的性能用于 GC 扫描,影响性能。这个时候设计 map 的工程师就在想了,如果 map 存储的 key、value 都不包含指针,能不能避免 GC 扫描 buckets 数组。

来看一下官方对 mapextra 字段的注释内容:

If both key and elem do not contain pointers and are inline, then we mark bucket type as containing no pointers. This avoids scanning such maps. However, bmap.overflow is a pointer. In order to keep overflow buckets alive, we store pointers to all overflow buckets in hmap.extra.overflow and hmap.extra.oldoverflow. overflow and oldoverflow are only used if key and elem do not contain pointers. overflow contains overflow buckets for hmap.buckets. oldoverflow contains overflow buckets for hmap.oldbuckets. The indirection allows to store a pointer to the slice in hiter.

这里先对官方注释做一个翻译:

如果 key 和 eiem 都不包含指针,会把 bucket 的存储类型标记为不含指针,这将避免扫描这样的 maps。 然而 bmap.overflow 是一个指针。 为了保持溢出桶存活,我们在 hmap.extra.overflow 和 hmap.extra.oldoverflow 变量中存储了指针指向所有的溢出桶。 overflow 和 oldoverflow 字段仅仅只用在 k-v 不包含指针的情况下。overflow 字段包含了 hmap.buckets 的溢出桶。oldoverflow 字段包含了 hmap.oldbuckets 的溢出桶。 间接允许在 hiter 中存储指向切片的指针。

如果没有读过 map 源码,一定不知道官方注释在说什么,这里对官方注释内容做一个讲解:

第一句:如果 key 和 value 都不包含指针,会把 bucket 的存储类型标记为不含指针,这将避免扫描这样的 maps。

讲解:如果 map 的 key 和 value 都不包含指针,会把 map 的存储类型 maptype 标记为不含指针(使用 maptype.bucket.ptrdata == 0 进行判断,后续会有源码解读 ptrdata 字段)。这将避免扫描整个 map(触发扫描的过程就是 GC)。

第二句:然而 bmap.overflow 是一个指针。

讲解:然而 bmap.overflow 字段是指针,这将破坏 bmap 中没有指针这一设想(虽然 k-v 不是指针,但 overflow 还是指针,无法避免 GC 扫描 bmap)。

第三句:为了保持溢出桶存活,我们在 hmap.extra.overflow 和 hmap.extra.oldoverflow 变量中存储了指针指向所有的溢出桶。

讲解:为了保持溢出桶的活跃状态(也就是不被 GC 认为可回收),我们在 hmap.extra.overflow 和 hmap.extra.oldoverflow 变量中存储了指针指向所有的溢出桶。从这句注释,我们大概知道了 extra 字段的作用是为了保活溢出桶而存在的,到这里 extra 中字段的意思解释清楚了,官方注释也戛然而止了,我相信你们和我产生了同样的疑问:你第二句没告诉我咋解决 bmap.overflow 是指针的问题啊!!!

很明显第二句和第三句注释之间是断层的,隐藏了一些东西,官方没有直接说出来,这里把缺失的部分补充回来:bmap.overflow 指针字段该如何解决?

首先得理解一下,为什么 bmap.overflow 字段要用指针类型?我们都知道,map 溢出桶是通过链表结构维护的,需要指针指向下一个溢出桶,所以用指针很正常,且指针可以用来对溢出桶保活。

那有办法不用指针类型吗? 当然可以,golang 在编译期间会把 k-v 不是指针的 map 中的 bmap.overflow 字段优化为 unitptr 类型。uintptr 是数值类型,非指针类型,用这个存储指针是无法保护对象的(扫描的时候 uintptr 指向的对象不会被扫描,意味着会被 GC)。

这样就会带来一个新问题,在使用 map 的过程中,溢出桶链表会被 GC 回收掉,这是不能接受的,此时 hmap.extra 的作用就彰显出来了。

第四句:overflow 和 oldoverflow 字段仅仅只用在 k-v 不包含指针的情况下。overflow 字段包含了 hmap.buckets 的溢出桶。oldoverflow 字段包含了 hmap.oldbuckets 的溢出桶。

讲解:从第三句+第四句注释,我们可以知道溢出桶的保活任务交给了 hmap.extra.overflow 和 hmap.extra.oldoverflow 两个 slice 变量;overflow 包含了 hmap.buckets 的所有溢出桶,oldoverflow 包含了 hmap.oldbuckets 的所有溢出桶,这样扩容的时候,也可以进行保活。

最后一句:间接允许在 hiter 中存储指向切片的指针。

讲解:hiter 是 map 迭代器的数据结构,在遍历 map 的过程中,会使用 hiter 迭代器;此时迭代器保存了 map 中的快照数据,如果 hiter 指向了oldbuckets ,当 hmap 把 oldbuckets 迁移完毕时,会把字段 h.oldbuckets = nil,有迭代器存在时,虽然不清理 oldbuckets 的内存,但溢出桶却失去了保活的变量;而在 hiter 遍历 oldbuckets 过程中也是不允许溢出桶被 GC 回收的,所以迭代器需要对溢出桶进行保活,也就是官方的这句注释的解释了,保活的字段为 hiter.overflow 和 hiter.oldoverflow。

在 Go 中,这种避免 GC 回收的保活手段在源码里是经常被使用的,对理解源码还是十分必要的。

1.2 bmap 数据结构

buckets 数组中存储是一个指针,最终它指向的是一个结构体:bmap。

// A bucket for a Go map.
type bmap struct {
	// tophash generally contains the top byte of the hash value
	// for each key in this bucket. If tophash[0] < minTopHash,
	// tophash[0] is a bucket evacuation state instead.
	tophash [bucketCnt]uint8
	// Followed by bucketCnt keys and then bucketCnt elems.
	// NOTE: packing all the keys together and then all the elems together makes the
	// code a bit more complicated than alternating key/elem/key/elem/... but it allows
	// us to eliminate padding which would be needed for, e.g., map[int64]int8.
	// Followed by an overflow pointer.
}

通过官方注释也可以知道,bmap 不止包含 tophash 这一个字段,其实在编译期间会给它加料,动态的创建一个新的结构,如下:

type bmap struct {
    // tophash 通常包含此 bucket 中每个 key 的哈希值的最高的 8 位(1 个字节)
    // 如果 tophash[0] < minTopHash,则 tophash[0] 为桶已迁移状态。
    tophash [bucketCnt]uint8 
    keys     [bucketCnt]keytype // 8个key
    values   [bucketCnt]valuetype // 8个value
    padding  uintptr // 填充字段,用于内存对齐,经常会用到,刚好一个字节大小
    overflow uintptr // 指向溢出桶链表 overflow 类型由编译器根据情况而定
}

// tophash 的标志位
const (
    emptyRest = 0 // 这个 cell 是空的,并且在更高的索引或溢出处没有更多的非空 cell。  
    emptyOne = 1 // 这个 cell 是空的
    evacuatedX = 2 // key/elem 有效。 entry 已被迁移到较大的哈希表的前半部分(扩容了)。
    evacuatedY = 3 // 同上,但迁移到大的哈希表的后半部分(扩容了)。
    evacuatedEmpty = 4 // cell 是空的,bucket 已经被迁移了
    minTopHash = 5 // 正常填充 cell 的最小 tophash。
)

// bucketCnt = 8,一个 bucket 可以存储 8 个 k-v 
const(
    // Maximum number of key/elem pairs a bucket can hold.
	bucketCntBits = 3
	bucketCnt     = 1 << bucketCntBits
)

bmap 是存放 k-v 的地方,也被成为 bucket(桶),我们具体看下 bucket 的内部组成。

5. 精品:golang map 源码的逐句解读

上图就是 bucket 的内存模型,从上到下分别为:tophashs 区域、keys 区域、values 区域 和 overflow,我们先简单分析一下它们存储的内容:

  • tophash 区域

因为一个 bucket 只能存储 8 个元素,所以 tophashs 区域是一个长度为 8 的数组。tophash 通常包含此 bucket 中每个 key 的哈希值的最高的 8 位(刚好是 1 个字节),如果 tophash[0] < minTopHash,则 tophash[0] 含义转变为桶已迁移状态。因为 tophash 可能有不同的含义冲突,所以需要在 key 的哈希值的最高的 8 位 < minTopHash 时,为其 tophash + minTopHash 避免冲突(后续有详细代码分析)。

  • key、value 区域

因为 key 和 value 总是成对出现,这里就一起分析了,通过命名想必大家也能看出来,key - value 就是 map 中存储的键值对。仔细观察图我们可以注意到 key 和 value 是各自放在一起的,并不是以 key/value/key/value/... 这样的形式存储的。源码里说明这样的好处是在某些情况下可以省略掉 padding 字段,节省内存空间。

这里稍微解释一下如何节省内存空间的,如果按 key/value/key/value/... 形式存储,由于 key 和 value 类型可能不一样,例如:map[int64]int8,则需要在每个 value 后边补足额外的 7 个 padding 字节,对齐 64 位(64位计算机);而如果按 key/key/... /value/value/... 形式存储,则只需要在 value 最后增加 padding 对齐内存即可,可以节省很多内存空间。

  • overflow

每个 bucket 中存储的是 Hash 值低 hmap.B 位数值相同的元素(和取余类似,这里用的是二进制相与操作),也就是说发生哈希碰撞的元素会被分配到一个 bucket 进行存储,当某个 bucket 的 8 个空槽 slot/cell 都填满了,且 map 尚未达到扩容条件的情况下,运行时会建立 overflow bucket(溢出桶),并将这个 overflow bucket 挂在上面 bucket 末尾的 overflow 指针上,这样两个 bucket 形成了一个链表结构,用拉链的方式,解决哈希碰撞的问题。

2.map 的函数调用和类型字段

2.1 map 操作的函数调用

运行时实现了 map 类型操作的所有功能,包括创建、查找、插入、删除、遍历等。在编译阶段,Go 编译器会将 Go 语法层面的 map 操作,重写成运行时对应的函数调用。大致的对应关系是这样的:

// 创建map类型变量实例
m := make(map[keyType]valType, hint) → m := runtime.makemap(maptype, hint, h)

// 获取某键的值 
v := m["key"]      → v := runtime.mapaccess1(maptype, m, "key")
v, ok := m["key"]  → v, ok := runtime.mapaccess2(maptype, m, "key")

// 删除某键
delete(m, "key")   → runtime.mapdelete(maptype, m, “key”)

// 插入新键值对或给键重新赋值,v是用于后续存储value的空间的地址
m["key"] = "value" → v := runtime.mapassign(maptype, m, "key") 

// 遍历 map
for k,v := range m{} 
// 初始化 map 迭代器,后续操作以迭代器 hiter 为准
mapiterinit(t *maptype, h *hmap, it *hiter)
// 每次迭代会调用 mapiternext(it *hiter) 函数,返回下一个 key 和 value
mapiternext(it *hiter)

// for range map 编译器源码注释
// The loop we generate:
// var hiter map_iteration_struct
// for mapiterinit(type, range, &hiter); hiter.key != nil; mapiternext(&hiter) {
//     index_temp = *hiter.key
//     value_temp = *hiter.val
//     index = index_temp
//     value = value_temp
//     original body
// }

2.2 map 类型 maptype

通过观察我们可以发现,这些运行时函数都有一个共同的特点,那就是第一个参数都是 maptype 指针类型的参数。当我们声明一个 map 类型变量,比如 var m map[string]int 时,Go 编译期间就会为这个变量生成对应的特定 map 类型,生成一个 runtime.maptype 实例。 maptype 实例的结构如下:

type maptype struct {
	typ    _type
	key    *_type // key 类型
	elem   *_type // elem(value) 类型
	bucket *_type // 表示哈希桶的内部类型
	// function for hashing keys (ptr to key, seed) -> hash
    // 哈希函数,用于计算 key 的哈希值
	hasher     func(unsafe.Pointer, uintptr) uintptr
	keysize    uint8  // key 大小
	elemsize   uint8  // value(elem) 大小
	bucketsize uint16 // bucket 大小
	flags      uint32 // map k-v 标志位,比如 key 是否存储的是指针
}

type _type struct {
	size       uintptr
	ptrdata    uintptr //  保存所有指针的内存前缀的大小 size of memory prefix holding all pointers
	hash       uint32
	tflag      tflag
	align      uint8
	fieldAlign uint8
	kind       uint8
	// function for comparing objects of this type
	// (ptr to object A, ptr to object B) -> ==?
	equal func(unsafe.Pointer, unsafe.Pointer) bool
	// gcdata stores the GC type data for the garbage collector.
	// If the KindGCProg bit is set in kind, gcdata is a GC program.
	// Otherwise it is a ptrmask bitmap. See mbitmap.go for details.
	gcdata    *byte
	str       nameOff
	ptrToThis typeOff
}

这个实例包含了我们需要的 map 类型中的所有"元信息"。Go 运行时就是利用 maptype 参数中的信息确定 key、value 的类型和大小的。maptype 的存在让 Go 中所有 map 类型都共享一套运行时 map 操作函数,而不是像 C++ 那样为每种 map 类型创建一套 map 操作函数,这样就节省了对最终二进制文件空间的占用。

这里简单解释一下 flags 的作用。由于 map 对 key、value 的数据长度有长度限制,如果 key 或 value 的数据长度大于一定数值,那么运行时不会在 bucket 中直接存储数据,而是会存储 key 或 value 数据的指针。目前 Go 运行时定义的最大 key 和 value 的长度是这样的:

const (
    maxKeySize  = 128
    maxElemSize = 128
)

因此,需要字段标记 key,value 存储的是不是指针,就是 flags 字段,flags 字段还标记了其他值,后续代码里用到会具体进行讲解。

func (mt *maptype) indirectkey() bool { // store ptr to key instead of key itself
	return mt.flags&1 != 0
}
func (mt *maptype) indirectelem() bool { // store ptr to elem instead of elem itself
	return mt.flags&2 != 0
}

3.map 创建

创建 map 对应的函数调用为 makemap,下边对其源码进行分析:

// 创建map类型变量实例
m := make(map[keyType]valType, hint) → m := runtime.makemap(maptype, hint, h)

3.1 makemap 函数创建 map

// makemap implements Go map creation for make(map[k]v, hint).
// If the compiler has determined that the map or the first bucket
// can be created on the stack, h and/or bucket may be non-nil.
// If h != nil, the map can be created directly in h.
// If h.buckets != nil, bucket pointed to can be used as the first bucket.
// makemap 是 make(map[k]v, hint) 的实现,创建一个 map。
func makemap(t *maptype, hint int, h *hmap) *hmap {
	// 计算 bucket 所需内存大小,是否溢出,如果溢出或超过最大内存,分配 hint = 0
	mem, overflow := math.MulUintptr(uintptr(hint), t.bucket.size)
	if overflow || mem > maxAlloc {
		hint = 0
	}

	// initialize Hmap
	// 分配 hamp 结构体所占内存
	if h == nil {
		h = new(hmap)
	}
	// 生成随机哈希种子
	h.hash0 = fastrand()

	// Find the size parameter B which will hold the requested # of elements.
	// For hint < 0 overLoadFactor returns false since hint < bucketCnt.
	// 根据传入的 hint 和 负载因子,计算出需要的最小桶数量。
	B := uint8(0)
	for overLoadFactor(hint, B) {
		B++
	}
	h.B = B

	// allocate initial hash table 分配最初的哈希表内存
	// if B == 0, the buckets field is allocated lazily later (in mapassign) B == 0 时,延迟分配
	// If hint is large zeroing this memory could take a while. hint 比较大时,耗时比较长
	if h.B != 0 {
		var nextOverflow *bmap
		// 初始化 buckets(分配 buckets 所需要的内存),可能会提前分配一些空闲的溢出桶,以备后续使用,提升速度
		h.buckets, nextOverflow = makeBucketArray(t, h.B, nil)
        // 如果预分配了空闲的溢出桶数组,则初始化 mapextra 字段,用 h.extra.nextOverflow 指向第一个溢出桶
		if nextOverflow != nil {
			h.extra = new(mapextra)
			h.extra.nextOverflow = nextOverflow
		}
	}

	return h
}

// overLoadFactor 放置在 2^B 个桶中的键值对数量是否超过负载因子。
func overLoadFactor(count int, B uint8) bool {
	// 第一步:判断一个桶能否装下,可以装下 B 就不用算了
    // 第二步:count 是否大于 桶数量(2^B) * 负载因子(6.5),如果比这都大,说明现在的 B 负载不了,还需要 ++
    // bucketShift(B) = 2^B
    // loadFactorNum / loadFactorDen = 6.5
   return count > bucketCnt && uintptr(count) > loadFactorNum*(bucketShift(B)/loadFactorDen)
}

3.2 makeBucketArray 初始化 map buckets 底层数组

makemap 函数中当 h.B != 0 时,makeBucketArray 函数会初始化 map buckets 的底层数组,还可能会预分配空闲溢出桶的内存。

// makeBucketArray initializes a backing array for map buckets.
// 1<<b is the minimum number of buckets to allocate.
// dirtyalloc should either be nil or a bucket array previously
// allocated by makeBucketArray with the same t and b parameters.
// If dirtyalloc is nil a new backing array will be alloced and
// otherwise dirtyalloc will be cleared and reused as backing array.
// makeBucketArray 初始化 map buckets 的底层数组
// 最小存储桶数为 1<<b,也就是 2^b
// dirtyalloc 应该是 nil 或之前由 makeBucketArray 使用相同的 t 和 b 参数分配的存储桶数组。
// 如果 dirtyalloc 为 nil,将分配一段新的内存;否则将清除 dirtyalloc 指向的内存,将其作为新分配的内存。
func makeBucketArray(t *maptype, b uint8, dirtyalloc unsafe.Pointer) (buckets unsafe.Pointer, nextOverflow *bmap) {
	// nbuckets = base = 2^b
	base := bucketShift(b)
	nbuckets := base
	// For small b, overflow buckets are unlikely.
	// Avoid the overhead of the calculation.
	// 对于 b 很小的情况,溢出桶的可能性不大(由于数据较少,哈希冲突少),避免计算开销(减少溢出桶的预分配)
	// 当 b >= 4 认为溢出桶的使用概率变大,可以预先分配一下溢出桶的内存,提升后续使用 map 的性能
	if b >= 4 {
		// Add on the estimated number of overflow buckets
		// required to insert the median number of elements
		// used with this value of b.
		// 总存储桶数量 = 普通存储桶 + 溢出桶数量 (2^(b-4))
		nbuckets += bucketShift(b - 4)
		sz := t.bucket.size * nbuckets // 存储桶所需内存
		up := roundupsize(sz) // 内存对齐后所占内存
		if up != sz {
			// 根据新内存重新计算存储桶数量
			nbuckets = up / t.bucket.size
		}
	}

	if dirtyalloc == nil {
		// 创建 t.bucket 类型新数组
		buckets = newarray(t.bucket, int(nbuckets))
	} else {
		// dirtyalloc was previously generated by
		// the above newarray(t.bucket, int(nbuckets))
		// but may not be empty.
		// 清空原有数组内存,作为该 map 的数组使用
		buckets = dirtyalloc
		size := t.bucket.size * nbuckets
		if t.bucket.ptrdata != 0 {
			// bucket 中有指针,用有指针的清理函数
			memclrHasPointers(buckets, size)
		} else {
			// bucket 中没有指针,用没有指针的清理函数
			memclrNoHeapPointers(buckets, size)
		}
	}

	// 有预分配的溢出桶情况
	if base != nbuckets {
		// We preallocated some overflow buckets.
		// To keep the overhead of tracking these overflow buckets to a minimum,
		// we use the convention that if a preallocated overflow bucket's overflow
		// pointer is nil, then there are more available by bumping the pointer.
		// We need a safe non-nil pointer for the last overflow bucket; just use buckets.
		// nextOverflow 指向溢出桶的开始地址(溢出桶地址在普通存储桶后边)
		nextOverflow = (*bmap)(add(buckets, base*uintptr(t.bucketsize)))
		// 获取最后的一个溢出桶
		last := (*bmap)(add(buckets, (nbuckets-1)*uintptr(t.bucketsize)))
		// 最后一个溢出桶的 overflow 指针链接到第一个普通桶
		last.setoverflow(t, (*bmap)(buckets))
	}
	return buckets, nextOverflow
}

通过阅读 makeBucketArray 函数源码,我们可以发现预分配的溢出桶数组的尾部的溢出桶有一点点特殊,它的 overflow 字段并不是 nil(其余溢出桶都是 nil),这样一来,我们通过判断溢出桶的 overflow 是否为 nil 就可以知道是否是最后一个空闲溢出桶。如果是最后一个空闲溢出桶,那么将 map 里面的 extra.nextOverflow 字段设置为 nil,表示预分配的空闲溢出桶用完了,后面如果再需要溢出桶的时候,就只能直接 new 一个了。

4.map 读取

读取 map 一般有两种形式,分别对应两个调用函数:

// 获取某键的值 
v := m["key"]      → v := runtime.mapaccess1(maptype, m, "key")
v, ok := m["key"]  → v, ok := runtime.mapaccess2(maptype, m, "key")

为了更好地理解源码的内容,我们先来学习一下在 map 中如何定位一个 key。

4.1 定位 key 的方式

当向 map 插入一条数据,或者是从 map 按 key 查询数据的时候,运行时都会使用哈希函数对 key 做哈希运算,并获得一个哈希值(hashcode,在 64 位机器上为 64 个 bit 位)。这个 hashcode 是定位 key 的关键,运行时会从 hashcode 中取得高 8 位和 低 B(hmap.B) 位 ,其中低位区的值用于选定 bucket,高位区的值用于在某个 bucket 中确定 key 的位置(也就是 tophash 的值)。我把这一过程整理成了下面这张示意图,你理解起来可以更直观:

5. 精品:golang map 源码的逐句解读

每个 bucket 的 tophash 区域其实是用来快速定位 key 位置的,这样就避免了逐个 key 进行比较这种代价较大的操作。尤其是当 key 是 size 较大的字符串类型时,好处就更突出了。这是一种以空间换时间的思路,避免的是 key 本身的内容过大导致的慢,而不是把遍历 tophash 这个过程砍掉。换句话说:比较 8 个 bit 的 tophash 肯定是比比较字符串更快的。

当然,后续我们分析源码也会看到,比较 key 是否相等这一步骤是绕不过去的,因为哈希冲突的存在,低 B 位使得两个 key 落在了一个桶里,还会有更小的概率,这俩 key 哈希值的 tophash 也相等,此时只能通过 key 是否相等来进行区分(map 中 key 是唯一的);同理,找到了哈希值低 B 位和高8位相等的槽,并不能确定的说这就是我们要找的槽,还是需要比较 key 是否相等,做最终的判断。因此,tophash 只是对比较 key 的次数进行了优化,绕不过比较 key 是否相等这一层。

4.2 mapaccess1 源码解析

mapaccess1 和 mapaccess2 函数实现基本一致,只是返回值不同,这里我们以 mapaccess1 为例子进行分析,查找的关键有两点:

  1. key 的定位问题,上一小节已经详细讲述。
  2. 在扩容过程中查找,需要注意 bucket 是否已经迁移,未进行迁移的桶,需要在旧桶中查找。
// mapaccess1 返回一个指向 h[key] 的指针。 永远不会返回 nil,
// 如果键不在映射中,它将返回对 elem 类型的零对象的引用。
// NOTE: The returned pointer may keep the whole map live, so don't
// hold onto it for very long.(因为返回的是指针,会导致整个 map 不能被GC)
func mapaccess1(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
	// ...

    // 如果 h 什么都没有,返回零值
	if h == nil || h.count == 0 {
		if t.hashMightPanic() {
			t.hasher(key, 0) // see issue 23734
		}
		return unsafe.Pointer(&zeroVal[0])
	}

    // 并发读和写冲突(hashWriting 写 map 时标志位为 1)
    // 所以 map 并发读是可以的,并发读写会 panic
	if h.flags&hashWriting != 0 {
		fatal("concurrent map read and map write")
	}

    // 不同类型 key 使用的 hash 算法在编译期确定,t *maptype 参数确定哈希函数
    // 计算哈希值,并且加入 hash0 引入随机性
	hash := t.hasher(key, uintptr(h.hash0))

    // 比如 B=5,那 m 就是31,二进制是全 1
	// 定位 bucket num 时,将 hash 与 m 相与,
	// 最终由 hash 的低 8 位决定 key 在哪个 bucket 中
    // 和子网掩码作用类似
	m := bucketMask(h.B)

    // b 就是具体 bucket 的首地址(h.buckets 首地址 + 偏移量)
	b := (*bmap)(add(h.buckets, (hash&m)*uintptr(t.bucketsize)))

    // 如果 map 正经历扩容,需要重新定位,需要从旧桶中读取
    if c := h.oldbuckets; c != nil {
        // 不是等量扩容,需要重新计算 m
		if !h.sameSizeGrow() {
			// 不是等量扩容,则将 m 除以 2,因为是 2 倍扩容,
          	// 所以 buckets 的大小为 oldbuckets 长度的 2 倍,
         	// 除以 2 才是旧的桶数量
			m >>= 1
		}
        // key 在 oldbuckets 中所在 bucket 的首地址
		oldb := (*bmap)(add(c, (hash&m)*uintptr(t.bucketsize)))
        // 如果该 bucket 尚未迁移到新的 buckets 中
		if !evacuated(oldb) {
            // b 尚未迁移到新的 buckets 中,还在 oldbuckets 中
         	// 则需要从旧桶中查找
			b = oldb
		}
	}
    // 计算高 8 位,用于定位 key 具体位置(下边有具体实现)
	top := tophash(hash)
bucketloop:
    // 遍历溢出桶所有的 bucket(这相当于是一个 bucket 链表)。
    // 第一次遍历的是普通桶
	for ; b != nil; b = b.overflow(t) {
        // 循环遍历 bucketCnt = 8 个元素(一个 bucket 存储 8 个 key)
		for i := uintptr(0); i < bucketCnt; i++ {
            // tophash 不匹配不是这个槽,continue,继续遍历下一个 cell
			if b.tophash[i] != top {
                // emptyRest 为 tophash 标志位:
                // 这个 cell 是空的,并且在更高的索引或溢出处没有更多的非空 cell(bucket 后边没 key了)。
                // 如果等于这个标志位,后续就不需要再进行遍历了
				if b.tophash[i] == emptyRest {
					break bucketloop
				}
				continue
			}

            // tophash 找到了,但有极小可能冲突,需要继续判断 key 是否相等
            // k 定位到 key 的首地址
			k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
            // 如果 key 是指针
			if t.indirectkey() {
                // 解引用,找到真正的 key 值
				k = *((*unsafe.Pointer)(k))
			}
            // 比较 key 是否相等
			if t.key.equal(key, k) {
                // e:value 首地址
				e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
				// value 为指针
                if t.indirectelem() {
                    // 解引用
					e = *((*unsafe.Pointer)(e))
				}
				return e
			}

            // tophash 相等,但 key 不等,继续遍历下一次 cell
		}
	}
    // 如果键不在映射中,它将返回对 elem 类型的零对象的引用。
	return unsafe.Pointer(&zeroVal[0])
}

// 判断该 bucket 是否迁移完毕
func evacuated(b *bmap) bool {
	h := b.tophash[0]
	return h > emptyOne && h < minTopHash
}

// tophash 计算 hash 的 tophash 值。
// 这是一个字节的大小的。(hash 最高的 8 位)
func tophash(hash uintptr) uint8 {
   // top 本质上就是 hash 的前面 8 个字节
   // goarch.PtrSize*8 - 8,左移位数:指针的字节大小 - 8 字节
   top := uint8(hash >> (goarch.PtrSize*8 - 8))
   // 为了跟正常的 tophash 区分开来,如果计算出来的 tophash 小于 minTopHash,
   // 会将计算出来的 tophash 加上 minTopHash: 
   // 这样一来,通过 tophash 这一个字节就可以记录桶里面槽的状态了,非常节省空间。 
   if top < minTopHash {
      top += minTopHash
   }
   return top
}

//  计算 overflow 指向的 bmap
func (b *bmap) overflow(t *maptype) *bmap {
	return *(**bmap)(add(unsafe.Pointer(b), uintptr(t.bucketsize)-goarch.PtrSize))
}

这里再强调一下 overflow 的计算方式,uintptr(t.bucketsize)-goarch.PtrSize 得到了溢出桶字段在 bmap 的偏移量(goarch.PtrSize 为机器上一个字节的大小,也是一个指针的大小),通过寻址的方式 add(unsafe.Pointer(b), uintptr(t.bucketsize)-goarch.PtrSize) 找到了 bmap.overflow 字段的地址,进而获取 bmap 指针。

5.map 删除

map 删除对应的函数为:mapdelete。如果删除发生在扩容过程中,需要先把定位到的 bucket 迁移完毕后才进行删除,顺便还要为渐进式迁移做出额外贡献:顺序迁移一个 bucket。

func mapdelete(t *maptype, h *hmap, key unsafe.Pointer) {
	if raceenabled && h != nil {
		callerpc := getcallerpc()
		pc := abi.FuncPCABIInternal(mapdelete)
		racewritepc(unsafe.Pointer(h), callerpc, pc)
		raceReadObjectPC(t.key, key, callerpc, pc)
	}
	if msanenabled && h != nil {
		msanread(key, t.key.size)
	}
	if asanenabled && h != nil {
		asanread(key, t.key.size)
	}

	// map 为 空,直接返回
	if h == nil || h.count == 0 {
		if t.hashMightPanic() {
			t.hasher(key, 0) // see issue 23734
		}
		return
	}

	// 并发写检查
	if h.flags&hashWriting != 0 {
		fatal("concurrent map writes")
	}

	// 计算 key 的 hash 值
	hash := t.hasher(key, uintptr(h.hash0))

	// Set hashWriting after calling t.hasher, since t.hasher may panic,
	// in which case we have not actually done a write (delete).
	// 设置写 map 标志
	h.flags ^= hashWriting

	// 定位 key 所在 bucket
	bucket := hash & bucketMask(h.B)
	// 正在扩容
	if h.growing() {
		// 迁移该 bucket 到新地址
		growWork(t, h, bucket)
	}
	// b 指向 key 所在 bucket
	b := (*bmap)(add(h.buckets, bucket*uintptr(t.bucketsize)))
	// 记录原始的 b
	bOrig := b
	// 计算 tophash
	top := tophash(hash)
search:
	// 循环整个 bucket 链表
	for ; b != nil; b = b.overflow(t) {
		// 循环 8 个槽
		for i := uintptr(0); i < bucketCnt; i++ {
			// tophash 不匹配
			if b.tophash[i] != top {
				// emptyRest:bucket 后边数据都为空,不用继续循环了
				if b.tophash[i] == emptyRest {
					break search
				}
				// 直到找到匹配的 tophash
				continue
			}
			// 获取 key(处理 key 为指针的情况)
			k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
			k2 := k
			if t.indirectkey() {
				k2 = *((*unsafe.Pointer)(k2))
			}
			// key 不相等,继续循环 bucket
			if !t.key.equal(key, k2) {
				continue
			}

			// 找到相等的 key
			// Only clear key if there are pointers in it.
            // 仅当其中有指针时才清除键
			if t.indirectkey() {
				// 当 key 存储为指针
				// 指针指向 nil,并不主动清理内存(由 GC 处理)
				*(*unsafe.Pointer)(k) = nil
			} else if t.key.ptrdata != 0 {
				// key 本身为指针
				// 清除指针内存
				memclrHasPointers(k, t.key.size)
			}

			// 获取 elem(value);同理 key 的清空过程
			e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
			if t.indirectelem() {
				*(*unsafe.Pointer)(e) = nil
			} else if t.elem.ptrdata != 0 {
				memclrHasPointers(e, t.elem.size)
			} else {
				// 清除不包含指针的 elem 的内存
				memclrNoHeapPointers(e, t.elem.size)
			}
			// tophash 置为标志位 emptyOne,此槽为空
			b.tophash[i] = emptyOne
			// If the bucket now ends in a bunch of emptyOne states,
			// change those to emptyRest states.
			// 如果这个 bucket 后边以一堆 emptyOne 为结尾,则可以设置为 emptyRest
			// It would be nice to make this a separate function, but
			// for loops are not currently inlineable.
			if i == bucketCnt-1 {
				// key 为 bucket 中最后一个槽
				// 溢出桶不为空 && 溢出桶的第一个标志位不为 emptyRest
				if b.overflow(t) != nil && b.overflow(t).tophash[0] != emptyRest {
					// 不是最后一个元素,后边还有数据,直接跳过 for 循环
					goto notLast
				}
			} else {
				// 当 key 不为 bucket 最后一个槽时,直接检查后一个槽是否为 emptyRest
				// 即可判断后边是否还有数据
				if b.tophash[i+1] != emptyRest {
					goto notLast
				}
			}

			// 执行到这里的时候,表明刚刚删除的 key 是 bucket 或其溢出桶中的最后一个有数据的元素,后续都没数据了
			// 倒着往前数,把 emptyOne 标志位都置为 emptyRest,直到有数据或链表到头
			// emptyRest 标志可以优化遍历 map 的性能
			for {
				// 将当前 b.tophash[i] 设置为 emptyRest,标志后边没有元素了
				// 循环往前设置 emptyRest,直到槽内有数据或链表到头
				b.tophash[i] = emptyRest
				// 从 i 开始往前遍历,直到 i == 0
				// 当遍历到 i == 0 时,分两种情况
				// 1. 链表到头了
				// 2. 需要找到链表的前一个,继续循环遍历,然后赋值 emptyRest
				if i == 0 {
					// 如果 b 是链表最原始的头(也就是普通桶)
					if b == bOrig {
						// 找到初始桶,就意味着链表到头了,可以结束循环了
						break // beginning of initial bucket, we're done.
					}
					// Find previous bucket, continue at its last entry.
					// 找到当前 bucket 的前一个 bmap
					c := b
					for b = bOrig; b.overflow(t) != c; b = b.overflow(t) {
					}
					// 继续遍历前一个桶的槽
					i = bucketCnt - 1
				} else {
					i--
				}
				// 如果当前 i 标志位不为 emptyOne,说明槽内有数据,则终止循环
				if b.tophash[i] != emptyOne {
					break
				}
			}
		notLast:
			// 由于删除了元素,因此 count--
			h.count--
			// Reset the hash seed to make it more difficult for attackers to
			// repeatedly trigger hash collisions. See issue 25237.
			// 如果 map 内元素清零
			if h.count == 0 {
				// 重置哈希种子,使攻击者更难重复触发哈希冲突。 请参阅问题 25237。
				h.hash0 = fastrand()
			}
			// 已经删除了对应的 value,退出循环
			break search
		}
	}

	// 写冲突判断
	if h.flags&hashWriting == 0 {
		fatal("concurrent map writes")
	}
	// 消除写标志
	h.flags &^= hashWriting
}

map 删除逻辑有几个非常优秀的设计值得一提:

  1. 当 key、value 被编译器优化为以指针形式存储的时候(indirectkey、indirectelem),源码只是将对应槽中的区域指针指向了 nil,并没有主动触发清理内存,节省了时间,最终由 GC 回收。
  2. 当 key 不是指针时,源码中并没有对 key 的内存进行清理,但 elem 却被清理了,是不是有一点懵啊!原因这里解释一下:首先没有指针,不清理内存不妨碍 GC;被删除的 key 的位置被插入新 key 源码会使用 typedmemmove(t.key, insertk, key) 对其内存进行覆盖,所以不清理内存也是 ok 的。但为啥又把 elem 的内存给清了呢?这里我们留一个疑问,等看完下一小节,我们再来揭晓。
  3. emptyRest 状态的设置也是对性能的一种优化,通过两种标志位 emptyOne、emptyRest 和倒循环写法,在删除元素时,完成了对桶中元素的标记,最终提升了 map 遍历性能,emptyRest 在读取、修改、插入、删除等等,都有实际的应用;虽然迭代器部分没有使用,但后续官方也有优化的意向,写了 todo 事项,真的为官方这种优化精神而感动。

6.map 修改和写入

map 修改和写入共用一个函数:mapassign。同样,修改和写入如果发生在扩容期间,也会触发 bucket 迁移,最多两个 bucket 进行迁移,也是等 bucket 迁移完毕,对新 bucket 进行修改和写入;另外写入 key 还有可能会额外触发扩容操作,一旦发生扩容,key 需要重新进行计算和写入。

6.1 mapassign 修改和写入 map

// 功能:插入 key 或者修改 map 中的 key
// Like mapaccess, but allocates a slot for the key if it is not present in the map.
func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
	// map 为 nil,触发 panic
	if h == nil {
		panic(plainError("assignment to entry in nil map"))
	}
	if raceenabled {
		callerpc := getcallerpc()
		pc := abi.FuncPCABIInternal(mapassign)
		racewritepc(unsafe.Pointer(h), callerpc, pc)
		raceReadObjectPC(t.key, key, callerpc, pc)
	}
	if msanenabled {
		msanread(key, t.key.size)
	}
	if asanenabled {
		asanread(key, t.key.size)
	}

	// 并发写,panic
	if h.flags&hashWriting != 0 {
		fatal("concurrent map writes")
	}

	// 计算 key 的哈希值
	hash := t.hasher(key, uintptr(h.hash0))

	// Set hashWriting after calling t.hasher, since t.hasher may panic,
	// in which case we have not actually done a write.
	// 标记正在写 map(map 并发读写、并发写不安全)
	h.flags ^= hashWriting

	// 如果 buckets 为 nil,则新建一个
	if h.buckets == nil {
		h.buckets = newobject(t.bucket) // newarray(t.bucket, 1)
	}

again:
	// 获取 bucket 下标(定位 key 在哪个 bucket)
	bucket := hash & bucketMask(h.B)
	// 正在扩容
	if h.growing() {
		// 迁移该 bucket
		growWork(t, h, bucket)
	}
	// b 为 key 所属的 bucket
	b := (*bmap)(add(h.buckets, bucket*uintptr(t.bucketsize)))
	// 计算 tophash  高 8 位
	top := tophash(hash)

	var inserti *uint8 // 记录要插入的 tophash 地址
	var insertk unsafe.Pointer // 记录要插入的 key 地址
	var elem unsafe.Pointer // 记录要插入的 value(elem) 地址
bucketloop:
	for {
		// 循环 bucket 中的 8 个槽
		for i := uintptr(0); i < bucketCnt; i++ {
			if b.tophash[i] != top {
				// tophash 不匹配
				// 槽为空 && 还没记录要插入的位置
				if isEmpty(b.tophash[i]) && inserti == nil {
					// 记录要插入的位置 tophash、key、value
					inserti = &b.tophash[i]
					insertk = add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
					elem = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
				}
				// 如果 tophash 为标志位 emptyRest,意味着 bucket 没有找到对应的 key,同时 bucket 中剩余的槽都是空的。
				// 后边就没必要循环了
				if b.tophash[i] == emptyRest {
					// 终止对 bucket 的循环
					break bucketloop
				}
				// 没找到对应的 tophash,就继续找
				continue
			}
			// 走到这里,意味着找到对应的 tophash 了
			// 获取 key
			k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
			// key 是否包含指针
			if t.indirectkey() {
				// 解引用
				k = *((*unsafe.Pointer)(k))
			}
			// key 是否相等(寻找 key 的最终判断条件)
			if !t.key.equal(key, k) {
				continue
			}
			// already have a mapping for key. Update it.
			// 已存在该 key,则更新值
			// 看看 key 是否需要被覆盖
			if t.needkeyupdate() {
				typedmemmove(t.key, k, key)
			}
			// 获取 value 的地址,最终返回 elem 的地址,在函数外按地址对值进行更新,这样就不用把新值传进来了
			elem = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
			// 找到 key 和 value,后续就可以结束了
			goto done
		}
		// 获取下一个溢出桶地址
		ovf := b.overflow(t)
		// 溢出桶到头了
		if ovf == nil {
			// 跳出 bucket 循环
			break
		}
		// 去下一个溢出桶继续找 key
		b = ovf
	}

	// Did not find mapping for key. Allocate new cell & add entry.
	// 没找到 key,说明要插入新 key,需要给 key 分配新槽

	// If we hit the max load factor or we have too many overflow buckets,
	// and we're not already in the middle of growing, start growing.
	// 非扩容过程中 && (超过负载因子 || 有太多溢出桶) -> 触发扩容
	if !h.growing() && (overLoadFactor(h.count+1, h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) {
		// 哈希表扩容
		hashGrow(t, h)
		// 扩容后上述所做工作全部无效,需要重新再来一遍
		goto again // Growing the table invalidates everything, so try again
	}

	// 没找到可插入的地方
	if inserti == nil {
		// The current bucket and all the overflow buckets connected to it are full, allocate a new one.
		// 新建一个溢出桶
		newb := h.newoverflow(t, b)
		// 记录要插入的位置
		inserti = &newb.tophash[0]
		insertk = add(unsafe.Pointer(newb), dataOffset)
		elem = add(insertk, bucketCnt*uintptr(t.keysize))
	}

	// store new key/elem at insert position
	// key 是否需要存储指针(key 太大时,需要存储指针类型)
	if t.indirectkey() {
		kmem := newobject(t.key) // 给新 key 分配内存(只是分配了地址,还没有写入 key)
		*(*unsafe.Pointer)(insertk) = kmem // 槽中的 key 值区域保存了新 key 应在的内存指针
		insertk = kmem // insertk 指向新分配的地址,不再指向槽中的 key 值区
	}
	// value 同理 key 操作
	if t.indirectelem() {
		vmem := newobject(t.elem)
		*(*unsafe.Pointer)(elem) = vmem
	}
	// 移动 key 到 insertK 指向的地址
	// 最终槽中的 key 值区域保存了新 key 应在的内存指针,该指针指向了新 key 值
	// 如果不需要 indirectkey,则槽中的 key 值区域直接保存 key 值
	typedmemmove(t.key, insertk, key)
	*inserti = top // 保存 tophash 值
	h.count++ // map 元素个数 +1

done:
	// 如果写冲突,panic
	if h.flags&hashWriting == 0 {
		fatal("concurrent map writes")
	}
	// 清除写标志
	h.flags &^= hashWriting

	// 如果 elem 保存的是指向 elem 的指针
	if t.indirectelem() {
		// 获取指向 elem 实际存储地址的指针(由于返回 elem 用于修改 value,所以需要操作 value 的地址)
		elem = *((*unsafe.Pointer)(elem))
	}
	// 返回存储值的指针
	return elem
}

还记得上一小节留下了一个小小的疑问吗?当 key、elem 都不是指针时,删除 key 时只清理了 elem 的内存,却保留了 key 的内存,虽然这么写代码是合理的,但优点是啥呢?

我们看了插入源码可以发现,插入 key 时使用了 typedmemmove(t.key, insertk, key) 函数对 key 的内存进行了覆盖,这样删除 key 时不清理内存逻辑是合理的,其实也是节省了清理内存的性能的,但插入的时候性能不就有损了吗?我们继续看一下 typedmemmove 函数的代码,相信你就理解了。

func typedmemmove(typ *_type, dst, src unsafe.Pointer) {
    // 当内存覆盖的源和目的一样,直接就 return 了,否则才进行内存覆盖
	if dst == src {
		return
	}
	if writeBarrier.needed && typ.ptrdata != 0 {
		bulkBarrierPreWrite(uintptr(dst), uintptr(src), typ.ptrdata)
	}
	// There's a race here: if some other goroutine can write to
	// src, it may change some pointer in src after we've
	// performed the write barrier but before we perform the
	// memory copy. This safe because the write performed by that
	// other goroutine must also be accompanied by a write
	// barrier, so at worst we've unnecessarily greyed the old
	// pointer that was in src.
	memmove(dst, src, typ.size)
	if writeBarrier.cgo {
		cgoCheckMemmove(typ, dst, src, 0, typ.size)
	}
}

到这里,相信你已经明白为啥唯独不处理 key 了,原因就是被删除的 key 重新被写回 map 的概率很高,同时这个 key 重新再被插到同一个槽的概率也很高,当重新被写回来时,就不用处理内存了,这样可以极大地提升性能;而一个被删除的 key ,再次被插进来时,value 还是同一个 value 的概率可太小了(相信各位写业务代码的时候都能感受到),因此还不如直接把 value 清理了,这样代码的可读性会更好,更符合我们编码的习惯,通过阅读源码可以发现,golang 源码的工程师是把性能优化考虑到了极致,希望你阅读本篇文章可以感受到这一点。

6.2 newoverflow 新建溢出桶

// 新建一个溢出桶
func (h *hmap) newoverflow(t *maptype, b *bmap) *bmap {
	var ovf *bmap
	// 优先使用创建 map 时预分配的溢出桶 h.extra.nextOverflow
	if h.extra != nil && h.extra.nextOverflow != nil {
		// We have preallocated overflow buckets available.
		// See makeBucketArray for more details.
		ovf = h.extra.nextOverflow
		if ovf.overflow(t) == nil {
			// 预分配的溢出桶中指向下一个溢出桶的字段为 nil,意味着还没有用完预分配的溢出桶(不懂的看前面分析)
			// We're not at the end of the preallocated overflow buckets. Bump the pointer.
			// h.extra.nextOverflow 指向下一个可用的溢出桶
			h.extra.nextOverflow = (*bmap)(add(unsafe.Pointer(ovf), uintptr(t.bucketsize)))
		} else {
			// This is the last preallocated overflow bucket.
			// Reset the overflow pointer on this bucket,
			// which was set to a non-nil sentinel value.
			// 已经用到最后一个预分配的溢出桶,设置其内部指向下一个溢出桶的字段为 nil(因为要用,需要恢复初始值)
			ovf.setoverflow(t, nil)
			// 这里表示预分配的溢出桶用完了
			h.extra.nextOverflow = nil
		}
	} else {
		// 没有预分配的溢出桶,只能重新建一个
		ovf = (*bmap)(newobject(t.bucket))
	}
	// h.noverflow 溢出桶数量增加,记录溢出桶数量
	// 溢出桶数量小时,是精确值,否则为大概计数,源码很简单,有兴趣可以看一下
	h.incrnoverflow()
	// map 中 key,value 不含指针
	if t.bucket.ptrdata == 0 {
		// 创建 overflow 切片
		h.createOverflow()
		// 切片指向所有溢出桶(保活使用,防止被 GC)
		*h.extra.overflow = append(*h.extra.overflow, ovf)
	}
	// 给当前 bucket 的 overflow 赋值指向溢出桶
	b.setoverflow(t, ovf)
	return ovf
}

当 bucket 8个槽都存满的情况下,map 会新建溢出桶,此时会优先使用 makeBucketArray 函数创建的空闲溢出桶,还记得存在哪里吗?不记得的话,可以再去看一下第 3 小节。空闲溢出桶存储在 h.extra.nextOverflow 字段中,如果空闲溢出桶不为 nil,说明达到最后一个空闲溢出桶,将 h.extra.nextOverflow 置为 nil 即可标志用完了。

同时,在创建好新的溢出桶的时候,h.extra.overflow的保活机制就开始干活了,不了解的可以回头看一下第 1 小节的 extra 字段的作用讲解部分。

7.map 扩容

使用哈希表的目的就是要快速查找到目标 key,然而,随着向 map 中添加的 key 越来越多,key 发生碰撞的概率也越来越大;bucket 中的 8 个槽会被逐渐塞满,通过前面的源码分析,我们也知道,查找、插入、删除 key 的效率也会越来越低。

最理想的情况是一个 bucket 只装一个 key,这样就能达到 O(1) 的效率,但这样空间消耗太大,用空间换时间的代价太高。而 Go 对 map 的设计中,一个 bucket 能装载 8 个 key,定位到 bucket 后,还需要再定位到具体的 key,其实是一种时间换空间的操作。超过 8 个的 key 冲突,以链表的形式解决,当然这也要有度,不然一个 bucket 存储太多的 key,性能会直接退化为链表,操作效率变为 O(n)。

因此,需要一个指标来衡量 bucket 整体的负载情况,也就是“装载因子”。装载因子计算公式:loadFactor := count / (2^B);count:map 中元素个数;2^B 表示 bucket 数量。

7.1 扩容触发条件

扩容触发的时机:map 插入新 key 时,会检测触发条件,满足条件则会触发扩容。 在 mapassign 函数中触发条件是这样写的:

// If we hit the max load factor or we have too many overflow buckets,
// and we're not already in the middle of growing, start growing.
// 非扩容过程中 && (超过负载因子 || 有太多溢出桶) -> 触发扩容
if !h.growing() && (overLoadFactor(h.count+1, h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) {
    // 哈希表扩容
    hashGrow(t, h)
    // 扩容后上述所做工作全部无效,需要重新再来一遍
    goto again // Growing the table invalidates everything, so try again
}

// overLoadFactor reports whether count items placed in 1<<B buckets is over loadFactor.
func overLoadFactor(count int, B uint8) bool {
	// loadFactorNum = 13;loadFactorDen = 2
    // bucketShift(B) = 2^B
	return count > bucketCnt && uintptr(count) > loadFactorNum*(bucketShift(B)/loadFactorDen)
}

// tooManyOverflowBuckets reports whether noverflow buckets is too many for a map with 1<<B buckets.
// Note that most of these overflow buckets must be in sparse use;
// if use was dense, then we'd have already triggered regular map growth.
// 判断是否有太多的溢出桶了
// 多的标准是:
// B <= 15 的时候,溢出桶数量大于 2^B 的时候
// B > 15 的时候,溢出桶的数量大于 2^15 的时候
func tooManyOverflowBuckets(noverflow uint16, B uint8) bool {
	// If the threshold is too low, we do extraneous work.
	// If the threshold is too high, maps that grow and shrink can hold on to lots of unused memory.
	// "too many" means (approximately) as many overflow buckets as regular buckets.
	// See incrnoverflow for more details.
	if B > 15 {
		B = 15
	}
	// The compiler doesn't see here that B < 16; mask B to generate shorter shift code.
	return noverflow >= uint16(1)<<(B&15)
}

扩容触发条件总结为两点:

  1. 装载因子超过阈值,源码里定义的阈值是 6.5
    1. 如果每个 bucket 都没有溢出,并且 8 个槽都装满的情况下,装载因子 = 8,6.5 也就意味着很多 bucket 都快装满了,查找和插入效率都会变低,这个时候扩容是很有必要的。至于装载因子为啥是 6.5,官方注释给的解释是应该实验得出的,实验数据在源码也能找到,这里就不展示了。
    2. 针对元素过多的这种情况,Go 给的扩容方式为 2 倍扩容,即 B + 1,bucket 由原来的 2 ^ B 变为 2 ^ B * 2 = 2^(B+1)。
  2. 溢出桶的数量过多
    1. 当装载因子较小的时候,有时候会发现 map 的查找和插入效率也很低,原因是溢出桶太多了,但由于 map 中元素少,触发不了第一种条件,但查询效率变低;这种是由于大量的插入和删除元素导致的:比如先插入了大量的元素,创建了很多的 overflow,然后删除后,继续插入,触发不了第一种扩容机制,但 key 会因为大量 overflow 的存在变得很分散,导致查询效率变低。
    2. 针对这种元素不多,overflow 过多的情况,bucket 并没有装满,只是太分散了,解决办法就是开辟一个新 bucket 空间(与原空间等量为 2 ^ B),将老 bucket 中的元素移动到新 bucket,使得同一个 bucket 中的 key 排列地更紧密。这样,原来,在 overflow bucket 中的 key 可以移动到 bucket 中来。结果是节省空间,提高 bucket 利用率,map 的查找和插入效率自然就会提升。

7.2 扩容源码解析

由于 map 扩容需要将原有的 key/value 重新搬迁到新的内存地址,如果有大量的 key/value 需要搬迁,会非常影响性能。因此 Go map 的扩容采取了一种称为“渐进式”的方式,原有的 key 并不会一次性搬迁完毕,每次最多只会搬迁 2 个 bucket。 因此,map 扩容分为两个阶段:

  1. hashGrow函数:分配新的内存空间,用于存储新的 buckets,并将老的 buckets 挂到 oldbuckets 字段上。
  2. growWork函数:搬迁旧 bucket 到新 bucket 中。触发搬迁的地方在 mapassign 和 mapdelete 函数中,也就是在插入、修改或删除 key 的操作里。

7.2.1 hashGrow 函数

func hashGrow(t *maptype, h *hmap) {
	// If we've hit the load factor, get bigger.
	// Otherwise, there are too many overflow buckets,
	// so keep the same number of buckets and "grow" laterally.
	// 扩容分为等量和2倍量
	bigger := uint8(1)
	if !overLoadFactor(h.count+1, h.B) {
		// 未达到装载因子,等量扩容
		bigger = 0
		// 标志位置为等量扩容
		h.flags |= sameSizeGrow
	}
	// h.buckets 挂载到 oldbuckets 字段
	oldbuckets := h.buckets
	// 分配新的 bucket 数组,和预分配新的溢出桶
	newbuckets, nextOverflow := makeBucketArray(t, h.B+bigger, nil)

	// flags 先把迭代器标志置为 0,用于后续记录迭代器在使用新的还是旧的 bucket
	flags := h.flags &^ (iterator | oldIterator)
	if h.flags&iterator != 0 {
		// 当前 hmap 在迭代器中
		// flags 迭代器标志位置为在使用旧 bucket,因为在扩容前发生的
		flags |= oldIterator
	}

	// commit the grow (atomic wrt gc)
	// 提交扩容操作
	h.B += bigger // 等量或2倍量扩容
	h.flags = flags // 迭代器使用新旧 bucket 标志位确认(迭代器初始化会把新旧标志位都置为 1)
	h.oldbuckets = oldbuckets // 挂载旧 buckets
	h.buckets = newbuckets // 挂载新 buckets
	h.nevacuate = 0 // 初始化迁移进度
	h.noverflow = 0 // 新 buckets 溢出桶数量为 0

	// h.extra.overflow != nil 意味着 k-v 都不是指针,溢出桶存储在 overflow 字段保活
	if h.extra != nil && h.extra.overflow != nil {
		// Promote current overflow buckets to the old generation.
		if h.extra.oldoverflow != nil {
			throw("oldoverflow is not nil")
		}
		// 旧溢出桶存储到 oldoverflow
		h.extra.oldoverflow = h.extra.overflow
		// 新溢出桶个数为 0 ,置空即可
		h.extra.overflow = nil
	}
	if nextOverflow != nil {
		if h.extra == nil {
			h.extra = new(mapextra)
		}
		// 指向预分配的新空白溢出桶
		h.extra.nextOverflow = nextOverflow
	}

	// the actual copying of the hash table data is done incrementally
	// by growWork() and evacuate().
	// 哈希表数据的实际迁移过程是通过 growWork() 和 evacuate() 增量完成的。
}

hashGrow 函数逻辑比较简单,其中值得一提是对 h.flags 的处理,这里其实是和下一小节迭代器有关系,我这里把代码放到一起看,你会更明白:

func hashGrow(t *maptype, h *hmap) {
    // flags 先把迭代器标志置为 0,用于后续记录迭代器在使用新的还是旧的 bucket
    flags := h.flags &^ (iterator | oldIterator)
    if h.flags&iterator != 0 {
        // 当前 hmap 在迭代中
        // flags 迭代器标志位置为在使用旧 bucket,因为在扩容前发生的
        flags |= oldIterator
    }
    
    h.flags = flags // 迭代器使用新旧 bucket 标志位确认(迭代器初始化会把新旧标志位都置为 1)
}

// 迭代器初始化函数
func mapiterinit(t *maptype, h *hmap, it *hiter) {
    // Remember we have an iterator.
	// Can run concurrently with another mapiterinit().
	// 记住我们有一个迭代器。
	// 可以与另一个 mapiteinit() 同时运行。
	// 设置 hmap 迭代器标志位为 iterator|oldIterator
	// 1. 迭代前未进行扩容,迭代器初始化为 11,迭代过程中发送扩容,标志位被更改为 10
	// 表示迭代器正在使用旧 hmap.oldbuckets,迁移完毕不能其清理内存,标志新 bucket 没有被迭代器使用
	// 2. 迭代器初始化时正在经历扩容,标志为 11,表示 hmap.buckets 正在被迭代器使用
	// 此时,迭代器也会访问 hmap.oldbuckets 数据,因此也不能进行清理
	if old := h.flags; old&(iterator|oldIterator) != iterator|oldIterator {
		atomic.Or8(&h.flags, iterator|oldIterator)
	}
}

迭代器初始化函数 mapiterinit 初始化迭代器时会把 h.flags 字段中 iterator 和 oldIterator 对应位都置为 1,因为迭代器使用旧 buckets 和 新 buckets 的可能性都是存在的,有可能在迭代器使用过程中发生扩容,此时可以确定的认为该迭代器的标志位为 oldIterator,在 hashGrow 函数中,将 h.flags 字段置为 oldIterator。

7.2.2 growWork 函数

mapassign 函数中触发为例,会执行两个函数:growing 和 growWork,代码如下:

func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
    // 正在扩容
	if h.growing() {
		// 迁移该 bucket
		growWork(t, h, bucket)
	}
}

// growing reports whether h is growing. The growth may be to the same size or bigger.
func (h *hmap) growing() bool {
    // 当 h.oldbuckets != nil 时,说明正在扩容中
	return h.oldbuckets != nil
}

func growWork(t *maptype, h *hmap, bucket uintptr) {
	// make sure we evacuate the oldbucket corresponding
	// to the bucket we're about to use
    // 把即将使用的旧存储桶迁移到新桶
	evacuate(t, h, bucket&h.oldbucketmask())

	// evacuate one more oldbucket to make progress on growing
    // 仍然处于扩容中
	if h.growing() {
        // 按顺序触发迁移
		evacuate(t, h, h.nevacuate)
	}
}

bucket&h.oldbucketmask() 这行代码,如源码注释里说的,是为了确认搬迁的 bucket 是我们正在使用的 bucket。oldbucketmask() 函数返回扩容前的 map 的 bucketmask。bucket&h.oldbucketmask() = oldbucket 旧存储桶的下标。我们发现 growWork 函数会最多进行两次 bucket 迁移,每次迁移一个 bucket,如果定位到的 bucket 已经迁移完毕,或者刚好定位到的 bucket 与顺序迁移的 bucket 一致,则此过程只迁移一个 bucket,否则迁移两个 bucket。

  1. 顺序迁移,由标志位 nevacuate 决定旧 buckets 数组的下标。
  2. 写操作导致的迁移, bucket&h.oldbucketmask() 用于计算旧 buckets 数组的下标。

接下来继续分析源码 evacuate 函数,逻辑有点复杂,这里理解的关键点在于 2 倍扩容期间的迁移操作。由于扩容是 2 倍的,也就意味着一个 bucket 会裂变为两个 bucket,那 oldbucket 里的 key 该如何分配呢?其实也很简单,buckets 数组由原来的 2^B 裂变为 2^(B+1) 个;那定位 key 时,就该计算低 B+1 位,来定位新 buckets 数组的下标了。

其实就是多了一个二进制位,我们都知道一个二进制位代表了两种情况 0 或 1。假设 B = 5,原低 B 位为 00110,也就是定位到 buckets[6];如今 2 倍扩容后,该 key 可能的情况有 000110 = 6100110 = 38,裂变为了两个 bucket 下标,其中一个和迁移前一致;一致的部分被称之为低位部分:x 部分;不一致的部分被称之为高位部分:y 部分;后续迁移以及迭代器部分都要使用 xy,这里需要记一下。 接下来,我们逐句分析一下源码:

// 迁移目的地
type evacDst struct {
   b *bmap          // 迁移目的地 bucket
   i int            // 槽数组的下标
   k unsafe.Pointer // 当前槽 key 的地址指针
   e unsafe.Pointer // 当前槽 elem 的地址指针
}

func evacuate(t *maptype, h *hmap, oldbucket uintptr) {
	// 定位老的 bucket 地址
	b := (*bmap)(add(h.oldbuckets, oldbucket*uintptr(t.bucketsize)))
	// newbit = 2^B,该变量的意思是新 bit 位,如果是二倍扩容,旧 bucket 会发生裂变,产生新的 bit 位
	newbit := h.noldbuckets()
	// 如果 b 尚未进行迁移
	if !evacuated(b) {
		// TODO: reuse overflow buckets instead of using new ones, if there
		// is no iterator using the old buckets.  (If !oldIterator.)

		// xy contains the x and y (low and high) evacuation destinations.
		// xy 变量包含 x 和 y(低和高)迁移目的地。(2倍迁移,bucket 下标的前一位多了 0 或 1,分为 x 和 y)
		var xy [2]evacDst
		x := &xy[0]
		// x 表示裂变的前半部分,在新桶数组的下标与旧桶数组下标一致
		// 存储新桶要迁移的地址
		x.b = (*bmap)(add(h.buckets, oldbucket*uintptr(t.bucketsize)))
		x.k = add(unsafe.Pointer(x.b), dataOffset)
		x.e = add(x.k, bucketCnt*uintptr(t.keysize))

		// 非等量扩容,则为2倍扩容
		if !h.sameSizeGrow() {
			// Only calculate y pointers if we're growing bigger.
			// Otherwise GC can see bad pointers.
			// 2倍扩容存在 y 区域,也就是新 bit 位 = 1
			// y 中存储新桶裂变的后半部分迁移地址
			y := &xy[1]
			// 2 倍裂变的 y 部分下标 = oldbucket+newbit
			// 例如:oldbucket = 111 三位,newbit = 1000 四位;oldbucket+newbit = 1111 新下标
			y.b = (*bmap)(add(h.buckets, (oldbucket+newbit)*uintptr(t.bucketsize)))
			y.k = add(unsafe.Pointer(y.b), dataOffset)
			y.e = add(y.k, bucketCnt*uintptr(t.keysize))
		}

		// 循环当前迁移旧 bucket 的整个链表结构
		for ; b != nil; b = b.overflow(t) {
			k := add(unsafe.Pointer(b), dataOffset) // 取出第一个 key
			e := add(k, bucketCnt*uintptr(t.keysize)) // 取出第一个 value
			// 循环 8 个槽
			for i := 0; i < bucketCnt; i, k, e = i+1, add(k, uintptr(t.keysize)), add(e, uintptr(t.elemsize)) {
				// 获取 tophash
				top := b.tophash[i]
				// 槽上数据为空
				if isEmpty(top) {
					// 标记已迁移,且 cell 为空
					b.tophash[i] = evacuatedEmpty
					// 处理下一个槽
					continue
				}
				// top 不能为已迁移标志,否则有问题,相当于已经迁移的又迁移一遍
				if top < minTopHash {
					throw("bad map state")
				}
				// 获取 key 真正的值
				k2 := k
				if t.indirectkey() {
					k2 = *((*unsafe.Pointer)(k2))
				}
				var useY uint8 // 数据迁移的裂变去向 0:表示x;1:表示 y
				// 非等量扩容
				if !h.sameSizeGrow() {
					// Compute hash to make our evacuation decision (whether we need
					// to send this key/elem to bucket x or bucket y).
					// 计算 key 的哈希值,判断 key 的去向,是 x 还是 y 部分(等量扩容,直接迁移去 x 部分)
					hash := t.hasher(k2, uintptr(h.hash0))
					// 存在迭代器 && key != key &&  key !equal key
					if h.flags&iterator != 0 && !t.reflexivekey() && !t.key.equal(k2, k2) {
						// If key != key (NaNs), then the hash could be (and probably
						// will be) entirely different from the old hash. Moreover,
						// it isn't reproducible. Reproducibility is required in the
						// presence of iterators, as our evacuation decision must
						// match whatever decision the iterator made.
						// Fortunately, we have the freedom to send these keys either
						// way. Also, tophash is meaningless for these kinds of keys.
						// We let the low bit of tophash drive the evacuation decision.
						// We recompute a new random tophash for the next level so
						// these keys will get evenly distributed across all buckets
						// after multiple grows.
						// 如果 key != key(不是一个数字),key 每次的哈希值都不一样,它是不可再现的。
						// 在迭代器存在的情况下,可重复性是必需的,因为我们的迁移决策必须与迭代器所做的任何决策相匹配,
						// 迭代器还要利用迁移后的数据进行遍历 map。
						// 幸运的是,我们可以任意发送这些 key,因为这些值,靠 key 计算哈希是访问不到的,只有迭代器可以访问。
						// 此外,tophash 对于这些类型的 key 没有意义。我们让 tophash 的最低位的决定如何迁移。
						// 我们为下一个级别重新计算一个新的随机 tophash,这样在多次扩容后,这些 key 将均匀分布在所有桶中。
						useY = top & 1
						top = tophash(hash)
					} else {
						// key 可计算 hash,且幂等的情况下,使用 hash 的新 bit 位,决定迁移数据的去向
						if hash&newbit != 0 {
							// hash新 bit 位为 1,应该去往 y 部分,否则去往 x 部分
							useY = 1
						}
					}
				}

				if evacuatedX+1 != evacuatedY || evacuatedX^1 != evacuatedY {
					throw("bad evacuatedN")
				}

				// tophash 标志位设置为 evacuatedX + useY (迁移去了 x 还是 y 部分)
				b.tophash[i] = evacuatedX + useY // evacuatedX + 1 == evacuatedY
				// 使用新 bucket 的目的地
				dst := &xy[useY]                 // evacuation destination

				// 目标 bucket 装不下了,使用溢出桶
				if dst.i == bucketCnt {
					// 创建一个溢出桶,更新 dst 迁移目的地
					dst.b = h.newoverflow(t, dst.b)
					dst.i = 0
					dst.k = add(unsafe.Pointer(dst.b), dataOffset)
					dst.e = add(dst.k, bucketCnt*uintptr(t.keysize))
				}
				// 记录 tophash (dst.i&(bucketCnt-1) 避免边界检查)
				dst.b.tophash[dst.i&(bucketCnt-1)] = top // mask dst.i as an optimization, to avoid a bounds check
				// key 优化为指针存储
				if t.indirectkey() {
					// 修改 bucket 槽中指向 key 值的指针
					*(*unsafe.Pointer)(dst.k) = k2 // copy pointer
				} else {
					// 修改 bucket 槽中保存 key 值的内存
					typedmemmove(t.key, dst.k, k) // copy elem
				}
				// value 优化为指针存储
				if t.indirectelem() {
					// 修改 bucket 槽中指向 value 值的指针
					*(*unsafe.Pointer)(dst.e) = *(*unsafe.Pointer)(e)
				} else {
					// 修改 bucket 槽中保存 value 值的内存
					typedmemmove(t.elem, dst.e, e)
				}
				// 通过改变 dst 内部指针的方式,改变了 &xy[useY] 指向下一个槽,继续进行迁移
				dst.i++
				// These updates might push these pointers past the end of the
				// key or elem arrays.  That's ok, as we have the overflow pointer
				// at the end of the bucket to protect against pointing past the
				// end of the bucket.
				// 这些更新可能会将这些指针推过 key 或 elem 数组的末尾。
				// 没关系,因为我们在桶的末端有溢出指针,以防止指针超出桶的末端。
				// 到达溢出桶,会重新改变 dst 内部指针的指向
				dst.k = add(dst.k, uintptr(t.keysize))
				dst.e = add(dst.e, uintptr(t.elemsize))
			}
		}
		// Unlink the overflow buckets & clear key/elem to help GC.
		// 该 bucket 迁移完成,看是否需要清理内存
		// 取消链接溢出桶并清除 key/elem 以帮助 GC。(清除旧桶的指针内存)
		// hmap 不在迭代器中(迭代器没有使用旧 buckets) && key、value 包含指针  -> 此时应该清理内存
		if h.flags&oldIterator == 0 && t.bucket.ptrdata != 0 {
			// 定位旧 bucket 地址
			b := add(h.oldbuckets, oldbucket*uintptr(t.bucketsize))
			// Preserve b.tophash because the evacuation
			// state is maintained there.
			// 不能清除 tophash 的内存,因为有标志位
			// dataOffset 表示 tophash 的内存大小
			ptr := add(b, dataOffset)
			n := uintptr(t.bucketsize) - dataOffset
			// memclrHasPointers 清除从 ptr 开始的 n 个字节的类型化内存。
			memclrHasPointers(ptr, n)
		}
	}

	// 迁移的桶为顺序迁移的位置,需要更新 nevacuate 字段,表示已经迁移多少桶
	// 表示该下标前的桶都迁移完毕了
	if oldbucket == h.nevacuate {
		advanceEvacuationMark(h, t, newbit)
	}
}

advanceEvacuationMark 函数用于记录顺序迁移的进度,小于 nevacuate 下标的 bucket 表示已经完成迁移。当全部迁移完成时,会把 old 相关字段置空,但旧桶内存并没有被清理,只是表示停止指向 oldbuckets 的相关内存,如果此时存在迭代器,需要迭代器自己保活 oldbuckets 相关内存,否则会被 GC 回收。

// 记录迁移进度
func advanceEvacuationMark(h *hmap, t *maptype, newbit uintptr) {
	h.nevacuate++

	// Experiments suggest that 1024 is overkill by at least an order of magnitude.
	// Put it in there as a safeguard anyway, to ensure O(1) behavior.
	// 实验表明 1024 至少大了一个数量级。
	// 把它放在那里作为保障,以确保 O(1) 行为。

	// 往后找一个未迁移的桶
	// 桶分两种情况进行迁移
	// 1. 顺序迁移,2. 当前正在插入、修改、删除的 bucket 进行迁移
	// 所以 h.nevacuate + 1 可能已经被迁移,所以需要一个搜索范围,控制最坏情况,也就是 1024
	// 从 h.nevacuate + 1 最多遍历 1024
	stop := h.nevacuate + 1024
	// 如果 stop > 桶最大数量下标 :说明桶都遍历到最后了,还没到 stop 是不合理的,因此最大值为 newbit
	if stop > newbit {
		stop = newbit
	}
	// 还未遍历到未迁移的桶
	for h.nevacuate != stop && bucketEvacuated(t, h, h.nevacuate) {
		// 已迁移标志 ++
		h.nevacuate++
	}
	// 遍历到最后,也没有找到未迁移的桶,表示迁移完毕了
	if h.nevacuate == newbit { // newbit == # of oldbuckets
		// Growing is all done. Free old main bucket array.
		// 释放 oldbuckets 字段(释放旧桶数组)
		h.oldbuckets = nil
		// Can discard old overflow buckets as well.
		// If they are still referenced by an iterator,
		// then the iterator holds a pointers to the slice.
		// 也可以丢弃旧的溢出桶。
		// 如果它们仍然被迭代器引用,那么迭代器将保存指向切片的指针,用于保活,防止被 GC。
		//(但是 h 不再需要保存指向切片的指针)
		if h.extra != nil {
			h.extra.oldoverflow = nil
		}
		// 新 map 清除等量扩容标志
		h.flags &^= sameSizeGrow
	}
}

8.map 迭代器

从编译器源码 gofrontend/go/statements.cc/For_range_statement::do_lower() 中,可以看到 for...range 对 map 有以下注释:

// The loop we generate:
// var hiter map_iteration_struct
// for mapiterinit(type, range, &hiter); hiter.key != nil; mapiternext(&hiter) {
//     index_temp = *hiter.key
//     value_temp = *hiter.val
//     index = index_temp
//     value = value_temp
//     original body
// }

注释源码分析:

  1. 初始化迭代器使用 mapiterinit(type, range, &hiter);hiter 为迭代器结构体。
  2. hiter.key != nil 为 for range 的终止条件,由 mapiternext 函数内部设置。
  3. 遍历 map 时没有指定循环次数,每一次迭代需要调用 mapiternext 函数获取新值,每调用一次 mapiternext,hiter.key 和 hiter.val 就会变化为下一个 map 元素,给外部 for range 使用。

8.1 mapiterinit 迭代器初始化

看源码之前,需要先理解一下迭代器 hiter 和 hmap 之间的关系,hiter 在初始化的时候不仅关联了 hmap,例如 hiter.h 字段,可以获取 hmap 的最新值;hiter 还记录 hmap 的快照数据(快照即为定格),迭代器是按照快照指向的 buckets 数组来进行遍历的。

// 哈希迭代结构。
type hiter struct {
   key unsafe.Pointer // 必须排在第一位。 写入 nil 表示迭代结束
   elem unsafe.Pointer // 必须在第二个位置(参见 cmd/compile/internal/walk/range.go)。
   t    *maptype       // map 的类型信息,包括 key、elem 的类型等信息
   h    *hmap          // 需要迭代的 hmap
    
   // hiter 初始化时的 bucket 指针(迭代器初始化时,设置的 map 当前最新的 buckets = hmap.buckets)
   buckets unsafe.Pointer 
   bptr *bmap // 当前正在遍历的 bucket
   
   overflow *[]*bmap // 保持 hmap.buckets 的溢出桶存活
   oldoverflow *[]*bmap // 保持 hmap.oldbuckets 的溢出桶存活
   
   startBucket uintptr // bucket 迭代开始位置(随机选择的 bucket)
   
   offset uint8 // 在迭代期间开始的桶内偏移量(应该足够大以容纳 bucketCnt-1)
    
   // 已经从桶数组的末尾环绕到开始(比如从中间 bucket 开始遍历,
   // 经过最大 bucket 时,从 0 下标 bucket 重新开始,此时该参数为 true)
   wrapped     bool
    
   B           uint8   // 就是当前遍历的 hmap 的那个 B
   i           uint8   // 当前遍历的 bucket 内 key 的索引
   bucket      uintptr // 当前遍历的 bucket
   checkBucket uintptr // 是否需要检查 bucket 迁移裂变的位置
}

func mapiterinit(t *maptype, h *hmap, it *hiter) {
	if raceenabled && h != nil {
		callerpc := getcallerpc()
		racereadpc(unsafe.Pointer(h), callerpc, abi.FuncPCABIInternal(mapiterinit))
	}

	// 记录 map 的类型信息
	it.t = t
	if h == nil || h.count == 0 {
		return
	}

	if unsafe.Sizeof(hiter{})/goarch.PtrSize != 12 {
		throw("hash_iter size incorrect") // see cmd/compile/internal/reflectdata/reflect.go
	}
	// 关联上 map
	it.h = h

	// grab snapshot of bucket state
	// 抓取桶状态快照,记录迭代器初始化的时候桶状态 (记住 it 是 hmap 的快照)
	it.B = h.B
	it.buckets = h.buckets
    // bucket 中存储的内容不包含指针
	if t.bucket.ptrdata == 0 {
		// Allocate the current slice and remember pointers to both current and old.
		// This preserves all relevant overflow buckets alive even if
		// the table grows and/or overflow buckets are added to the table
		// while we are iterating.

        // 分配当前切片并记住指向当前切片和旧切片的指针。
		// 即使在迭代时表增长或溢出桶添加到表中,这也会保留所有相关的溢出桶。
        // 迭代器给溢出桶保活,迭代器迭代期间,溢出桶不能被 GC
		h.createOverflow()
		it.overflow = h.extra.overflow
		it.oldoverflow = h.extra.oldoverflow
	}

	// decide where to start
	// 决定从哪里开始遍历(这也是 map 迭代器每次顺序都随机的原理所在)
	var r uintptr
	if h.B > 31-bucketCntBits {
		r = uintptr(fastrand64())
	} else {
		r = uintptr(fastrand())
	}
	// 随机迭代开始的 bucket 下标
	it.startBucket = r & bucketMask(h.B)
	// 随机迭代开始的 cell 下标(bucket 的槽也是随机的,每个 bucket 都从这个槽开始遍历,转一圈)
	it.offset = uint8(r >> h.B & (bucketCnt - 1))

	// iterator state
	// 当前遍历到的 bucket 下标(初始值)
	it.bucket = it.startBucket

	// Remember we have an iterator.
	// Can run concurrently with another mapiterinit().
	// 记住我们有一个迭代器。
	// 可以与另一个 mapiteinit() 同时运行。
	// 设置 hmap 迭代器标志位为 iterator|oldIterator
	// 1. 迭代前未进行扩容,迭代器初始化为 11,迭代过程中发送扩容,标志位被更改为 10
	// 表示迭代器正在使用旧 hmap.oldbuckets,迁移完毕不能其清理内存,标志新 bucket 没有被迭代器使用
	// 2. 迭代器初始化时正在经历扩容,标志为 11,表示 hmap.buckets 正在被迭代器使用
	// 此时,迭代器也会访问 hmap.oldbuckets 数据,因此也不能进行清理
	if old := h.flags; old&(iterator|oldIterator) != iterator|oldIterator {
		atomic.Or8(&h.flags, iterator|oldIterator)
	}

	// 开始一次遍历
	mapiternext(it)
}

这里着重讲解一下遍历 map 两个随机起点的计算方式:

  1. bucket 数组下标的起始值 it.startBucket = r & bucketMask(h.B),利用随机数 r 的后 B 位确定起始的 bucket 下标。
  2. 每一个 bucket 起始槽的偏移量 it.offset = uint8(r >> h.B & (bucketCnt - 1)) ,bucketCnt - 1 = 7,二进制也就是 111,r 右移 B 位后与 7 相与,得到一个 0~7 的槽下标。

map 底层是使用哈希表实现的,插入数据位置是随机的,所以遍历过程中新插入的数据不能保证遍历到,有可能在遍历过程中插入到已遍历过的 bucket 中,则没有机会进行遍历。

8.2 mapiternext 遍历下一个元素

map 遍历的核心在于理解 2 倍扩容时,老 bucket 会分裂到 2 个新 bucket 中去。而遍历操作,可能会按照新 bucket 的序号顺序进行,碰到老 bucket 未搬迁的情况时,要在老 bucket 中找到将来要搬迁到新 bucket 来的 key。

func mapiternext(it *hiter) {
	// 每一次元素遍历都需要调用一次 mapiternext 函数,
	// 例如:如果 map 中有10个值,则需要调用 10 次 mapiternext
	// 调用由 for ... range 完成,通过 it.key = nil,it.elem = nil 终止调用 mapiternext

	// h 为正在迭代的底层 hmap 实例
	h := it.h
	if raceenabled {
		callerpc := getcallerpc()
		racereadpc(unsafe.Pointer(h), callerpc, abi.FuncPCABIInternal(mapiternext))
	}
	// 正在插入、修改、删除 key 但时候,不能获取下一个 key,
	// 注意:这不能保证插入、修改、删除之后,进行迭代。
	if h.flags&hashWriting != 0 {
		fatal("concurrent map iteration and map write")
	}
	t := it.t  // map 中存储的 key、val 类型 *maptype
	bucket := it.bucket // 当前遍历到的 bucket 下标(int 类型),从随机的 startBucket 开始,每次调用 mapiternext 重新赋值
	b := it.bptr // 当前正在遍历的 bucket 指针,这个值存在的时候,不用重新的再去找 bucket 了
	i := it.i // 迭代器当前遍历到的位置(bucket 内 key 的位置)
	checkBucket := it.checkBucket // 当前 bucket 是否需要检查迁移裂变位置 x,y

next:
	// 首次遍历 b == nil,为其赋初始值 startBucket
	// 遍历溢出桶 b == nil,表示该 bucket 链表遍历完毕,可以为其赋值下一个要遍历的 bucket 链表了
	if b == nil {
		// 遍历完成,把迭代器中 key,elem 置为 nil,退出循环
		// for...range 查看到 key 是 nil 会中止迭代
		if bucket == it.startBucket && it.wrapped {
			// end of iteration
			it.key = nil
			it.elem = nil
			return
		}

		// 首先得知道,迭代器是根据快照内容进行遍历的
		// 1.如果 it 快照指向了 oldbuckets,直接遍历 oldbuckets 就行了,不用管迁不迁移
		// 因为 oldbuckets 内存不会被清除,但 h.oldbuckets 字段在迁移完毕时,会等于 nil
		// 2.如果 it 快照指向了新 buckets && 2倍扩容过程中,需要按2倍下标遍历,
		// 才需要考虑当前 bucket 所属旧 bucket 是否被迁移,
		// 如果没有被迁移,则需要去旧桶读取数据,等量扩容直接读取旧数据就可以了,2倍扩容需要按 key 裂变位置读取。
		// hmap 正在扩容 && 迭代器快照 B == hmap.B
		// 1. 迭代器初始化的时候,hmap 已经在扩容过程中,此时迭代器指向新 buckets
		// 2. 迭代器初始化后,hmap 才发生扩容,但为等量扩容,此时迭代器指向旧 buckets
		if h.growing() && it.B == h.B {
			// 如果迭代器是在扩容过程中启动的,扩容尚未完成。
			// 如果我们正在查看的 bucket 尚未迁移,说明数据在 oldbucket 中,
			// 那么我们需要遍历旧存储桶,同时只返回将迁移到此存储桶的数据。
			// 那些需要迁移到另一个下标的桶则跳过。

			// 通过 &mask 的方式获取旧存储桶的下标
			oldbucket := bucket & it.h.oldbucketmask()
			// 获取旧存储桶中当前遍历 bucket 地址
			b = (*bmap)(add(h.oldbuckets, oldbucket*uintptr(t.bucketsize)))
			// 是否迁移完成
			if !evacuated(b) {
				// 旧 bucket 未进行迁移
				// 需要检查 bucket 的裂变情况 checkBucket 为要检查的 bucket 下标
				checkBucket = bucket
			} else {
				// bucket 已迁移(在新的也有数据,在旧的也有数据)
				// 直接用迭代器指向的 buckets 遍历即可
				b = (*bmap)(add(it.buckets, bucket*uintptr(t.bucketsize)))
				checkBucket = noCheck
			}
		} else {
			// 1.不在扩容中(未进行扩容 || 扩容结束),直接去迭代器指向的 it.buckets 读取数据即可
			// 2.在扩容中 && it.B != h.B -> 表示 it.buckets 指向了 oldbuckets,
			// 里边有数据,直接遍历就行,不用在意迁移过程
			// 第2种情况下,这里如果有数据插入到新桶,会导致遍历不到,所以不建议并发执行遍历和插入操作
			// 而且 key 插入具有随机性,也有可能遍历不到
			b = (*bmap)(add(it.buckets, bucket*uintptr(t.bucketsize)))
			checkBucket = noCheck
		}

		// bucket 下标加1,指向下一个 bucket
		bucket++
		// 判断下标是否是最后一个 bucket (bucketShift(it.B) 计算最大桶的下标)
		if bucket == bucketShift(it.B) {
			// 已经从第一个随机定位的 bucket 遍历到最后一个 bucket 了,下一个应该是第一个 bucket 了
			// 下标从 0 开始,继续循环(达到环形数组的效果,前面的 bucket 可能还未遍历)
			bucket = 0
			// 标志位:已经从桶数组的末尾环绕到开始
			it.wrapped = true
		}

		// 开始遍历新的 bucket 的时候,重置 i
		i = 0
	}

	// 遍历当前 bucket 的 8 个 cell
	for ; i < bucketCnt; i++ {
		// 计算桶内 cell 下标:从随机的 it.offset 下标开始遍历
		offi := (i + it.offset) & (bucketCnt - 1)
		// 如果槽是空的,或者 key 已经迁移,则跳过。(利用 tophash 的标志位判断)
		if isEmpty(b.tophash[offi]) || b.tophash[offi] == evacuatedEmpty {
			// TODO: emptyRest is hard to use here, as we start iterating
			// in the middle of a bucket. It's feasible, just tricky.
			continue
		}
		// 定位第 i 个槽的 key
		k := add(unsafe.Pointer(b), dataOffset+uintptr(offi)*uintptr(t.keysize))
		if t.indirectkey() {
			k = *((*unsafe.Pointer)(k))
		}
		// 定位第 i 个槽的 value
		e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+uintptr(offi)*uintptr(t.elemsize))

		// 需要检查迁移后裂变的位置 && 增量扩容 -> 需要判断迁移之后的 key 落入的是 h.buckets 的前半部分还是后半部分(x 还是 y)
		// 前半部分 x 则需要在旧存储桶中获取属于该裂变部分的值
		// 后半部分 y 则跳过,后边还会遍历到 y 所属的新的 bucket,那个时候再进行处理即可
		if checkBucket != noCheck && !h.sameSizeGrow() {
			// Special case: iterator was started during a grow to a larger size
			// and the grow is not done yet. We're working on a bucket whose
			// oldbucket has not been evacuated yet. Or at least, it wasn't
			// evacuated when we started the bucket. So we're iterating
			// through the oldbucket, skipping any keys that will go
			// to the other new bucket (each oldbucket expands to two
			// buckets during a grow).

			// 如果 k 是可比较的
			if t.reflexivekey() || t.key.equal(k, k) {
				// If the item in the oldbucket is not destined for
				// the current new bucket in the iteration, skip it.
				hash := t.hasher(k, uintptr(h.hash0))
				// k 的 hash 后最终落到地 bucket 不是 checkBucket(意味着是后半部分),则跳过
				// 寻找被迁移到 checkBucket 的 key
				if hash&bucketMask(it.B) != checkBucket {
					continue
				}
			} else {
				// Hash isn't repeatable if k != k (NaNs).  We need a
				// repeatable and randomish choice of which direction
				// to send NaNs during evacuation. We'll use the low
				// bit of tophash to decide which way NaNs go.
				// NOTE: this case is why we need two evacuate tophash
				// values, evacuatedX and evacuatedY, that differ in
				// their low bit.

				// 对于不可比较的 key,
				// 是由 tophash 的最低位来决定迁移到前半部分还是后半部分的。
				if checkBucket>>(it.B-1) != uintptr(b.tophash[offi]&1) {
					continue
				}
			}
		}

		if (b.tophash[offi] != evacuatedX && b.tophash[offi] != evacuatedY) ||
			!(t.reflexivekey() || t.key.equal(k, k)) {
			// This is the golden data, we can return it.
			// OR
			// key!=key, so the entry can't be deleted or updated, so we can just return it.
			// That's lucky for us because when key!=key we can't look it up successfully.

			// 没有进行迁移 || key 不可比较
			// key!=key,所以该条目不能被删除或更新,所以我们只能返回它。
			// 这对我们来说很幸运,因为当 key!=key 时我们无法成功查找它。
			it.key = k
			if t.indirectelem() {
				e = *((*unsafe.Pointer)(e))
			}
			it.elem = e
		} else {
			// The hash table has grown since the iterator was started.
			// The golden data for this key is now somewhere else.
			// Check the current hash table for the data.
			// This code handles the case where the key
			// has been deleted, updated, or deleted and reinserted.
			// NOTE: we need to regrab the key as it has potentially been
			// updated to an equal() but not identical key (e.g. +0.0 vs -0.0).

			// 自迭代器启动以来,哈希表已扩容。
			// key 已经被迁移到其他地方,所以需要检查当前哈希表中的数据。
			// 此代码处理已删除、更新或删除并重新插入 key 的情况。
			// 注意:我们需要重新标记 key,因为它可能已更新为 equal() 但不是相同的 key(例如 +0.0 vs -0.0)。

			// 获取当前遍历到的 key/elem。
			rk, re := mapaccessK(t, h, k)
			if rk == nil {
				continue // key has been deleted
			}
			it.key = rk
			it.elem = re
		}

		// 每遍历一个元素,调用一次 mapiternext 函数,找到 key/elem 就返回
		// 记录迭代器当前迭代内容,然后进行下一次迭代
		// 记录下一个要遍历的 bucket 下标
		it.bucket = bucket
		// 记录当前正在遍历的 bucket
		if it.bptr != b { // avoid unnecessary write barrier; see issue 14921
			it.bptr = b
		}
		// 记录遍历到哪个槽了
		it.i = i + 1
		// 记录是否需要检查 key 迁移裂变位置
		it.checkBucket = checkBucket
		return
	}
	// for 循环 8 个 cell 都没有 key 需要返回,则继续遍历溢出的桶
	b = b.overflow(t)
	i = 0
	goto next
}

通过源码分析,我们知道迭代器会给 hmap 初始化快照,当迭代器初始化后, map 发生扩容,此时迭代器快照是指向 oldbuckets 数组;如果此时插入、修改或删除值,按源码来看,会先对 bucket 进行迁移,然后插入新的 bucket,然而迭代器会根据 oldbuckets 进行遍历,则无法遍历到改变的值,因此官方是不建议遍历 map 和写操作并发执行的。

小结

通过对 map 底层源码的分析,解决了很多我对 map 特性的困惑,这里总结一下:

  1. map 是引用类型:因为创建 map 的函数返回了一个指针。
  2. map 遍历是无序的:桶起点和槽偏移点都是随机的。
  3. map 是非线程安全的:并发读可以,并发读写、写写会 panic,写操作发生时,会标记 hmap.flags, 如果同时有其他协程读写,会抛出错误,导致 panic。
  4. map 使用链表法解决哈希冲突:一个 bucket 有 8个槽 + overflow 指针指向 bmap 链表。
  5. map 扩容分为2倍扩容和等量扩容两种,扩容触发操作发生在插入 key 期间,触发条件为装载因子和过多的溢出桶。
  6. map 没有缩容机制,容量不会变小,即使删除 key,也不会减少 map 内存;由于没有缩容机制 map 内存存在一直变大的风险,所以大规模的存储尽量别用 map,要不会把你的内存干爆;后续希望官方增加缩容代码,看起来实现比较复杂。
  7. map 的迁移是渐进式的,迁移动作发生在写操作期间,写操作发生时,每次最多迁移两个桶,最少迁移一个桶。
  8. 删除不存在的 key 不会发生错误;查询不存在的 key,会返回对应类型的零值。

历经 2 周终于把 map 源码看完了,收获颇丰,这里整理出来分享给各位同学。 `` 以上就是本文的全部内容,如果觉得还不错的话欢迎点赞转发关注,感谢支持。

转载自:https://juejin.cn/post/7293820517375164416
评论
请登录