Golang 5分钟读懂 Map
哈希表是计算机科学中的最重要数据结构之一,这不仅因为它 𝑂(1) 的读写性能非常优秀,还因为它提供了键值之间的映射。想要实现一个性能优异的哈希表,需要注意两个关键点 —— 哈希函数和冲突解决 方法。
内存模型
在源码中,表示 map
的结构体是 hmap,它是 hashmap 的缩写:
type hmap struct {
//元素个数,调用 len(map) 时,直接返回此值
count int
flags uint8
//buckets 的对数 log_2
B uint8
//overflow 的 bucket 近似数
noverflow uint16
//计算 key 的哈希的时候会传入哈希函数
hash0 uint32
// 指向 buckets 数组,大小为 2^B
// 如果元素个数为0,就为 nil
buckets unsafe.Pointer
// 扩容的时候,buckets 长度会是 oldbuckets 的两倍
oldbuckets unsafe.Pointer
//指示扩容进度,小于此地址的 buckets 迁移完成
nevacuate uintptr
extra *mapextra
}
说明一下,B
是 buckets
数组的长度的对数,也就是说 buckets
数组的长度就是 2^B。bucket
里面存储了 key 和 value,后面会再讲。
buckets
是一个指针,最终它指向的是一个结构体数组,元素类型 bmap
:
type bmap struct{
tophash [bucketCnt]uint8
}
但这只是表面的结构,编译期间 会给它加料,动态地创建一个新的结构:
type bmap struct{
topbits [8]uint8
keys [8]keytype
elems [8]elemtype
overflow uintptr
}
bmap
就是我们常说的“桶”,桶里面会最多装 8 个 key,这些 key 之所以会落入同一个桶,是因为它们经过哈希计算后,哈希结果是“一类”的。在桶内,又会根据 key 计算出来的 hash 值的高 8 位来决定 key 到底落入桶内的哪个位置(一个桶内最多有8个位置)
当 map 的 key 和 value 都不是指针,并且 size 都小于 128 字节的情况下,会把 bmap 标记为不含指针,这样可以避免 gc 时扫描整个 hmap。但是,我们看 bmap 其实有一个 overflow 的字段,是指针类型的,破坏了 bmap 不含指针的设想,这时会把 overflow 移动到 extra 字段来。
type mapextra struct{
// overflow[0] contains overflow buckets for hmap.buckets.
// overflow[1] contains overflow buckets for hmap.oldbuckets.
overflow [2]*[]*bmap
// nextOverflow 包含空闲的 overflow bucket,这是预分配的 bucket
nextOverflow *bmap
}
bmap 是存放 k-v 的地方,我们把视角拉近,仔细看 bmap 的内部组成。
上图就是 bucket 的内存模型,HOB Hash
指的就是 top hash。 注意到 key 和 value 是各自放在一起的,并不是 key/value/key/value/...
这样的形式。源码里说明这样的好处是在某些情况下可以省略掉 padding 字段,节省内存空间。
哈希函数
在程序启动时,会检测 cpu 是否支持 aes,如果支持,则使用 aes hash,否则使用 memhash。这是在函数 alginit()
中完成,位于路径:src/runtime/alg.go
下。
key 定位过程
key 经过哈希计算后得到哈希值,共 64 个 bit 位(64位机,32位机就不讨论了,现在主流都是64位机),计算它到底要落在哪个桶时,只会用到最后 B 个 bit 位。还记得前面提到过的 B 吗?如果 B = 5,那么桶的数量,也就是 buckets 数组的长度是 2^5 = 32。
例如,现在有一个 key 经过哈希函数计算后,得到的哈希结果是:
10010111|000011110110110010001111001010100010010110010101010│01010
用最后的 5 个 bit 位,也就是 01010
,值为 10,也就是 10 号桶。这个操作实际上就是取余操作,但是取余开销太大,所以代码实现上用的位操作代替。
再用哈希值的高 8 位,找到此 key 在 bucket 中的位置,这是在寻找已有的 key。最开始桶内还没有 key,新加入的 key 会找到第一个空位,放入。
buckets 编号就是桶编号,当两个不同的 key 落在同一个桶中,也就是发生了哈希冲突。冲突的解决手段是用链表法:在 bucket 中,从前往后找到第一个空位。这样,在查找某个 key 时,先找到对应的桶,再去遍历 bucket 中的 key。
上图中,假定 B = 5,所以 bucket 总数就是 2^5 = 32。首先计算出待查找 key 的哈希,使用低 5 位 00110
,找到对应的 6 号 bucket,使用高 8 位 10010111
,对应十进制 151,在 6 号 bucket 中寻找 tophash 值(HOB hash)为 151 的 key,找到了 2 号槽位,这样整个查找过程就结束了。
如果在 bucket 中没找到,并且 overflow 不为空,还要继续去 overflow bucket 中寻找,直到找到或是所有的 key 槽位都找遍了,包括所有的 overflow bucket。
扩容过程
使用哈希表的目的就是要快速查找到目标 key,然而,随着向 map 中添加的 key 越来越多,key 发生碰撞的概率也越来越大。bucket 中的 8 个 cell 会被逐渐塞满,查找、插入、删除 key 的效率也会越来越低。最理想的情况是一个 bucket 只装一个 key,这样,就能达到 O(1)
的效率,但这样空间消耗太大,用空间换时间的代价太高。
Go 语言采用一个 bucket 里装载 8 个 key,定位到某个 bucket 后,还需要再定位到具体的 key,这实际上又用了时间换空间。
当然,这样做,要有一个度,不然所有的 key 都落在了同一个 bucket 里,直接退化成了链表,各种操作的效率直接降为 O(n),是不行的。
因此,需要有一个指标来衡量前面描述的情况,这就是装载因子
。Go 源码里这样定义 装载因子
:
loadFactor := count / (2^B)
count 就是 map 的元素个数,2^B 表示 bucket 数量。
再来说触发 map 扩容的时机:在向 map 插入新 key 的时候,会进行条件检测,符合下面这 2 个条件,就会触发扩容:
- 装载因子超过阈值,源码里定义的阈值是 6.5。
- overflow 的 bucket 数量过多:当 B 小于 15,如果 overflow 的 bucket 数量超过 2^B;当 B >= 15,如果 overflow 的 bucket 数量超过 2^15。
通过汇编语言可以找到赋值操作对应源码中的函数是 mapassign ,对应扩容条件的源码如下:
if !h.growing() && (overLoadFactor(int64(h.count), h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) {
hashGrow(t, h)
}
// 装载因子超过 6.5
func overLoadFactor(count int64, B uint8) bool {
return count >= bucketCnt && float32(count) >= loadFactor*float32((uint64(1)<<B))
}
// overflow buckets 太多
func tooManyOverflowBuckets(noverflow uint16, B uint8) bool {
if B < 16 {
return noverflow >= uint16(1)<<B
}
return noverflow >= 1<<15
}
面说的 hashGrow()
函数实际上并没有真正地“搬迁”,它只是分配好了新的 buckets,并将老的 buckets 挂到了 oldbuckets 字段上。真正搬迁 buckets 的动作在 growWork()
函数中,而调用 growWork()
函数的动作是在 mapassign 和 mapdelete 函数中。也就是插入或修改、删除 key 的时候,都会尝试进行搬迁 buckets 的工作。先检查 oldbuckets 是否搬迁完毕,具体来说就是检查 oldbuckets 是否为 nil。
我们先看 hashGrow()
函数所做的工作,再来看具体的搬迁 buckets 是如何进行的。
func hashGrow(t *maptype, h *hmap) {
// B+1 相当于是原来 2 倍的空间
bigger := uint8(1)
if !overLoadFactor(h.count+1, h.B) {
//进行等量的内存扩容,所以 B 不变
bigger = 0
h.flags |= sameSizeGrow
}
// 将老 buckets 挂到 buckets 上
oldbuckets := h.buckets
// 申请新的 buckets 空间
newbuckets, nextOverflow := makeBucketArray(t, h.B+bigger, nil)
flags := h.flags &^ (iterator | oldIterator)
if h.flags&iterator != 0 {
flags |= oldIterator
}
// 提交 grow 的动作
h.B += bigger
h.flags = flags
h.oldbuckets = oldbuckets
h.buckets = newbuckets
// 搬迁进度为 0
h.nevacuate = 0
// overflow buckets 数为 0
h.noverflow = 0
if h.extra != nil && h.extra.overflow != nil {
// Promote current overflow buckets to the old generation.
if h.extra.oldoverflow != nil {
throw("oldoverflow is not nil")
}
h.extra.oldoverflow = h.extra.overflow
h.extra.overflow = nil
}
if nextOverflow != nil {
if h.extra == nil {
h.extra = new(mapextra)
}
h.extra.nextOverflow = nextOverflow
}
}
下面通过图来宏观地看一下扩容前后的变化。
扩容前,B = 2,共有 4 个 buckets,lowbits 表示 hash 值的低位。假设我们不关注其他 buckets 情况,专注在 2 号 bucket。并且假设 overflow 太多,触发了等量扩容(对应于前面的条件 2)。
扩容完成后,overflow bucket 消失了,key 都集中到了一个 bucket,更为紧凑了,提高了查找的效率。
假设触发了 2 倍的扩容,那么扩容完成后,老 buckets 中的 key 分裂到了 2 个 新的 bucket。一个在 x part,一个在 y 的 part。依据是 hash 的 lowbits。新 map 中 0-3
称为 x part,4-7
称为 y part。
遍历过程是怎样的
package main
import "fmt"
func main() {
ageMp := make(map[string]int)
ageMp["qcrao"] = 18
for name, age := range ageMp {
fmt.Println(name, age)
}
}
执行命令:
go tool compile -S main.go
关键的几行汇编代码如下:
0x00c7 00199 (main.go:8) CALL runtime.mapiterinit(SB)
...
0x0145 00325 (main.go:8) LEAQ ""..autotmp_9+128(SP), AX
0x014d 00333 (main.go:8) CALL runtime.mapiternext(SB)
...
这样,关于 map 迭代,底层的函数调用关系一目了然。先是调用 mapiterinit 函数初始化迭代器,然后循环调用 mapiternext 函数进行 map 迭代。
迭代器的结构体定义:
type hiter struct {
key unsafe.Pointer // Must be in first position. Write nil to indicate iteration end (see cmd/compile/internal/gc/range.go).
elem unsafe.Pointer // Must be in second position (see cmd/compile/internal/gc/range.go).
t *maptype
h *hmap
// 初始化时指向的 bucket
buckets unsafe.Pointer // bucket ptr at hash_iter initialization time
// 当前遍历到的 bmap
bptr *bmap // current bucket
overflow *[]*bmap // keeps overflow buckets of hmap.buckets alive
oldoverflow *[]*bmap // keeps overflow buckets of hmap.oldbuckets alive
startBucket uintptr // bucket iteration started at
offset uint8 // intra-bucket offset to start from during iteration (should be big enough to hold bucketCnt-1)
wrapped bool // already wrapped around from end of bucket array to beginning
B uint8
i uint8
bucket uintptr
checkBucket uintptr
}
mapiterinit
就是对 hiter 结构体里的字段进行初始化赋值操作。
前面已经提到过,即使是对一个写死的 map 进行遍历,每次出来的结果也是无序的。下面我们就可以近距离地观察他们的实现了。
r := uintptr(fastrand())
if h.B > 31-bucketCntBits {
r += uintptr(fastrand()) << 31
}
// 从哪个 bucket 开始遍历
it.startBucket = r & bucketMask(h.B)
// 从 bucket 的哪个 cell 开始遍历
it.offset = uint8(r >> h.B & (bucketCnt - 1))
func bucketShift(b uint8) uintptr {
// Masking the shift amount allows overflow checks to be elided.
return uintptr(1) << (b & (sys.PtrSize*8 - 1))
}
func bucketMask(b uint8) uintptr {
return bucketShift(b) - 1
}
例如,B = 2,那 uintptr(1)<<h.B - 1
结果就是 3,低 8 位为 0000 0011
,将 r 与之相与,就可以得到一个 0~3
的 bucket 序号;bucketCnt - 1 等于 7,低 8 位为 0000 0111
,将 r 右移 2 位后,与 7 相与,就可以得到一个 0~7
号的 cell。
于是,在 mapiternext
函数中就会从 it.startBucket 的 it.offset 号的 cell 开始遍历,取出其中的 key 和 value,直到又回到起点 bucket,完成遍历过程。
源码部分比较好看懂,尤其是理解了前面注释的几段代码后,再看这部分代码就没什么压力了。所以,接下来,我将通过图形化的方式讲解整个遍历过程,希望能够清晰易懂。
假设我们有下图所示的一个 map,起始时 B = 1,有两个 bucket,后来触发了扩容(这里不要深究扩容条件,只是一个设定),B 变成 2。并且, 1 号 bucket 中的内容搬迁到了新的 bucket,1 号
裂变成 1 号
和 3 号
;0 号
bucket 暂未搬迁。老的 bucket 挂在在 *oldbuckets
指针上面,新的 bucket 则挂在 *buckets
指针上面。
这时,我们对此 map 进行遍历。假设经过初始化后,startBucket = 3,offset = 2。于是,遍历的起点将是 3 号 bucket 的 2 号 cell,下面这张图就是开始遍历时的状态:
标红的表示起始位置,bucket 遍历顺序为:3 -> 0 -> 1 -> 2。
因为 3 号 bucket 对应老的 1 号 bucket,因此先检查老 1 号 bucket 是否已经被搬迁过。判断方法就是:
func evacuated(b *bmap) bool {
h := b.tophash[0]
return h > empty && h < minTopHash
}
如果 b.tophash[0] 的值在标志值范围内,即在 (0,4) 区间里,说明已经被搬迁过了。
empty = 0
evacuatedEmpty = 1
evacuatedX = 2
evacuatedY = 3
minTopHash = 4
在本例中,老 1 号 bucket 已经被搬迁过了。所以它的 tophash[0] 值在 (0,4) 范围内,因此只用遍历新的 3 号 bucket。
依次遍历 3 号 bucket 的 cell,这时候会找到第一个非空的 key:元素 e。到这里,mapiternext 函数返回,这时我们的遍历结果仅有一个元素:
由于返回的 key 不为空,所以会继续调用 mapiternext 函数。
继续从上次遍历到的地方往后遍历,从新 3 号 overflow bucket 中找到了元素 f 和 元素 g。
遍历结果集也因此壮大:
新 3 号 bucket 遍历完之后,回到了新 0 号 bucket。0 号 bucket 对应老的 0 号 bucket,经检查,老 0 号 bucket 并未搬迁,因此对新 0 号 bucket 的遍历就改为遍历老 0 号 bucket。那是不是把老 0 号 bucket 中的所有 key 都取出来呢?
并没有这么简单,回忆一下,老 0 号 bucket 在搬迁后将裂变成 2 个 bucket:新 0 号、新 2 号。而我们此时正在遍历的只是新 0 号 bucket(注意,遍历都是遍历的 *bucket
指针,也就是所谓的新 buckets)。所以,我们只会取出老 0 号 bucket 中那些在裂变之后,分配到新 0 号 bucket 中的那些 key。
因此,lowbits == 00
的将进入遍历结果集:
和之前的流程一样,继续遍历新 1 号 bucket,发现老 1 号 bucket 已经搬迁,只用遍历新 1 号 bucket 中现有的元素就可以了。结果集变成:
继续遍历新 2 号 bucket,它来自老 0 号 bucket,因此需要在老 0 号 bucket 中那些会裂变到新 2 号 bucket 中的 key,也就是 lowbit == 10
的那些 key。
这样,遍历结果集变成:
最后,继续遍历到新 3 号 bucket 时,发现所有的 bucket 都已经遍历完毕,整个迭代过程执行完毕。
顺便说一下,如果碰到 key 是 math.NaN()
这种的,处理方式类似。核心还是要看它被分裂后具体落入哪个 bucket。只不过只用看它 top hash 的最低位。如果 top hash 的最低位是 0 ,分配到 X part;如果是 1 ,则分配到 Y part。据此决定是否取出 key,放到遍历结果集里。
map 遍历的核心在于理解 2 倍扩容时,老 bucket 会分裂到 2 个新 bucket 中去。而遍历操作,会按照新 bucket 的序号顺序进行,碰到老 bucket 未搬迁的情况时,要在老 bucket 中找到将来要搬迁到新 bucket 来的 key。
Go1.18内存分配升级
通过克隆一个runtime.hiter
结构体,来优化旧的Go版本编译器在创建map
迭代器时,需要再一次进行内存分配。
如果在项目中使用了json序列化库 github.com/json-iterat… ,如果Go语言版本大于等于1.18,且json-iterator/go
的版本小于v1.1.12。在序列化结构体时会因为内存分配方式不一致导致发生panic
。你需要使用最新版的json-iterator/go
转载自:https://juejin.cn/post/7056228667466203173