逐行拆解 Go map 源码
本文会详细拆解 Go map 的核心源码。文中所用代码均来自 Go 1.18 版本,可能结合上下文需要做部分精简。另外 Go 1.16 和 Go 1.17 我也看过,源码基本是一致的,再早的版本就不敢保证了,不过思路应该不会差太多。
一、基础回顾
首先,Go 语言中的 map 是一个哈希表,阅读源码前我们一起来简单复习一下这个数据结构,便于后文理解。
1.1 哈希表
哈希表(Hash table,也叫散列表)
是使用频率极高的数据结构,主要用于高效查找。
- 存储时:每个
键(key)
根据一定规则被映射到内部数组中的一个桶(bucket)
,从而将值(value)
放入桶中,映射时候使用的规则称为哈希函数(hash function)
。 - 查找时:利用键通过和存储时相同的哈希函数计算桶的存储位置,从而定位到值。
1.2 哈希碰撞
理想情况下,每个键应该被哈希函数映射到唯一的桶中,但实际上哈希函数不太可能这么理想,使用中一定会遇到 哈希碰撞(Hash Collision)
,即不同的键通过哈希函数计算到相同的桶中。
解决哈希碰撞的方法一般有 2 种:
拉链法(Linking Addressing)
:将同一个桶中的元素通过链表的方式链接,由于链表的特点,新元素无需要提前分配内存。不足之处在于需要额外的指针链接元素,同时由于地址不连续,无法高效利用缓存。开放寻址法(Opening Addressing)
:所有元素都存储在桶数组中,当出现碰撞时候,按策略在数组中探测另外一个可用的位置;搜索时也通过相同的算法查找,直到找到对应数据或空槽为止。比较常用的探测策略有:线性探测(Linear Probing)
、平方探测法(Quadratic Probing)
等。
1.3 负载因子
哈希表有一项关键的统计数据称为 负载因子(load factor)
:
load factor = n / k
- n 是表中元素数量;
- k 是当前桶的数量;
负载因子越大,表明表中碰撞的概率越大。在桶数量不变的情况,随着表中元素增多,负载因子必然变大,此时哈希碰撞加剧,哈希表的性能开始越来越慢。因此,一般哈希表的实现都会有个扩容逻辑,用来增加桶的数量。
二、源码解读
2.1 内存布局
阅读其它逻辑前,必须对 Go map 的内存布局有一定了解,开始吧~
2.1.1 map 的主结构
map 的数据结构在 runtime/map.go:hmap
中,具体定义如下:
type hmap struct {
count int // 当前 map 中的元素个数,用于 len() 操作。
flags uint8 // 用于记录 map 当前状态,如是否正在执行写操作,后面会具体介绍。
B uint8 // 该值用于控制常规桶数组的长度。
noverflow uint16 // 用到的溢出桶的大概个数,为啥是大概?后面介绍溢出桶时候再说。
hash0 uint32 // hash seed,后面会具体介绍。
buckets unsafe.Pointer // 指向当前桶数组地址的指针,数组长度 2^B 。
oldbuckets unsafe.Pointer // 用于扩容过程中,保存扩容旧的桶数组的指针,仅在扩容阶段非 nil。
nevacuate uintptr // 用于在扩容过程中记录迁移进度,小于这个数的桶索引是已经迁移完成的。
extra *mapextra // 保存了 map 的一些可选数据。
}
// 保存了 map 的一些可选数据,不是每个 map 都有该数据,目前主要保存了溢出桶相关数据。
// bmap 是桶的结构,下面会再单独介绍。
type mapextra struct {
// 下面 2 个是比较特别的指针数组,和 GC 相关,仅在 bmap 不包含指针时候使用,溢出桶部分会详细介绍。
overflow *[]*bmap // 存储 buckets 用到的溢出桶指针集合。
oldoverflow *[]*bmap // 存储 oldbuckets 用到的溢出桶指针集合。
nextOverflow *bmap // 存储下一个可用的溢出桶指针。
}
// 编译时确定的 map 相关元数据。
type maptype struct {
typ _type // map 自身的类型元数据。
key *_type // map key 的类型元数据。
elem *_type // map value 的类型元数据。
bucket *_type // bucket 的类型元数据。
hasher func(unsafe.Pointer, uintptr) uintptr // key 的 hash 函数。
keysize uint8 // key 结构的大小。
elemsize uint8 // value 结构的大小。
bucketsize uint16 // bucket 结构的大小。
flags uint32 // 用于标识 key/value 的行为,如是否被转为指针了。
}
桶的数量
上面源码的注释其实已经提到了,桶(不包含溢出桶)的数量由 hmap.B
控制,具体值为 2^B
,计算逻辑如下:
// 传入的参数是目标 map 的 B 值。
func bucketShift(b uint8) uintptr {
// 与操作主要是为了防止溢出:goarch.PtrSize 是个常量,表示该平台上一个指针的字节数,
// 所以 goarch.PtrSize * 8 就是一个指针的位数,-1 后就是个二进制全 1 的值,
// 也是该平台位操作能移动的最大位移。
return uintptr(1) << (b & (goarch.PtrSize*8 - 1))
}
hash 算法
hash 算法是在编译时候确定并赋值给 maptype.hasher
的,不同类型会由编译器直接决定一个合适的算法,覆盖类型全面的算法可以参考:src/runtime/alg.go:typehash
。
另外,我们再仔细看下 hash
函数的签名,除了元素的指针以外,还需要一个 uintptr
,这个参数其实是个 hash seed
。
func(unsafe.Pointer, uintptr) uintptr // hash 函数签名。
map 中使用的 hash seed
保存在 hmap.hash0
,这个字段是个随机数,会在 map 初始化及每次元素清零时候重新生成。因此每个 map 会有自己独立的哈希结果,也就有不一样的元素分布。这样以预防利用哈希碰撞做的 DoS 攻击,假设哈希算法稳定不变,容易被攻击者利用计时攻击等方式刻意制造出哈希碰撞,用于 DoS 攻击;
flags 的位含义
flags
用于记录 map 的状态,目前主要记录了 map 是否正在 写入
、迭代
、扩容
等,这里暂且留个印象,后续不理解再回来参考。
const (
// flags
iterator = 1 // 表明有迭代器正在使用 buckets。
oldIterator = 2 // 表明有迭代器正在使用 oldbuckets。
hashWriting = 4 // 表明有 goroutine 正在做写操作。
sameSizeGrow = 8 // 表明当前正在进行等量扩容。
)
2.1.2 桶的结构
const (
// bucketCnt 定义了每个桶中存储的元素个数,就是常量 8 个。
bucketCntBits = 3
bucketCnt = 1 << bucketCntBits
// 定义了 map 中 Key 和 Value 能存放的大小上限,单位字节,超过的话会为被转为存储元素的指针。
maxKeySize = 128
maxElemSize = 128
)
// 桶的数据结构。
type bmap struct {
// 槽位状态数组,槽位无数据时用于快速判断槽位状态,
// 槽位有数据时则存储了 hash(key) 的最高字节。
tophash [bucketCnt]uint8
}
根据常量的定义,每个桶中能存放 8 (bucketCnt)
个元素。可以看到 bmap
的结构定义中仅有一个 tophash
字段,这是一个长度为 8 的 uint8 数组,保存了桶内 8 个槽位的信息,用于快速判断槽位状态。
- 槽位无数据时:对应的
tophash
保存了几个特殊的状态值,用于快速判断槽位状态; - 槽位有数据时:对应的
tophash
则存储了hash(key)
的最高字节;
下面列出了无数据时 tophash 的状态常量,后面阅读源码时候再回来参考,现在有个印象就可以了。
const (
// tophashs
emptyRest = 0 // 代表对应的 key/value 为空,且本元素之后的位置都是空的,无需继续寻找,是槽位的最始状态。
emptyOne = 1 // 代表对应的 key/value 为空,但曾经有过值,是个碎片空位。
evacuatedX = 2 // 元素被迁移到新桶中的 x(高位)桶了。
evacuatedY = 3 // 元素被迁移到新桶中的 y(低位)桶了。
evacuatedEmpty = 4 // 表明该位置为空,且走过迁移逻辑了。
minTopHash = 5 // tophash 的最小值,计算出来如果小于这个数,会直接加上这个数,主要是用于避免和前面这几个特殊值冲突。
)
等等,桶的结构就这样?那 key/value 保存在哪呢?其实, bmap
这个结构更准确的理解应该是一个 bucket header
。由于 map 中没有使用范型(嗯,也许以后会改进吧,毕竟以前没范型可用啊 OMG),所以并没有将字段定义出来,在后面的 赋值、访问 等逻辑中,我们能看到 map 是通过计算偏移量来定位 key/value 的。这里先看下偏移量的计算方式,后面的逻辑会反复用到这个偏移量,需要提前理解一下。
const (
// 这里用了个小技巧算出了 bmap 结构字节对齐后后部可用空间和结构头部的偏移量,
// bmap 指针加上这个值就是 key 的起始地址,后面的访问删除等会反复用到这个偏移量。
dataOffset = unsafe.Offsetof(struct {
b bmap
v int64
}{}.v)
)
最后我们可以脑补出一个完整的 bucket
结构:
// 脑补一个完整的范型版本 bucket 结构。
type bucket[K comparable, V any] struct {
// bmap 理解成一个 bucket header。
bmap
// 注意:这里 key 和 value 并不是交替存储,这个设计是为了字节对齐。
key [bucketCnt]K
value [bucketCnt]V
// 存储溢出 bucket 的指针,在槽位不够时候用拉链法链出去。
overflow uintptr
}
碰撞处理
从上面数据结构可以看出,当前版本中 Go 语言的 map 中每个桶的能存储的元素最多为 8个,那么当碰撞的元素超过 8 时候怎么办呢?这里先简单介绍下碰撞后的处理,先有个整体概念,源码细节后面会再介绍。
Go 中结合使用了 拉链法
和 线性探测
进行碰撞处理。
- map 中的一个桶能存放 8 个元素,当元素被 hash 到同一个桶中时,会先在桶内寻找剩余的位置,这里用到了线性探测。
- 8 个槽位用满后,每个桶的最后保存了一个叫做
overflow
的指针,指向下一个桶,这样的桶被称为即溢出桶(oveflow bucket)
,当桶中已经没有槽位时候,会沿着overflow
指针将数据保存到溢出桶中,这里用到了拉链法。
key/value 的大小
上面可以看到 2 个常量:
const (
maxKeySize = 128
maxElemSize = 128
)
这 2 个常量指明了 map 中能直接存放的元素大小的最大值,如果超过该阈值,元素将被转为指针方式存储(用 map 时得考虑下元素类型了~)。在 map 源码中的影响主要体现为 indirectkey
/indirectelem
两个函数,这 2 个函数会在存储的结构大小超过 128 字节后返回 true
。后面可以看到 访问/赋值 相关代码,会根据该结果判断是否需要取地址,这里也先留个印象,后面看到 indirectkey
/indirectelem
可以再回来看看这段。
2.1.3 全貌
到这里大家应该能想象出 map 在内存中的整体布局了,暂时还不行也没事,笔者画了一张:
2.2 初始化
2.2.1 小 map 初始化
我们知道 map 有 2 种创建方式:
make(map[k]v)
make(map[k]v, hint)
这里 hint
指定的是预估存储容量。当使用第一种方式初始化,或提供了 hint
但小于等于 8,且 bucket 被编译器认为需要堆分配时候,make
将被编译为 makemap_small
的调用。
func makemap_small() *hmap {
h := new(hmap)
h.hash0 = fastrand() // 随机生成 hash seed。
return h
}
为什么限制了上面 2 个条件?
makemap_small
调用后,很明显B=0
,即此时应有2^0=1
个桶,由于限制了hint<=8 (bucketCnt)
,此时仅需要这一个桶就能装下所有元素;- 后面在赋值代码能看到,这种方式的桶内存分配是直接发生在首次赋值时,赋值逻辑会直接调用
newobject(t.bucket)
走堆内存申请。
2.2.2 常规初始化
不满足 makemap_small
方式的 map 初始化,最后会编译为 makemap64
或 makemap
函数的调用(makemap64
最后也调用了 makemap
函数,只是多了一个 hint
为 int64
的溢出控制),下面分析下 makemap
函数。:
func makemap(t *maptype, hint int, h *hmap) *hmap {
// 判断下 hint 个数的元素所需空间是否会导致内存溢出,并不是在这里申请内存哈。
mem, overflow := math.MulUintptr(uintptr(hint), t.bucket.size)
if overflow || mem > maxAlloc {
hint = 0
}
if h == nil { // 啥时候为 nil?看后面的解释部分。
h = new(hmap)
}
h.hash0 = fastrand() // 随机生成 hash seed。
// 这里用的 overLoadFactor 用来判断 B ,这里其实是根据 hint 找到一个满足负载因子的最小 B。
B := uint8(0)
for overLoadFactor(hint, B) {
B++
}
h.B = B
// 申请桶内存,如果 B = 0 就会在赋值时候走懒加载的初始化逻辑。
if h.B != 0 {
var nextOverflow *bmap
// 通过 makeBucketArray 初始化桶数组。
h.buckets, nextOverflow = makeBucketArray(t, h.B, nil)
if nextOverflow != nil { // 如果顺带申请了溢出桶,就需要初始化 mapextra 保存溢出桶的指针。
h.extra = new(mapextra)
h.extra.nextOverflow = nextOverflow
}
}
return h
}
需要注意一下 makemap
的 h
参数,当编译器认为目标 map 及其第一个桶可以分配在栈上时候,会传入栈指针,此时 h 为 non-nil 就不会为 hmap 走堆内存申请,如果 B=0
,也不会走桶内存的堆内存申请,这种情况下 map 就会整个分配在栈上。
最后看一下 bucket 的堆内存申请逻辑。
// 为 map 申请桶数组空间。
func makeBucketArray(t *maptype, b uint8, dirtyalloc unsafe.Pointer) (buckets unsafe.Pointer, nextOverflow *bmap) {
base := bucketShift(b) // 前面提过,这个函数返回 1<<b,这里就是计算桶个数。
nbuckets := base // base是不包含溢出桶的,后面会根据情况增加溢出桶,最后实际申请的桶个数保存在本变量中。
// 计算需要申请的溢出桶个数,只堆桶个数大于 2^4=16 时候计算。
// 对于更小的 b ,不太可能发生溢出,不需要计算。
if b >= 4 {
nbuckets += bucketShift(b - 4) // 默认加入 2^(b-4) 个溢出桶。
sz := t.bucket.size * nbuckets // 计算溢出桶所需空间。
// 这里是利用 SpanClass 计算申请这么大的空间,实际会返回多大空间的 mspan(mspan相关可以见之前的分享哈~)
up := roundupsize(sz)
if up != sz {
nbuckets = up / t.bucket.size // 用能拿到的空间重新算一下能分配的桶数量,充分利用空间。
}
}
// dirtyalloc 主要为了重用 bucket array,仅仅在 map_clear 时候用到。
// 新建 dirtyalloc 时候传入的都是 nil,只有在 mapclear 时候才会传入非空 dirtyalloc。
if dirtyalloc == nil {
// 注意这里,初始化时候的 buckets 是一块连续的内存,但后面独立申请的溢出桶不是。
buckets = newarray(t.bucket, int(nbuckets))
} else {
buckets = dirtyalloc
size := t.bucket.size * nbuckets
if t.bucket.ptrdata != 0 {
// 根据是否包含指针,选择清理方式,有对外引用需要在写屏障的保护下清理内存。
memclrHasPointers(buckets, size)
} else {
memclrNoHeapPointers(buckets, size) // 这种清理方式不会开启写屏障。
}
}
// 这里不等,代表上面逻辑认为需要提前申请部分溢出桶,这里申请。
if base != nbuckets {
// 计算出第一个溢出桶的位置。
nextOverflow = (*bmap)(add(buckets, base*uintptr(t.bucketsize)))
// 计算出最后一个桶的位置。
last := (*bmap)(add(buckets, (nbuckets-1)*uintptr(t.bucketsize)))
// 这里用了个奇怪的小技巧,将最后一个桶指向一个 non-nil 值,
// 这样在获取溢出桶时候,可以通过 nextOverflow 指针判断自己是否是最后一个溢出桶,
// nil 说明还有桶,non-nil 说明已经是最后一个桶了!
last.setoverflow(t, (*bmap)(buckets))
}
return buckets, nextOverflow
}
2.3 访问
2.3.1 源码阅读
我们知道 map 的访问方式有 2 种:
val := h[key]
val, ok := h[key]
这在编译时候会编译成不同的函数调用,对应函数签名如下:
// val := h[key]
func mapaccess1(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer
// val, ok := h[key]
func mapaccess2(t *maptype, h *hmap, key unsafe.Pointer) (unsafe.Pointer, bool)
两个函数除了多一个用于标识元素存在性的 bool
,并没有什么不同,这里通过 mapaccess1
看一下查找的核心逻辑:
// mapaccess1 函数返回一个 h[key] 的指针。
// 永远不会返回 nil,当 key 不存在与 h 中时候,返回类型的零值。
func mapaccess1(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
// ..省略一些跟踪开关和检查..
// 如果 map 未初始化或没有元素,返回类型零值。
if h == nil || h.count == 0 {
// 根据 Go 语言规范,传入非 comparable 类型时候,应该 panic,而不是返回空值,所以有下面这个 if。
if t.hashMightPanic() {
t.hasher(key, 0)
}
return unsafe.Pointer(&zeroVal[0])
}
// hashWriting 是个 flag 位,如果为 1 表示当前有一个 goroutine 正在写入,
// 这是个 unrecoverable 的错误,读取时候如果遇到该状态,会直接崩。
if h.flags&hashWriting != 0 {
throw("concurrent map read and map write")
}
hash := t.hasher(key, uintptr(h.hash0)) // 计算 Hash。
m := bucketMask(h.B) // len(bucket) - 1。
// 计算对应 bucket 的指针地址。
b := (*bmap)(add(h.buckets, (hash&m)*uintptr(t.bucketsize)))
// 如果 oldbuckets 不为空, 说明当前正处于扩容状态。
// 老的 buckets 中的数据可能还未搬迁至新的 buckets 里, 先在老的 buckets 中找。
if c := h.oldbuckets; c != nil {
// 如果当前不是等大扩容(后面扩容部分会详细介绍),就是桶的数量增加了一倍,
// 这里直接除 2 (即 >> 1)缩小回来,就能对应到在老 bucket 中的位置。
if !h.sameSizeGrow() {
m >>= 1
}
oldb := (*bmap)(add(c, (hash&m)*uintptr(t.bucketsize))) // 计算老的 bucket 的地址。
// 判断 bucket 是否已经迁移。
if !evacuated(oldb) {
b = oldb
}
}
top := tophash(hash) // 桶内的 hash 定位仅使用 hash 的高 8 位。
// 下面这个循环会遍历 key 对应的所有桶中的数据。
// 一次循环找一个桶,找不到就沿着 overflow 指针找下一个溢出桶。
bucketloop:
for ; b != nil; b = b.overflow(t) {
for i := uintptr(0); i < bucketCnt; i++ {
// 通过对比 tophash 来做初步定位。
if b.tophash[i] != top {
// emptyRest 是特殊的 tophash,前面介绍过可翻上去看,
// 该值表明该位置为空,且后面也为空,无需再向后找。
if b.tophash[i] == emptyRest {
break bucketloop
}
continue
}
// 直接计算偏移量找到 key 的地址。
k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
if t.indirectkey() { // 存储时候被转成指针存储的数据,需要取出指向的数据。
k = *((*unsafe.Pointer)(k))
}
if t.key.equal(key, k) { // tophash 相同也可能是 hash 碰撞,因此这里要确定 key 相等。
// 和找 key 一样,直接计算偏移量找到 value。
e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
if t.indirectelem() { // 存储时候被转成指针存储的数据,需要取出指向的数据。
e = *((*unsafe.Pointer)(e))
}
return e
}
}
}
return unsafe.Pointer(&zeroVal[0]) // 没找到的情况下,返回类型零值。
}
mapaccess2
的逻辑类似,这里不再解释。
2.3.2 选桶算法
代码中选择桶的算法主要是下面 2 行,注意这里的 hash&m
,Go 的选桶用的是 取与法
。
m := bucketMask(h.B) // len(buckets) - 1。
b := (*bmap)(add(h.buckets, (hash&m)*uintptr(t.bucketsize))) // 计算对应 bucket 的指针地址。
m
的值是 len(buckets)-1
,而由于 len(buckets)
是 2^B
,因此 2^B -1
一定是一个二进制有效位均为 1 的数,此时任意数和 m
进行与运算的结果范围就是完整的 [0, 2^-1]
,可以利用区间内所有位置。
例子:假设 B = 2
即 len(buckets) = 1<<2 = 4 = 0x00000100
则 m = len(buckets) - 1 = 3 = 0x00000011
任意数和 m 取 & 的区间都为 [0,3],和正数 %4 的结果一样。
2.3.3 找元素算法
根据上面代码可以梳理出 map 定位元素的逻辑大概是这样:
- 计算 hash 值,然后将 hash 值 和
2^B-1
进行与运算,用取与法
选中桶; - 如果当前正在扩容,判断桶是否已经迁移,从而判断应该在哪个桶中查询(迁移逻辑后面介绍);
- 使用 hash 的高 8 位,在桶中依次和
tophash
的元素对比,如果遇到相同的tophash
,取出 key 进行二次确认,如果命中就返回,没有命中继续比完; - 桶中元素都遍历后,如果还没找到,就在接着溢出桶指针找链接的溢出桶,找到没有为止。
2.4 赋值
2.4.1 源码阅读
map 的赋值操作会被编译成 runtime.mapassign
函数调用,直接看代码吧:
// 注意看,参数中并没有 value,本函数的逻辑其实是寻找一个用于存放 value 的内存地址。
func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
if h == nil { // 和访问不同,赋值操作涉及到存储,肯定不允许 map 为 nil 了。
panic(plainError("assignment to entry in nil map"))
}
// ..省略一些跟踪开关和检查..
// hashWriting 是个 flag 位,如果为 1 表示当前有一个 goroutine 正在写入,
// 这是个 unrecoverable 的错误,读取时候如果遇到该状态,会直接崩。
if h.flags&hashWriting != 0 {
throw("concurrent map writes")
}
hash := t.hasher(key, uintptr(h.hash0)) // 计算 hash。
// 设置 hashWriting 以标识当前正在写入。
// 这边和上面的判断不是原子操作,看起来并发时候可能绕过该限制?
// 实际上不会的,因为最后 done 的时候还判断了一次。
h.flags ^= hashWriting
// buckets 的懒加载逻辑,初始化时如果没有初始化桶空间,在这里初始化。
if h.buckets == nil {
h.buckets = newobject(t.bucket) // 因为只有 B=0 时候会懒加载,所以这里值仅请了一个桶的空间。
}
again:
bucket := hash & bucketMask(h.B) // 和查找一样的逻辑,取与法获取对应的桶。
if h.growing() { // 如果当前正在扩容,那么就负债将当前命中的桶迁移掉,后面详细介绍扩容。
growWork(t, h, bucket)
}
b := (*bmap)(add(h.buckets, bucket*uintptr(t.bucketsize))) // 命中桶的地址。
top := tophash(hash) // 获取 hash 的高 8 位。
var inserti *uint8 // 记录 tophash 地址。
var insertk unsafe.Pointer // 记录 key 地址。
var elem unsafe.Pointer // 记录 value 地址。
// 下面这个循环用来遍历桶,寻找合适的槽位,赋值不在里面。
bucketloop:
for {
// 遍历 bucket 中的空位。
for i := uintptr(0); i < bucketCnt; i++ {
if b.tophash[i] != top {
// 不等时可能是有值的位置或空闲位置,
// isEmpty 函数内部判断当前位置的值是否是 emptyOne 或 emptyRest,
// 返回 true ,代表是一个空槽位,if 内结合 inserti == nil 的判断,
// 保存第一个空槽位的三大件备用。
if isEmpty(b.tophash[i]) && inserti == nil {
inserti = &b.tophash[i]
insertk = add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
elem = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
}
// emptyRest 是特殊的 tophash,该值表明该位置为空,且后面也为空,可以拿来直接用。
// 注意这里是唯一一个找到空槽位的退出口,emptyOne 并不是。
if b.tophash[i] == emptyRest {
break bucketloop
}
continue // 槽位被占用,或者是 emptyOne。
}
// 走到这里说明 tophash 已经撞上, 那就把 key 拿出来确认下~
k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
if t.indirectkey() { // 存储时候被转成指针存储的数据,需要取出指向的数据。
k = *((*unsafe.Pointer)(k))
}
// 用类型的 equal 方法比对一下 key 是真的相等,还是只是hash碰撞而已。
if !t.key.equal(key, k) {
continue
}
// 走到这里说明 key 原本就已经存在,判断下是否要更新 key,需要的话更新一下。
if t.needkeyupdate() {
typedmemmove(t.key, k, key)
}
// 在已经有 key 的情况下算出 value 位置后直接返回了。
elem = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
goto done
}
// 上面循环没找到合适位置,就顺着溢出链在下一个桶继续找。
ovf := b.overflow(t)
if ovf == nil {
break
}
b = ovf
}
// 如果有命中存在 key 的情况,上面的循环已经直接跳到 done 了,能走到这说明本次操作有新增元素。
// 此处是触发扩容的时机,如果元素 +1 让负载因子超标,或者溢出桶过多,就开始扩容。
if !h.growing() && (overLoadFactor(h.count+1, h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) {
hashGrow(t, h) // 把 map 转扩容状态,留个印象,后面专门拿一节细说。
goto again // 由于进入扩容状态,本次添加需要在新桶中,上面的流程需要再来一次 OMG!
}
// 如果下面这个条件成立,说明上面的循环并没有找到能用的空间,需要拿溢出桶了。
if inserti == nil {
// 内部会优先使用 extra 中准备的溢出桶,用光了才会去创建新的。
newb := h.newoverflow(t, b)
inserti = &newb.tophash[0]
insertk = add(unsafe.Pointer(newb), dataOffset)
elem = add(insertk, bucketCnt*uintptr(t.keysize))
}
// 如果超过了 128 最大字节大小限制的 key/value 需要送到堆去了,map 中存储指针。
if t.indirectkey() {
kmem := newobject(t.key)
*(*unsafe.Pointer)(insertk) = kmem
insertk = kmem
}
if t.indirectelem() {
vmem := newobject(t.elem)
*(*unsafe.Pointer)(elem) = vmem
}
typedmemmove(t.key, insertk, key) // 保存 key。
*inserti = top // 记录 tophash。
h.count++ // map 中的元素加1。
done:
// 再来一次并发写校验,如果开头的的判断并发写状态穿透了,也会在此处发现。
if h.flags&hashWriting == 0 {
throw("concurrent map writes")
}
h.flags &^= hashWriting
if t.indirectelem() {
elem = *((*unsafe.Pointer)(elem))
}
return elem // 返回可用的 value 地址。
}
2.4.2 槽位寻找算法
槽位的寻找上面代码其实很清晰了,这里总结一下流程:
- 先判断当前是否正在进行扩容,如果正在扩容,就需要肩负起目标桶的迁移工作,迁移后再继续做寻址;
- 遍历目标桶链寻找合适的存储位置。通过
hash(key)
的高 8 位,在桶中依次和tophash
的元素对比快速筛选数据,遍历时候会保存遇到的第一个空槽位信息; - 如果找到了相同的 key,结束整个查找,返回对应的 value 地址,这个情况没有新增元素;
- 如果遍历时没有找到对应的 key 信息,将在遇到
emptyRest
后退出循环,以避免过度遍历,这个情况会有新增元素,因此需要判元素+1
后断负载因子是超标,是否需要扩容; - 如果已经没有空余槽位,将获取一个溢出桶,将元素放在溢出桶的头部;
这里
emptyRest
表示桶中的最后一个位置,它的值是 0 是uint8
零值,因此如果没有元素,一开始就能退出。
2.4.3 溢出桶
赋值代码可见,赋值过程中,如果当前桶均没有空闲槽位时会通过 newoverflow
函数申请新的溢出桶,这里看下该函数的逻辑 :
func (h *hmap) newoverflow(t *maptype, b *bmap) *bmap {
var ovf *bmap
if h.extra != nil && h.extra.nextOverflow != nil {
// 前面介绍过,map 初始化时,如果 B>4 会提前准备一些溢出桶,这里如果还有可用的桶,会直接取之。
ovf = h.extra.nextOverflow
if ovf.overflow(t) == nil {
// 初始化的逻辑提到过,nextOverflow == nil 时候表示该桶后面还有可用的溢出桶。
h.extra.nextOverflow = (*bmap)(add(unsafe.Pointer(ovf), uintptr(t.bucketsize)))
} else {
// 初始化的逻辑提到过,nextOverflow != nil 时候表示该桶已经是最后一个溢出桶了,
// 所以需要更新下 extra 的数据。
ovf.setoverflow(t, nil)
h.extra.nextOverflow = nil
}
} else {
ovf = (*bmap)(newobject(t.bucket)) // 申请一个新的溢出桶。。
}
// 更新下 hmap.noverflow 字段,注意 noverflow 是个 uint16,
// 而溢出桶的个数是可能超过的,因此这里其实不是准确的溢出桶个数,
// 在超过 uint16 后,会存储一个最大值。
h.incrnoverflow()
if t.bucket.ptrdata == 0 { // 如果不包含指针,需要记录到 overflow 中。
h.createOverflow()
*h.extra.overflow = append(*h.extra.overflow, ovf)
}
b.setoverflow(t, ovf) // 把溢出桶链到当前桶的后面。
return ovf
}
注意上面 h.extra.overflow
的逻辑,再回顾下 mapextra
结构:
type mapextra struct {
overflow *[]*bmap // 存储 buckets 用到的溢出桶。
oldoverflow *[]*bmap // 存储 oldbuckets 用到的溢出桶。
nextOverflow *bmap // 存储下一个空闲溢出 bucket 指针。
}
这里有一段比较特别的逻辑:如上可见,overflow
仅在 bucket
不包含指针时候使用,为啥呢?因为在 key/value 均不包含指针时,Go 编译器会将 bmap
标记为无指针(此时会被分配在 noscan 的 mspan
中),GC 并不会扫描它。那么问题来了,上面提到 bmap
的尾部实际上是存有一个 uintptr
类型的 overflow
指针的,由于该指针不会被 GC 标记,溢出桶在 GC 标记阶段不会被涂黑,会导致在 GC 清理阶段被回收。因此这里用 mapextra.overflow
指针指向独立申请的溢出桶们,目的仅是为其保留一个会被扫描到的引用。
2.4.2 key 的更新
上面还有个小细节,我们注意到,即使在 key 被确认相同后,赋值逻辑也会去尝试更新一下 key,这是为啥?感兴趣可以参考这个帖子,这里不再赘述:[go] runtime: on map update, don't overwrite key if we don't need to. (google.com)
2.5 扩容
2.5.1 触发时机
前面赋值逻辑中提到,触发扩容的时机在这段代码:
if !h.growing() && (overLoadFactor(h.count+1, h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) {
hashGrow(t, h)
goto again
}
这里可以看出,有两种场景会触发扩容:overLoadFactor
或 tooManyOverflowBuckets
返回 true,这分别对应了下面两种情况。
扩容的触发条件 1:负载因子超标
func overLoadFactor(count int, B uint8) bool {
// count 是本次新增元素后的元素数量。
// bucketShift(B) 就是 1<<B,即桶的个数,注意一点,这个地方没有将溢出桶数量算进来。
return count > bucketCnt && uintptr(count) > loadFactorNum*(bucketShift(B)/loadFactorDen)
}
bucketCnt
上面说内存结构时候提到过,当前版本为常量 8,是单个桶的元素个数,因此第一个条件是 要求元素总数超过一个桶,否则前面提到的 makemap_small
就很尴尬了。
第二个条件里有 2 个与 负载因子
相关的常量:loadFactorNum
和 loadFactorDen
,loadFactorNum/loadFactorDen
的结果就是触发扩容的阈值负载因子,当前版本中为 13/2 = 6.5
。第二个条件的不等式稍微变换一下,就是在判断当前负载因子是否超过了 6.5 这个阈值。
为什么负载因子阈值选择了 6.5?在
runtime/map.go
文件顶部的注释,开发者给出了实验数据,感兴趣的去阅读下。
扩容的触发条件 2:溢出桶过多
func tooManyOverflowBuckets(noverflow uint16, B uint8) bool {
if B > 15 {
B = 15
}
// noverflow 是个 uint 16,因此限制在 uint16 内比对。
return noverflow >= uint16(1)<<(B&15)
}
过多的定义是超过了常规桶的个数。考虑一种情况:通过反复的 添加/删除 少量数据,map 可以渐渐积累许多的溢出桶,由于元素不多,可能根本不会达到负载因子的阈值。此时申请的溢出桶并不会被释放 map 的内存会慢慢积累泄漏,二是 key/value 的分布可能非常松散,存在大量的的碎片空间(即 emptyOne
) 导致查找插入效率都极为低下,因此当检测到溢出桶过多时候,会进行等量扩容,等于是做了一次碎片整理,另外扩容还会清掉多余的溢出桶。
2.5.2 开始扩容
触发扩容有两种情况,扩容算法根据这两种情况也做了区分。
- 负载因子超标:此时
B
将被+1
,桶数量直接翻倍; - 溢出桶过多:进行等量扩容,可以理解为重建桶数组,将所有老桶中的元素都转移到新桶,在这过程中将重新排列 key/value,等于做了一次碎片整理,自然提高了空间利用率;
扩容的开始逻辑在 runtime.hashGrow
函数之中,让我们仔细看看:
// 注意:本函数只是分配好了内存地址,做好了迁移准备,并没有真正迁移数据。
// 实际迁移逻辑在 growWork 函数中,会在 访问(mapassign)/删除(mapdelete) 函数中调用。
func hashGrow(t *maptype, h *hmap) {
// bigger 保存了本次操作 B 的增量,等量扩容设置为 0,增量扩容设置为 1。
bigger := uint8(1)
if !overLoadFactor(h.count+1, h.B) {
bigger = 0
h.flags |= sameSizeGrow
}
// 准备好新老 2 桶数组的指针,makeBucketArray 在初始化函数时候提到过,这里就不再赘述了。
oldbuckets := h.buckets
newbuckets, nextOverflow := makeBucketArray(t, h.B+bigger, nil)
// 下面逻辑就是判断当前是否正在进行迭代,是的话改将状态标记为老桶正在迭代。
// tips: &^ 是按位清零运算符,iterator 和 oldIterator 可以回去看前面的 flag 常量介绍。
flags := h.flags &^ (iterator | oldIterator)
if h.flags&iterator != 0 {
flags |= oldIterator
}
h.B += bigger
h.flags = flags
h.oldbuckets = oldbuckets // 换桶!
h.buckets = newbuckets
h.nevacuate = 0 // 既然刚开始,迁移进度当然是 0。
h.noverflow = 0 // 溢出桶数量清 0。
if h.extra != nil && h.extra.overflow != nil {
if h.extra.oldoverflow != nil {
throw("oldoverflow is not nil") // 这是并发扩容啦- -!
}
h.extra.oldoverflow = h.extra.overflow // 转移避免溢出桶被 GC 的指针数组。
h.extra.overflow = nil
}
// 如果 makeBucketArray 函数创建了新的溢出桶,就保存到可用溢出桶指针里咯!
if nextOverflow != nil {
if h.extra == nil {
h.extra = new(mapextra)
}
h.extra.nextOverflow = nextOverflow
}
}
2.5.3 迁移元素
上面可以看到,hashGrow
只是做好了扩容准备,申请好了内存,并没有真实迁移数据。实际数据的迁移逻辑是 “渐进” 发生的,在 growWork
函数中,growWork
函数会在 访问/删除 时中调用。
func growWork(t *maptype, h *hmap, bucket uintptr) {
// oldbucketmask 返回的是扩容之前的哈希掩码,而传入的 bucket 参数是新桶中的位置,
// 这里与一下,就算出了目标桶在旧桶中的位置(思考一下,bucket 都是 2 的 n 次幂)。
evacuate(t, h, bucket&h.oldbucketmask())
// 出发该逻辑的代码需要肩负额外迁移一个 bucket 的任务,以加快进度,
// 注意这里传入的是 h.nevacuate,即最小未迁移位置。
if h.growing() {
evacuate(t, h, h.nevacuate)
}
}
上面代码中提示,实际迁移逻辑在 evacuate
里,该函数每次负责迁移一个桶链 (桶自身和其链接的溢出桶),先来看一个表示新桶中迁移进度的结构:
type evacDst struct {
b *bmap // 目标新桶。
i int // 下一个空闲位置的索引。
k unsafe.Pointer // 下一个可用 key 的内存地址。
e unsafe.Pointer // 下一个可用 value 的内存地址。
}
先想想下,翻倍扩容时,对应到每个桶上,其实是个 1 变 2 的操作,evacuate
里的迁移逻辑会用一个长度为 2 的 evacDst
数组存放这 2 个桶,并把第一个桶称为 x
,第二个桶称为 y
。而 map 的选桶算法用的是与运算,此时,由于容量翻倍 B
会 +1
,在做与运算定位时候,就会多释放出一位掩码,与运算的结果除了多释放出的这一位,其余位应该和元本意义,而多释放出的这一位,会用来决定元素放在 x
,还是 y
。另外,等量扩容时候只会用到 x
桶。接下来看看具体逻辑:
func evacuate(t *maptype, h *hmap, oldbucket uintptr) {
// 直接算出旧桶的地址。
b := (*bmap)(add(h.oldbuckets, oldbucket*uintptr(t.bucketsize)))
newbit := h.noldbuckets() // 计算一下桶扩容前的长度。
if !evacuated(b) { // 当然迁移的前提是还没迁移过。
var xy [2]evacDst // xy 用于保存高低 2 个桶的进度数据。
x := &xy[0]
// 低位桶直接用原来的偏移量去算就可以了。
x.b = (*bmap)(add(h.buckets, oldbucket*uintptr(t.bucketsize)))
x.k = add(unsafe.Pointer(x.b), dataOffset) // x 桶第一个 key 的位置。
x.e = add(x.k, bucketCnt*uintptr(t.keysize)) // x 桶第一个 value 的位置。
if !h.sameSizeGrow() { // 只有增量扩容,才会需要用到低位桶。
y := &xy[1]
// 高位桶的地址需要加上原本的长度(即增加的桶个数)。
y.b = (*bmap)(add(h.buckets, (oldbucket+newbit)*uintptr(t.bucketsize)))
y.k = add(unsafe.Pointer(y.b), dataOffset) // y 桶第一个 key 的位置。
y.e = add(y.k, bucketCnt*uintptr(t.keysize)) // y 桶第一个 value 的位置。
}
// 外层循环遍历整个老的 bucket 链。
for ; b != nil; b = b.overflow(t) {
k := add(unsafe.Pointer(b), dataOffset) // 当前桶第一个key的地址。
e := add(k, bucketCnt*uintptr(t.keysize)) // 当前桶第一个value的地址。
// 内层循环遍历 bucket 的槽位。
for i := 0; i < bucketCnt; i, k, e = i+1, add(k, uintptr(t.keysize)), add(e, uintptr(t.elemsize)) {
top := b.tophash[i]
// 和赋值逻辑中一样,isEmpty 函数内部判断当前位置的值
// 是 emptyOne 或 emptyRest 代表是一个空槽位,设置为 evacuatedEmpty 即可。
if isEmpty(top) {
b.tophash[i] = evacuatedEmpty // 设置成无数据槽位的迁移结束状态。
continue
}
if top < minTopHash {
throw("bad map state")
}
k2 := k // k 可能被转成指针了,用 k2 保存原始值的地址。
if t.indirectkey() {
k2 = *((*unsafe.Pointer)(k2))
}
var useY uint8
if !h.sameSizeGrow() { // 增量扩容逻辑。
hash := t.hasher(k2, uintptr(h.hash0)) // // 计算出 key 的 hash。
// 这里为自身值不等于自身的数据走了个特殊的逻辑,后面单独解释。
if h.flags&iterator != 0 && !t.reflexivekey() && !t.key.equal(k2, k2) {
useY = top & 1
top = tophash(hash)
} else {
// 思考一下,和旧桶长度做与运算,刚好是新释放出来的最高位,用这位来决定放 x 还是 y。
if hash&newbit != 0 {
useY = 1
}
}
}
if evacuatedX+1 != evacuatedY || evacuatedX^1 != evacuatedY {
throw("bad evacuatedN")
}
// evacuatedX + 1 == evacuatedY
// 下面这行就是标记老位置的 tophash 为特殊值,便于后面找到此位置时候的定位。
b.tophash[i] = evacuatedX + useY
dst := &xy[useY] // 本次迁移的目的地。
// 如果目标桶满员了,那需要找个新的溢出桶,同时更新到进度对象中。
if dst.i == bucketCnt {
dst.b = h.newoverflow(t, dst.b)
dst.i = 0
dst.k = add(unsafe.Pointer(dst.b), dataOffset)
dst.e = add(dst.k, bucketCnt*uintptr(t.keysize))
}
// 下面这个 & 很神奇,因为通过阅读上下文可一发现,其实不可能索引越界,
// 也无需取模才对,代码中的注释说这个掩码是个优化技巧,可以避免极限检查。
dst.b.tophash[dst.i&(bucketCnt-1)] = top // mask dst.i as an optimization, to avoid a bounds check
// 下面拷贝 key 和 value 了
if t.indirectkey() {
*(*unsafe.Pointer)(dst.k) = k2
} else {
typedmemmove(t.key, dst.k, k)
}
if t.indirectelem() {
*(*unsafe.Pointer)(dst.e) = *(*unsafe.Pointer)(e)
} else {
typedmemmove(t.elem, dst.e, e)
}
dst.i++
// for 的第三部分只移动了源地址指针,这里移动目标地址指针。
dst.k = add(dst.k, uintptr(t.keysize))
dst.e = add(dst.e, uintptr(t.elemsize))
}
}
// 如果老的 bucket 当前没有在迭代状态,且是包含指针的类型
// 这里就清理掉指向溢出桶的指针,以让其能被 GC 回收。
if h.flags&oldIterator == 0 && t.bucket.ptrdata != 0 {
// 这里重新计算出本次迁移桶的地址,因为最初算的那个地址已经随着上面循环变更位置了。
b := add(h.oldbuckets, oldbucket*uintptr(t.bucketsize))
// 跳过了 b.tophash 因为里面维持着迁移状态。
ptr := add(b, dataOffset)
n := uintptr(t.bucketsize) - dataOffset
memclrHasPointers(ptr, n)
}
}
// 如果本次迁移的桶编号和当前进度的编号一样,进度就可以前进了~
if oldbucket == h.nevacuate {
advanceEvacuationMark(h, t, newbit)
}
}
// 更新迁移进度标示,注意 newbit 参数是 len(oldbuckets)。
func advanceEvacuationMark(h *hmap, t *maptype, newbit uintptr) {
h.nevacuate++ // 进度前挪。
// 这里控制下前进进度的循环,最多往前判断 1024 个桶,避免过度判断影响性能。
stop := h.nevacuate + 1024
if stop > newbit {
stop = newbit
}
// 循环前进进度,寻找下一个还未迁移的桶。
// bucketEvacuated 内部通过判断 tophash 是否在迁移标示的范围:tophash[0] > emptyOne && tophash[0] < minTopHash,来确定是否已经迁移过了。
for h.nevacuate != stop && bucketEvacuated(t, h, h.nevacuate) {
h.nevacuate++
}
// 这个等式成立,表明已全部迁移,可以清理旧数据了。
if h.nevacuate == newbit {
h.oldbuckets = nil
if h.extra != nil {
h.extra.oldoverflow = nil // 释放掉溢出桶的指针。
}
h.flags &^= sameSizeGrow // 清除等量扩容标志位。
}
}
迁移的逻辑比较复杂,但是是整个 map 的精华所在,细节已经注释得很详细了,需要看代码细品哈。
2.5.4 NaN
Go 中有些特殊的数浮点数:NaN
,这种神奇的类型自己不等于自己!并且当这类值或包含了这类值的结构被拿来做 key 时候,hash 值是随机的(可以参考 src/runtime/alg.go:f64hash
),因此 map 无法为 key 计算出相同的 hash 值,自然也无法通过 key 取回数据,看下面例子:
type keyType struct {
Num float64
Test string
}
func main() {
f1 := 1.1
f2 := math.NaN()
fmt.Println(f1 == f1, f2 == f2) // 输出 true false。
h := make(map[keyType]int, 2)
k1 := keyType{f1, "test"}
k2 := keyType{f2, "test"}
h[k1] = 123
h[k2] = 456
fmt.Println(h[k1], h[k2]) // 输出 123 0。
}
那么要如何取出数据呢?事实上这类数据只能在迭代时候再次出现了,因此拿来做 key 的类型如果内嵌了浮点数类型要特别特别特别的小心!这里回顾下增量扩容逻辑选桶时候的一个判断:
// 这里为自身值不等于自身的数据走了个特殊的逻辑,后面单独解释。
if h.flags&iterator != 0 && !t.reflexivekey() && !t.key.equal(k2, k2) {
useY = top & 1
top = tophash(hash)
} else {
// 思考一下,和旧桶长度做与运算,刚好是新释放出来的最高位,用这位来决定放 x 还是 y。
if hash&newbit != 0 {
useY = 1
}
}
如果当前存在迭代器,将用老桶中的 tophash
的最后一位,来决定高低桶;而在不存在迭代器时,就还是走 hash 的逻辑,由于 hash 值随机,所以分配的位置也随机了。有这个区别的原因是迭代器在迭代迁移过程中的新桶时,会用和这里同样的算法回老桶中找数据,如果这里没有稳定的输出,会导致迭代器缺少或重复输出元素。
2.6 迭代
2.6.1 迭代器循环
随便写一段简单的 for..range
来迭代 map
,然后执行 go tool compile -S
可以很容易看出,迭代语句最后被编译成了 mapiterinit
和 mapiternext
函数的调用,翻译成 Go 语言大约是类似这样的结构:
var it hiter
for mapiterinit(t, h, &it); it.key != nil; mapiternext(&it) {
key := *it.key
value := *it.elem
// ...其它逻辑...
}
2.6.2 迭代器
上面 mapiterinit
函数用处是初始化一个 hiter
对象,这是一个迭代器,先看下这段逻辑:
// hash 迭代器(hash iteration)结构
type hiter struct {
key unsafe.Pointer // 必须在第一个位置,值为 nil 时迭代结束。
elem unsafe.Pointer // 必须在第二个位置。
t *maptype // map 的类型元数据。
h *hmap // 目标 hmap。
buckets unsafe.Pointer // 初始化迭代器结构时的 buckets 数组。
bptr *bmap // 当前要迭代的 bucket 的指针。
overflow *[]*bmap // 保存了 bmap.extra 上的 overflow 用来保证溢出桶不被回收。
oldoverflow *[]*bmap // 保存了 bmap.extra 上的 oldoverflow 用来保证溢出桶不被回收。
startBucket uintptr // 桶数组迭代的随机偏移量,即第一个开遍历的桶。
offset uint8 // 桶内迭代的随机偏移量。
wrapped bool // bucket 数组的迭代是否已经过了最末端的桶(桶内遍历过程是个环)。
B uint8 // 初始化迭代器结构时的 hmap.B 的值。
i uint8 // 记录当前已经迭代的元素个数。
bucket uintptr // 当前要迭代的元素的桶索引。
checkBucket uintptr // 标识当前正在遍历的桶,是否有被扩容的风险。
}
// mapiterinit 函数主要是初始化 hiter 结构,一个 hiter 结构可以进行一次迭代。
func mapiterinit(t *maptype, h *hmap, it *hiter) {
// ..省略一些跟踪开关和检查..
it.t = t // map 类型元数据。
it.h = h // map 指针。
// 开始迭代前给当前的桶状态做个快照。
it.B = h.B
it.buckets = h.buckets
if t.bucket.ptrdata == 0 {
h.createOverflow()
it.overflow = h.extra.overflow
it.oldoverflow = h.extra.oldoverflow
}
// 下面代码计算从哪个位置开始迭代。
r := uintptr(fastrand()) // 来个随机数。
if h.B > 31-bucketCntBits {
// bucketCntBits 是常量 3,所以 31-bucketCntBits 就是固定的 28,
// 这里为 B 大于 28 的额外加一些高位的随机量,是为了保证能命中大索引的桶,
// 因为 fastrand 只有 int32。
r += uintptr(fastrand()) << 31
}
it.startBucket = r & bucketMask(h.B) // 用和 key 选桶相同的算法选择开始迭代的桶。
it.offset = uint8(r >> h.B & (bucketCnt - 1)) // 桶内的起始位置也根据 r 确定,这是真的很随机。
it.bucket = it.startBucket // bucket 记录的是当前迭代元素,现在当然就是起始元素了。
// 同个 map 的多个迭代器是可以并发迭代的,如果在扩容中新旧桶都会被用到,
// 因此用原子操作同时设上 iterator 和 oldIterator。
if old := h.flags; old&(iterator|oldIterator) != iterator|oldIterator {
atomic.Or8(&h.flags, iterator|oldIterator)
}
// 迭代起来~
mapiternext(it)
}
到这里迭代器就初始化完毕了,初始化的关键逻辑是随机数 r
,相信不少同学有观察到一个现象:即使是一个相同的 map 进行 2 次迭代的顺序也是不一样的,这个随机性就由上面代码中的 r
保证,每次迭代都从一个随机的元素开始。
2.6.3 迭代过程
const (
noCheck = 1<<(8*goarch.PtrSize) - 1 // 这个常量作为一个特殊的状态,标识正在迭代的桶是否有被迁移的风险。
)
func mapiternext(it *hiter) {
// ..省略一些跟踪开关和检查..
h := it.h
if h.flags&hashWriting != 0 {
throw("concurrent map iteration and map write")
}
t := it.t
checkBucket := it.checkBucket // 标识当前正在遍历的桶,是否有被扩容的风险。
bucket := it.bucket // 当前要迭代的桶,第一次调用时是 startBucket,此时 wrapped 是 false。
b := it.bptr // 当前要迭代的桶,的指针,第一次调用时是 nil。
i := it.i // 当前已经输出几个元素了,第一次调用时是 0。
// 这里到函数结尾,都在一个大循环里。
next:
if b == nil {
// 完整的循环一圈了,结束~
if bucket == it.startBucket && it.wrapped {
it.key = nil
it.elem = nil
return
}
// 正在扩容,需要对桶的迁移状态进行判断。
if h.growing() && it.B == h.B {
// 这个状态表示迭代开始的时候正在进行扩容,且当前的桶数量是扩容后的样子,这可能
// 是迁移后开始迭代的,或者是迭代后开启了等量扩容,且当前扩容还没扩容结束。
// 这样如果当前遍历的桶还没有被迁移,就需要回旧桶迭代,这里需要按迁移后的样子迭代,
// 只返回迁移后会被放到本桶的元素,因为可能下一次 mapiternext 时候,迁移就完成了,
// 不这么操作会造成重复迭代。
oldbucket := bucket & it.h.oldbucketmask() // 拿旧桶的掩码,算出在旧桶的位置。
b = (*bmap)(add(h.oldbuckets, oldbucket*uintptr(t.bucketsize))) // 旧桶的地址。
if !evacuated(b) { // 判断是否迁移结束,如果没有,迭代时候需要迭代回旧桶,记录桶地址。
checkBucket = bucket
} else {
// 本桶已经迁移结束了,拿新桶走遍历逻辑即可。
b = (*bmap)(add(it.buckets, bucket*uintptr(t.bucketsize)))
checkBucket = noCheck
}
} else {
// 没扩容,或在扩容前就启动了迭代,且当前已经扩容迁移结束,走正常逻辑即可。
b = (*bmap)(add(it.buckets, bucket*uintptr(t.bucketsize)))
checkBucket = noCheck
}
bucket++
// bucketShift(it.B) 就是桶数组长度,相等说明已经走到底要回头了。
// 想一下,最开始是随机位置遍历的~
if bucket == bucketShift(it.B) {
bucket = 0
it.wrapped = true
}
i = 0
}
// 遍历 bucket 内的槽。
for ; i < bucketCnt; i++ {
offi := (i + it.offset) & (bucketCnt - 1) // 加上偏移量取模,算出偏移后的当前索引。
// isEmpty 说明当前是空的,而 evacuatedEmpty 当前桶被迁移了,
// 且迁移之前这个位置就是空的,无需去新的 bucket 里找了。
if isEmpty(b.tophash[offi]) || b.tophash[offi] == evacuatedEmpty {
continue
}
// 获取 key。
k := add(unsafe.Pointer(b), dataOffset+uintptr(offi)*uintptr(t.keysize))
if t.indirectkey() {
k = *((*unsafe.Pointer)(k))
}
// 获取 value。
e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+uintptr(offi)*uintptr(t.elemsize))
// 满足该条件说明当前正在增量扩容,而我们正在一个还没被迁移
// 却随时可能被迁移的桶上遍历。此时迭代操作需要迭代回旧桶,
// 同时忽略迁移后不会分配在本桶的元素。
if checkBucket != noCheck && !h.sameSizeGrow() {
// 这里的算法和扩容时候的高低桶选择算法是一致的,可以对比参照。
if t.reflexivekey() || t.key.equal(k, k) {
// hash 稳定的类型,使用 hash 值的高位做桶选择,不在本桶的直接跳过。
hash := t.hasher(k, uintptr(h.hash0))
if hash&bucketMask(it.B) != checkBucket {
continue
}
} else {
// 注意这里 b 是老桶的地址,遇到 hash 不稳定的类型,
// 使用原 tophash 的最低和本 bucket 索引的最高位(扩容释放出来的位)判断是否在本桶,
if checkBucket>>(it.B-1) != uintptr(b.tophash[offi]&1) {
continue
}
}
}
// 该获取 key 和 value 了。
if (b.tophash[offi] != evacuatedX && b.tophash[offi] != evacuatedY) ||
!(t.reflexivekey() || t.key.equal(k, k)) {
// 满足这个条件说明最新数据还在原桶中;
// 或者是 hash 不稳定的数据,这类不可能被删除或更新,可以直接用原桶的数据返回。
it.key = k
if t.indirectelem() {
e = *((*unsafe.Pointer)(e))
}
it.elem = e
} else {
// 到这个 else 是最坏的情况了,说明数据已经被迁移走,不在本桶里了。
// 这里直接通过一个名为 mapaccessK 的函数去找 key 对应的数据,
// 该函数逻辑基本上是个精简版的 mapaccess1 访问逻辑,看过访问部分源码后再来
// 读应该很轻松,因此不再介绍了。
rk, re := mapaccessK(t, h, k)
if rk == nil {
continue // mapaccessK 也没找到的话,说明 key 被删了,找下一个吧。
}
it.key = rk
it.elem = re
}
it.bucket = bucket
if it.bptr != b { // 这个判断很有意思,是为了避免在相等的时候赋值带来不必要的写屏障。
it.bptr = b
}
it.i = i + 1
it.checkBucket = checkBucket // 现在是 checkBucket 那么下次迭代不换桶的话必然也是,因此保存之。
dedereturn
}
b = b.overflow(t) // 沿着溢出桶链表继续遍历。
i = 0
goto next
}
2.7 删除
最后来看看元素的删除逻辑,删除代码编译可能会变成 mapdelete
或 mapclear
函数的调用。
其中 mapclear
仅在写了迭代删除所有 key/value 的代码时被编译器优化后使用,逻辑较为简单,核心是利用初始化时提到的 makeBucketArray
的 dirtyalloc
逻辑重制了 bucket 内存块,下面主要看一下 mapdelete
的逻辑。
func mapdelete(t *maptype, h *hmap, key unsafe.Pointer) {
// ..省略一些跟踪开关和检查..
if h == nil || h.count == 0 {
// 这段代码访问时候见过了。
if t.hashMightPanic() {
t.hasher(key, 0)
}
return
}
if h.flags&hashWriting != 0 {
throw("concurrent map writes")
}
hash := t.hasher(key, uintptr(h.hash0))
h.flags ^= hashWriting // 设置上写状态。
bucket := hash & bucketMask(h.B) // 和查找一样的逻辑,取与法获取对应的桶。
if h.growing() {
growWork(t, h, bucket) // 既然都读到桶了,迁移进度必须走一波~
}
b := (*bmap)(add(h.buckets, bucket*uintptr(t.bucketsize))) // 目标 bucket 的地址。
bOrig := b // 记录下桶的内存位置,因为后面 b 会用于计算。
top := tophash(hash) // 获取 hash 的高 8 位。
search:
// 外层循环遍历整个 bucket 链。
for ; b != nil; b = b.overflow(t) {
for i := uintptr(0); i < bucketCnt; i++ {
if b.tophash[i] != top {
// emptyRest 是槽位的初始状态,表示这之后的槽位都还没被占用,
// 到这里肯定没数据了,直接 break 大循环。
if b.tophash[i] == emptyRest {
break search
}
continue // hash 不匹配的话,后面还有元素就继续找。
}
// 走到这里说明 tophash 是匹配的,取出 key 来再比对看看。
k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
k2 := k
if t.indirectkey() {
k2 = *((*unsafe.Pointer)(k2))
}
if !t.key.equal(key, k2) {
continue
}
// 走到这里就是找到了对应的 key,开始清理操作。
// 清理 key,注意这里和下面的 value 不太一样,仅对有指针的 key 进行了清理,
// 推测应该是因为 tophash 已经说明了槽位状态,只要释放了指针,
// key 内存有没有清零没有什么影响。
if t.indirectkey() {
*(*unsafe.Pointer)(k) = nil
} else if t.key.ptrdata != 0 {
memclrHasPointers(k, t.key.size)
}
// 清理 value。
e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
if t.indirectelem() {
*(*unsafe.Pointer)(e) = nil
} else if t.elem.ptrdata != 0 {
memclrHasPointers(e, t.elem.size)
} else {
// 这里和 key 的情况不一样,即便是没有指针的 value 也会清理,
// 应该是因为访问时候会将这块地址返回出去。
memclrNoHeapPointers(e, t.elem.size)
}
b.tophash[i] = emptyOne // 将当前槽位标记为碎片空槽位。
// 后面的逻辑如果发现 bucket 目前的尾部都是 emptyOne,那么将他们改为 emptyRest 状态,
// 以便查找逻辑等地方不需反复遍历这些位置。
// 这里原文有段注释:这个逻辑放到独立的函数中会比较 nice,但是由于 for 循环目前编译
// 时不能自动内联优化,所以写一起了。
// 下面这 2 个 goto,都是后面有 emptyRest 的情况,说明后面还有数据。
if i == bucketCnt-1 {
if b.overflow(t) != nil && b.overflow(t).tophash[0] != emptyRest {
goto notLast
}
} else {
if b.tophash[i+1] != emptyRest {
goto notLast
}
}
// 上面没退出循环的话,说明后面一位是 emptyRest,而当前目前是 emptyOne,
// 其实也可以更新成 emptyRest 了,下面的循环就是在向前更新维护 emptyRest 状态。
for {
b.tophash[i] = emptyRest
if i == 0 { // 桶内回退到 0 可以向前换桶了。
if b == bOrig { // 表明已经回退到常规桶,可以结束了。
break
}
// 获取当前的上一个桶,因为是单向链表,所以需要下面那个循环,每次都要从头部找回来 = =!
c := b
for b = bOrig; b.overflow(t) != c; b = b.overflow(t) {
}
i = bucketCnt - 1
} else { // 同一个桶内,前进到上一个槽。
i--
}
if b.tophash[i] != emptyOne { // 有数据说明肯定不是 emptyRest,可以结束遍历了。
break
}
}
notLast:
h.count--
if h.count == 0 { // 每次 map 没数据后,都会重新生成 hash seed。
h.hash0 = fastrand()
}
break search
}
}
// 并发控制,和 mapassign 基本一致,不再赘述。
if h.flags&hashWriting == 0 {
throw("concurrent map writes")
}
h.flags &^= hashWriting
}
在前面看过访问、赋值、扩容、迭代逻辑后再来看删除,应该是很容易的,阅读时候主要需要注意下 emptyRest
的设置逻辑即可。
到这里 map 的完整逻辑就全部看完了,能看到这里的同学相信已经对 map 的实现了然于胸,如果还有疑问可以给我留言相互探讨哈,谢谢~
三、参考资料
- runtime: hash function should be randomized to prevent DOS attacks · Issue #2630 · golang/go (github.com)
- map - 常见数据结构 - GO专家编程
- Go 语言 map 的底层实现完整剖析
- 理解 Golang 哈希表 Map 的原理
- Comparison operators
我是 bunnier,一个爱好马拉松的老码农,本文来自我的公众号「兔子哥有话说」,欢迎关注!如需转载请注明来源及出处。
转载自:https://juejin.cn/post/7079964047893856293