原创分享 抽丝剥茧—Go 哈希 Map 的鬼魅神功

weishixianglian · 2020年06月23日 · 最后由 weishixianglian 回复于 2020年06月23日 · 511 次阅读

抽丝剥茧—Go 哈希 Map 的鬼魅神功

  • Go 语言中的哈希 Map 是江湖上极厉害的一门武功,其入门简单,即便是掌握到了 2、3 层也具有四两拨千斤的神奇功效.因此成为江湖人士竞相研习的技艺,风头一时无两.
  • 但即便是成名已久的高手,也鲜有能修炼到最高层的.
  • 本文不仅介绍了哈希 Map 基本的使用方式,还深入源码介绍了哈希 Map 的至高心法.希望本文有助于你对 Go 哈希 Map 的理解臻于化境. ## 哈希表
  • Go 语言中的 Map,又称为 Hash map(哈希表) 是使用频率极高的一种数据结构,重要程度高到令人发指。
  • 哈希表的原理是将多个 key/value 对分散存储在 buckets(桶) 中。给定一个 key,哈希算法会计算出键值对存储的位置。时常会通过两步完成,伪代码如图所示: hash = hashfunc(key) index = hash % array_size
  • 在此伪代码中,第一步计算通过 hash 算法计算 key 的 hash 值,其结果与桶的数量无关。
  • 接着通过执行取模运算得到0 - array_size−1 之间的 index 序号。
  • 在实践中,我们时常将 Map 看做 o(1) 时间复杂度的操作,通过一个键 key 快速寻找其唯一对应的 value。 ## Map 基本操作 ### Map 的声明与初始化 首先,来看一看 map 的基本使用方式。map 声明的第一种方式如下 var hash map[T]T 其并未对 map 进行初始化的操作,其值为 nil,因此一旦进行hash[key]=alue这样的赋值操作就会报错。 panic(plainError("assignment to entry in nil map")) 比较意外的是 Go 语言允许对为 nil 的 map 进行访问:hash["Go"],虽然其结果显然毫无意义.

map 的第二种声明方式通过 make 进行。make 的第二个参数中代表初始化创建 map 的长度。当 NUMBER 为空时,代表默认长度为 0.

var hash = make(map[T]T,NUMBER)

此种方式可以正常的对 map 进行访问与赋值 在 map 初始化时,还具有字面量形式初始化的方式。其在创建 map 时即在其中添加了元素。

var country = map[string]string{
    "China":  "Beijing",
    "Japan":  "Tokyo",
    "India":  "New Delhi",
    "France": "Paris",
    "Italy":  "Rome",
}

rating := map[string]float64{"c": 5, "Go": 4.5, "Python": 4.5, "C++": 3}

Map 的访问

map 可以进行两种形式的访问:

v  := hash[key]

以及

v,ok := map[key]

当返回 2 个参数时,第 2 个参数代表当前 key 在 map 中是否存在。 不用惊讶于为什么同样的访问可以即返回一个值又返回两个值,这是在编译时做到的,后面会介绍。

Map 的赋值

map 的赋值语法相对简单

hash[key] = value

其代表将 value 与给 map1 哈希表中的 key 绑定在一起

Map 的删除

map 的删除需要用到 delete,其是 Go 语言中的关键字,用于进行 map 的删除操作,形如:

delete(hash,key)

可以对相同的 key 进行多次的删除操作,而不会报错

关于 map 中的 key

很容易理解,如果 map 中的 key 都没有办法比较是否相同,那么就不能作为 map 的 key。 关于 Go 语言中的可比较性,直接阅读官方文档即可:https://golang.org/ref/spec#Comparison_operators 下面简单列出一些类型的可比较性

布尔值是可比较的
整数值可比较的
浮点值是可比较的
复数值是可比较的
字符串值是可比较
指针值是可比较的。如果两个指针值指向相同的变量,或者两个指针的值均为nil,则它们相等。
通道值是可比较的。如果两个通道值是由相同的make调用创建的,或者两个值都为nil。
接口值是可比较的。如果两个接口值具有相同的动态类型和相等的动态值,或者两个接口值都为nil,则它们相等。
如果结构的所有字段都是可比较的,则它们的值是可比较的。
如果数组元素类型的值可比较,则数组值可比较。如果两个数组的对应元素相等,则它们相等。
切片、函数、map是不可比较的。

关于 map 并发冲突问题

  • 和其他语言有些不同的是,map 并不支持并发的读写,因此下面的操作是错误的 aa := make(map[int]int) go func() { for{ aa[0] = 5 } }() go func() { for{ _ = aa[1] } }() 报错: fatal error: concurrent map read and map write
  • Go 语言只支持并发的读取 Map.因此下面的函数是没有问题的 aa := make(map[int]int) go func() { for{ _ = aa[2] } }() go func() { for{ _ = aa[1] } }()

Go 语言为什么不支持并发的读写,是一个频繁被提起的问题。我们可以在 Go 官方文档Frequently Asked Questions找到问题的答案(https://golang.org/doc/faq#atomic_maps


Map被设计为不需要从多个goroutine安全访问,在实际情况下,Map可能是某些已经同步的较大数据结构或计算的一部分。
因此,要求所有Map操作都互斥将减慢大多数程序的速度,而只会增加少数程序的安全性。

即这样做的目的是为了大多数情况下的效率。

Map 在运行时

  • 介绍了 Map 的基本操作,本节介绍一下 Map 在运行时的行为,以便能够深入 Map 内部的实现机制。
  • 明白了 Map 的实现机制,有助于更加灵活的使用 Map 和进行深层次的调优过程。由于代码里面的逻辑关系关联比较复杂。本节会首先用多张图片帮助读者有一个抽象的理解。 Go 语言 Map 的底层实现如下所示:
// A header for a Go map.
type hmap struct {
    count     int // # live cells == size of map.  Must be first (used by len() builtin)
    flags     uint8
    B         uint8  // log_2 of # of buckets (can hold up to loadFactor * 2^B items)
    noverflow uint16 // approximate number of overflow buckets; see incrnoverflow for details
    hash0     uint32 // hash seed

    buckets    unsafe.Pointer // array of 2^B Buckets. may be nil if count==0.
    oldbuckets unsafe.Pointer // previous bucket array of half the size, non-nil only when growing
    nevacuate  uintptr        // progress counter for evacuation (buckets less than this have been evacuated)

    extra *mapextra // optional fields
}

其中:

  • count 代表 Map 中元素的数量.
  • flags 代表当前 Map 的状态(是否处于正在写入的状态等).
  • B 对数形式表示的当前 Map 中桶的数量, 2^B = Buckets size.
  • noverflow 为 Map 中溢出桶的数量.当溢出桶太多的时候,Map 会进行same-size map growth.其实质是为了避免溢出桶过大导致的内存泄露问题.
  • hash0 代表生成 hash 的随机数种子.
  • buckets 指向了当前 Map 对应的桶的指针.
  • oldbuckets 是在 Map 进行扩容的时候存储旧桶的.当所有的旧桶中的数据都已经转移到了新桶,则清空。
  • nevacuate 在扩容的时候使用。用于标记当前旧桶中小于 nevacuate 的桶都已经转移到了新桶.
  • extra 存储 Map 中的溢出桶

代表桶的bmap结构在运行时只列出了其首个字段: 即一个固定长度为 8 的数组。此字段顺序存储 key 的哈希值的前 8 位.

type bmap struct {
    tophash [bucketCnt]uint8
}

可能会有疑问,桶中存储的 key 和 value 值哪里去了? 这是因为 Map 在编译时即确定了 map 中 key,value,桶的大小。因此在运行时仅仅通过指针操作即可找到特定位置的元素。 桶本身在存储的 tophash 字段之后,会存储 key 数组以及 value 数组

type bmap struct {
    tophash [bucketCnt]uint8
    key   [bucketCnt]T
    value [bucketCnt]T
    ....
}

Go 语言选择将 key 与 value 分开存储而不是 key/value/key/value 的形式,是为了在字节对齐的时候能够压缩空间。

在进行hash[key]此类的的 Map 访问操作时,会首先找到桶的位置,如下为伪代码操作.

hash = hashfunc(key)
index = hash % array_size

接着遍历 tophash 数组,如果数组中找到了相同的 hash,那么就可以接着通过指针的寻址操作找到 key 与 value 值

= value赋值操作时,当指定桶中的数据超过了8个,并不会直接就新开辟一个新桶,而是会将数据放置到溢出桶中每个桶的最后还存储了overflow` 即溢出桶的指针

  • 在正常情况下,数据是很少会跑到溢出桶里面去的。同理,我们也可以知道,在 Map 的查找操作时,如果 key 的 hash 在指定桶的 tophash 数组中不存在,还会遍历溢出桶中的数据。

  • 后面我们会看到,如果一开始初始化 map 的数量比较大。则 map 提前创建好一些溢出桶存储在extra *mapextra 字段.
type mapextra struct {
    overflow    *[]*bmap
    oldoverflow *[]*bmap
    nextOverflow *bmap
}

这样当出现溢出现象是,就可以用提前创建好的桶而不用申请额外的内存空间。当预分配的溢出桶使用完了,溢出桶才会新建。

当发生以下两种情况之一,map 会进行重建:

  • 当 Map 超过了负载因子大小
  • 当溢出桶的数量过多

在哈希表中都有负载因子的概念

负载因子 = 哈希表中元素数量 / 桶的数量
  • 因此随着负载因子的增大,意味着越多的元素会分配到同一个桶中。此时其效率会减慢。
  • 试想如果桶的数量只有 1 个,此时负载因子到达最大,此时的搜索效率就成了遍历数组。在 Go 语言中的负载因子为 6.5。
  • 当超过了其大小后,Mpa 会进行扩容,增大两倍于旧表的大小。
  • 旧桶的数据会首先存到oldbuckets 字段,并想办法分散的转移到新桶中。

map 的重建还存在第二种情况,即溢出桶的数量太多。这时只会新建和原来的 map 具有相同大小的桶。进行这样same size的重建为了是防止溢出桶的数量可能缓慢增长导致的内存泄露.

  • 当进行 map 的 delete 操作时, 和赋值操作类似,会找到指定的桶,如果存在指定的 key,那么就释放掉 key 与 value 引用的内存。同时 tophash 中指定位置会存储emptyOne,代表当前位置是空的。

  • 同时在删除操作时,会探测到是否当前要删除的元素之后都是空的。如果是,tophash 会存储为emptyRest. 这样做的好处是在做查找操作时,遇到 emptyRest 可以直接退出,因为后面的元素都是空的。

Map 深入

上一节用多张图片解释了 Map 的实现原理,本节会继续深入 Go 语言源码解释 Map 的具体实现细节。问题掌握得有多细致,理解得就有多透彻。

Map 深入: make 初始化

如果我们使用 make 关键字初始化 Map,在 typecheck1 类型检查阶段,节点 Node 的 op 操作变为OMAKEMAP,如果指定了 make map 的长度,则会将长度常量值类型转换为 TINT 类型.如果未指定长度,则长度为 0。nodintconst(0)

func typecheck1(n *Node, top int) (res *Node) {
        ...
        case TMAP:
            if i < len(args) {
                l = args[i]
                i++
                l = typecheck(l, ctxExpr)
                l = defaultlit(l, types.Types[TINT])
                if l.Type == nil {
                    n.Type = nil
                    return n
                }
                if !checkmake(t, "size", l) {
                    n.Type = nil
                    return n
                }
                n.Left = l
            } else {
                n.Left = nodintconst(0)
            }
            n.Op = OMAKEMAP
  • 如果 make 的第二个参数不是整数,则会在类型检查时报错。
if !checkmake(t, "size", l) {
        n.Type = nil
        return n
}

func checkmake(t *types.Type, arg string, n *Node) bool {
    if !n.Type.IsInteger() && n.Type.Etype != TIDEAL {
        yyerror("non-integer %s argument in make(%v) - %v", arg, t, n.Type)
        return false
    }
}

  • 最后会指定在运行时调用 runtime.makemap*函数
func walkexpr(n *Node, init *Nodes) *Node {
           fnname := "makemap64"
            argtype := types.Types[TINT64]
            // Type checking guarantees that TIDEAL hint is positive and fits in an int.
            // See checkmake call in TMAP case of OMAKE case in OpSwitch in typecheck1 function.
            // The case of hint overflow when converting TUINT or TUINTPTR to TINT
            // will be handled by the negative range checks in makemap during runtime.
            if hint.Type.IsKind(TIDEAL) || maxintval[hint.Type.Etype].Cmp(maxintval[TUINT]) <= 0 {
                fnname = "makemap"
                argtype = types.Types[TINT]
            }
            fn := syslook(fnname)
            fn = substArgTypes(fn, hmapType, t.Key(), t.Elem())
            n = mkcall1(fn, n.Type, init, typename(n.Type), conv(hint, argtype), h)
}

不管是 makemap64 还是 makemap,最后都调用了 makemap 函数

func makemap64(t *maptype, hint int64, h *hmap) *hmap {
    if int64(int(hint)) != hint {
        hint = 0
    }
    return makemap(t, int(hint), h)
}
  • 保证创建 map 的长度不能超过 int 大小

    if int64(int(hint)) != hint {
        hint = 0
    }
    
  • makemap 函数会计算出需要的桶的数量,即 log2(N),并调用makeBucketArray函数生成桶和溢出桶

  • 如果初始化时生成了溢出桶,会放置到 map 的extra字段里去

func makemap(t *maptype, hint int, h *hmap) *hmap {
    ...
    B := uint8(0)
    for overLoadFactor(hint, B) {
        B++
    }
    h.B = B

    if h.B != 0 {
        var nextOverflow *bmap
        h.buckets, nextOverflow = makeBucketArray(t, h.B, nil)
        if nextOverflow != nil {
            h.extra = new(mapextra)
            h.extra.nextOverflow = nextOverflow
        }
    }

    return h
}

  • makeBucketArray 会为 Map 申请内存大小,这里需要注意的是,如果 map 的数量大于了2^4,则会在初始化的时候生成溢出桶。溢出桶的大小为 2^(b-4),b 为桶的大小。
func makeBucketArray(t *maptype, b uint8, dirtyalloc unsafe.Pointer) (buckets unsafe.Pointer, nextOverflow *bmap) {
    if b >= 4 {
        nbuckets += bucketShift(b - 4)
        sz := t.bucket.size * nbuckets
        up := roundupsize(sz)
        if up != sz {
            nbuckets = up / t.bucket.size
        }
    }

    if dirtyalloc == nil {
        buckets = newarray(t.bucket, int(nbuckets))
    } else {
        buckets = dirtyalloc
        size := t.bucket.size * nbuckets
        if t.bucket.ptrdata != 0 {
            memclrHasPointers(buckets, size)
        } else {
            memclrNoHeapPointers(buckets, size)
        }
    }

    if base != nbuckets {
        nextOverflow = (*bmap)(add(buckets, base*uintptr(t.bucketsize)))
        last := (*bmap)(add(buckets, (nbuckets-1)*uintptr(t.bucketsize)))
        last.setoverflow(t, (*bmap)(buckets))
    }
    return buckets, nextOverflow
}

Map 深入: 字面量初始化

如果是采取了字面量初始化的方式,其最终任然是需要转换为make操作,其长度是字面量的长度。其编译时的核心逻辑位于:

func anylit(n *Node, var_ *Node, init *Nodes){
    ...
case OMAPLIT:
        if !t.IsMap() {
            Fatalf("anylit: not map")
        }
            maplit(n, var_, init)

}

func maplit(n *Node, m *Node, init *Nodes) {
    a := nod(OMAKE, nil, nil)
    a.Esc = n.Esc
    a.List.Set2(typenod(n.Type), nodintconst(int64(n.List.Len())))
    if len(entries) > 25 {
        ...
    }
    ...
}
  • 唯一值得一提的是,如果字面量的个数大于 25 个,编译时会构建一个数组循环添加
entries := n.List.Slice()
if len(entries) > 25 {
    // loop adding structure elements to map
    // for i = 0; i < len(vstatk); i++ {
    //  map[vstatk[i]] = vstate[i]
    // }
}
  • 如果字面量的个数小于 25 个,编译时会指定会采取直接添加的方式赋值
for _, r := range entries {
    map[key] = value
}

Map 深入: map 访问

前面介绍过,对 map 的访问,具有两种形式。一种是返回单个值

v := hash[key]

一种是返回多个返回值

v, ok := hash[key]

Go 语言没有函数重载的概念,决定返回一个值还是两个值很明显只能够在编译时完成。 对于 v:= rating["Go"] rating["Go"] 会在编译时解析为一个 node,其中左边 type 为 ONAME,存储名字:,右边 type 为 OLITERAL,存储"Go",节点的 op 为"OINDEXMAP" 根据hash[key] 位于赋值号的左边或右边,决定要执行访问还是赋值的操作。访问操作会在运行时调用运行 mapaccess1_XXX 函数,赋值操作会在运行时调用 mapassign_XXX 函数.

if n.IndexMapLValue() {
            // This m[k] expression is on the left-hand side of an assignment.
            fast := mapfast(t)
            if fast == mapslow {
                // standard version takes key by reference.
                // orderexpr made sure key is addressable.
                key = nod(OADDR, key, nil)
            }
            n = mkcall1(mapfn(mapassign[fast], t), nil, init, typename(t), map_, key)
        } else {
            // m[k] is not the target of an assignment.
            fast := mapfast(t)
            if fast == mapslow {
                // standard version takes key by reference.
                // orderexpr made sure key is addressable.
                key = nod(OADDR, key, nil)
            }

            if w := t.Elem().Width; w <= 1024 { // 1024 must match runtime/map.go:maxZero
                n = mkcall1(mapfn(mapaccess1[fast], t), types.NewPtr(t.Elem()), init, typename(t), map_, key)
            } else {
                z := zeroaddr(w)
                n = mkcall1(mapfn("mapaccess1_fat", t), types.NewPtr(t.Elem()), init, typename(t), map_, key, z)
            }
        }
  • Go 编译器根据 map 中的 key 类型和大小选择不同的 mapaccess1_XXX 函数进行加速,但是他们在查找逻辑上都是相同的
func mapfast(t *types.Type) int {
    // Check runtime/map.go:maxElemSize before changing.
    if t.Elem().Width > 128 {
        return mapslow
    }
    switch algtype(t.Key()) {
    case AMEM32:
        if !t.Key().HasHeapPointer() {
            return mapfast32
        }
        if Widthptr == 4 {
            return mapfast32ptr
        }
        Fatalf("small pointer %v", t.Key())
    case AMEM64:
        if !t.Key().HasHeapPointer() {
            return mapfast64
        }
        if Widthptr == 8 {
            return mapfast64ptr
        }
        // Two-word object, at least one of which is a pointer.
        // Use the slow path.
    case ASTRING:
        return mapfaststr
    }
    return mapslow
}

func mkmapnames(base string, ptr string) mapnames {
    return mapnames{base, base + "_fast32", base + "_fast32" + ptr, base + "_fast64", base + "_fast64" + ptr, base + "_faststr"}
}

var mapaccess1 = mkmapnames("mapaccess1", "")

最终会在运行时会调用 mapaccess1_XXXX 的函数。

而对于v, ok := hash[key]类型的 map 访问则有所不同。在编译时的 op 操作为 OAS2MAPR.会将其转换为在运行时调用的 mapaccess2_XXXX 前缀的函数。其伪代码如下:

//   var,b = mapaccess2*(t, m, i)
//   v = *var
  • 需要注意,如果采用_, ok := hash[key]形式,则不用对第一个参数进行赋值操作.
  • 在运行时,会根据 key 值以及 hash 种子 计算 hash 值:alg.hash(key, uintptr(h.hash0)).
  • 接着 bucketMask 计算出当前桶的个数-1. m := bucketMask(h.B)
  • Go 语言采用了一种简单的方式hash&m计算出此 key 应该位于哪一个桶中.获取到桶的位置后,tophash(hash)即可计算出 hash 的前 8 位.
  • 接着此 hash 挨个与存储在桶中的 tophash 进行对比。如果有 hash 值相同的话.会找到其对应的 key 值,查看 key 值是否相同。如果 key 值也相同,即说明查找到了结果,返回 value 哦。
func mapaccess1(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
    alg := t.key.alg
    hash := alg.hash(key, uintptr(h.hash0))
    m := bucketMask(h.B)
    b := (*bmap)(add(h.buckets, (hash&m)*uintptr(t.bucketsize)))
    top := tophash(hash)
bucketloop:
    for ; b != nil; b = b.overflow(t) {
        for i := uintptr(0); i < bucketCnt; i++ {
            if b.tophash[i] != top {
                if b.tophash[i] == emptyRest {
                    break bucketloop
                }
                continue
            }
            k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
            if t.indirectkey() {
                k = *((*unsafe.Pointer)(k))
            }
            if alg.equal(key, k) {
                e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
                if t.indirectelem() {
                    e = *((*unsafe.Pointer)(e))
                }
                return e
            }
        }
    }
    return unsafe.Pointer(&zeroVal[0])
}
  • 函数 mapaccess2 的逻辑几乎是类似的,只是其会返回第二个参数,表明 value 值是否存在于桶中.
func mapaccess2(t *maptype, h *hmap, key unsafe.Pointer) (unsafe.Pointer, bool) {
    alg := t.key.alg
    hash := alg.hash(key, uintptr(h.hash0))
    m := bucketMask(h.B)
    b := (*bmap)(unsafe.Pointer(uintptr(h.buckets) + (hash&m)*uintptr(t.bucketsize)))
    top := tophash(hash)
bucketloop:
    for ; b != nil; b = b.overflow(t) {
        for i := uintptr(0); i < bucketCnt; i++ {
            if b.tophash[i] != top {
                if b.tophash[i] == emptyRest {
                    break bucketloop
                }
                continue
            }
            k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
            if t.indirectkey() {
                k = *((*unsafe.Pointer)(k))
            }
            if alg.equal(key, k) {
                e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
                if t.indirectelem() {
                    e = *((*unsafe.Pointer)(e))
                }
                return e, true
            }
        }
    }
    return unsafe.Pointer(&zeroVal[0]), false
}

Map 深入: 赋值操作

  • 和访问的情况的比较类似, 最终会调用运行时 mapassign*函数。
  • 赋值操作,map 必须已经进行了初始化。
if h == nil {
    panic(plainError("assignment to entry in nil map"))
}
  • 同时要注意,由于 Map 不支持并发的读写操作,因此还会检测是否有协程在访问此 Map,如果是,即会报错。
if h.flags&hashWriting != 0 {
    throw("concurrent map writes")
}
  • 和访问操作一样,会计算 key 的 hash 值
alg := t.key.alg
hash := alg.hash(key, uintptr(h.hash0))
  • 标记当前 map 已经是写入状态
h.flags ^= hashWriting
  • 如果当前没有桶,还会常见一个新桶。所以初始化的时候还是定一个长度吧。
if h.buckets == nil {
    h.buckets = newobject(t.bucket) // newarray(t.bucket, 1)
}
  • 接着找到当前 key 对应的桶
bucket := hash & bucketMask(h.B)
  • 如果发现,当前的 map 正好在重建,还没有重建完。会优先完成重建过程,重建的细节后面会介绍。
if h.growing() {
    growWork(t, h, bucket)
}
  • 计算 tophash
b := (*bmap)(unsafe.Pointer(uintptr(h.buckets) + bucket*uintptr(t.bucketsize)))
top := tophash(hash)
  • 开始寻找是否有对应的 hash,如果找到了,判断 key 是否相同,如果相同,会找到对应的 value 的位置在后面进行赋值
for i := uintptr(0); i < bucketCnt; i++ {
            if b.tophash[i] != top {
                if isEmpty(b.tophash[i]) && inserti == nil {
                    inserti = &b.tophash[i]
                    insertk = add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
                    elem = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
                }
                if b.tophash[i] == emptyRest {
                    break bucketloop
                }
                continue
            }
            k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
            if t.indirectkey() {
                k = *((*unsafe.Pointer)(k))
            }
            if !alg.equal(key, k) {
                continue
            }
            // already have a mapping for key. Update it.
            if t.needkeyupdate() {
                typedmemmove(t.key, k, key)
            }
            elem = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
            goto done
        }
        ovf := b.overflow(t)
        if ovf == nil {
            break
        }
        b = ovf
    }

  • 要注意的是,如果 tophash 没找到,还会去溢出桶里寻找是否存在指定的 hash
  • 如果也不存在,会选择往第一个空元素中插入数据 inserti、insertk 会记录此空元素的位置,
if isEmpty(b.tophash[i]) && inserti == nil {
    inserti = &b.tophash[i]
    insertk = add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
    elem = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
}
  • 在赋值之前,还需要判断 Map 是否需要重建
if !h.growing() && (overLoadFactor(h.count+1, h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) {
        hashGrow(t, h)
        goto again // Growing the table invalidates everything, so try again
    }
  • 没有问题后,就会执行最后的操作,将新的 key 与 value 值存入数组中
  • 这里需要注意一点是,如果桶中已经没有了空元素。这时候我们申请一个新的桶给到这个桶。
if inserti == nil {
    // all current buckets are full, allocate a new one.
    newb := h.newoverflow(t, b)
    inserti = &newb.tophash[0]
    insertk = add(unsafe.Pointer(newb), dataOffset)
    elem = add(insertk, bucketCnt*uintptr(t.keysize))
}
  • 申请的新桶一开始是来自于 map 中extra字段初始化时存储的多余溢出桶。如果这些多余的溢出桶都用完了才会申请新的内存。一个桶的溢出桶可能会进行延展

 *hmap, key unsafe.Pointer) unsafe.Pointer {
    var inserti *uint8
    var insertk unsafe.Pointer
    var elem unsafe.Pointer
bucketloop:
    for {


    // Did not find mapping for key. Allocate new cell & add entry.

    // If we hit the max load factor or we have too many overflow buckets,
    // and we're not already in the middle of growing, start growing.


    if inserti == nil {
        // all current buckets are full, allocate a new one.
        newb := h.newoverflow(t, b)
        inserti = &newb.tophash[0]
        insertk = add(unsafe.Pointer(newb), dataOffset)
        elem = add(insertk, bucketCnt*uintptr(t.keysize))
    }

    // store new key/elem at insert position
    if t.indirectkey() {
        kmem := newobject(t.key)
        *(*unsafe.Pointer)(insertk) = kmem
        insertk = kmem
    }
    if t.indirectelem() {
        vmem := newobject(t.elem)
        *(*unsafe.Pointer)(elem) = vmem
    }
    typedmemmove(t.key, insertk, key)
    *inserti = top
    h.count++

done:
    if h.flags&hashWriting == 0 {
        throw("concurrent map writes")
    }
    h.flags &^= hashWriting
    if t.indirectelem() {
        elem = *((*unsafe.Pointer)(elem))
    }
    return elem
}

Map 深入: Map 重建

当发生以下两种情况之一,map 会进行重建:

  • 当 Map 超过了负载因子大小 6.5
  • 当溢出桶的数量过多 重建时需要调用 hashGrow 函数,如果是负载因子超载,会进行双倍重建。
bigger := uint8(1)
if !overLoadFactor(h.count+1, h.B) {
    bigger = 0
    h.flags |= sameSizeGrow
}
  • 当溢出桶的数量过多,则会进行等量重建。新桶会会存储到buckets字段,旧桶会存储到oldbuckets字段。
  • map 中 extra 字段的溢出桶也同理的进行了转移。

if h.extra != nil && h.extra.overflow != nil {
    // Promote current overflow buckets to the old generation.
    if h.extra.oldoverflow != nil {
        throw("oldoverflow is not nil")
    }
    h.extra.oldoverflow = h.extra.overflow
    h.extra.overflow = nil
}


  • hashGrow 代码一览
func hashGrow(t *maptype, h *hmap) {
    // If we've hit the load factor, get bigger.
    // Otherwise, there are too many overflow buckets,
    // so keep the same number of buckets and "grow" laterally.
    bigger := uint8(1)
    if !overLoadFactor(h.count+1, h.B) {
        bigger = 0
        h.flags |= sameSizeGrow
    }
    oldbuckets := h.buckets
    newbuckets, nextOverflow := makeBucketArray(t, h.B+bigger, nil)

    flags := h.flags &^ (iterator | oldIterator)
    if h.flags&iterator != 0 {
        flags |= oldIterator
    }
    // commit the grow (atomic wrt gc)
    h.B += bigger
    h.flags = flags
    h.oldbuckets = oldbuckets
    h.buckets = newbuckets
    h.nevacuate = 0
    h.noverflow = 0

    if h.extra != nil && h.extra.overflow != nil {
        // Promote current overflow buckets to the old generation.
        if h.extra.oldoverflow != nil {
            throw("oldoverflow is not nil")
        }
        h.extra.oldoverflow = h.extra.overflow
        h.extra.overflow = nil
    }
    if nextOverflow != nil {
        if h.extra == nil {
            h.extra = new(mapextra)
        }
        h.extra.nextOverflow = nextOverflow
    }

    // the actual copying of the hash table data is done incrementally
    // by growWork() and evacuate().
}
  • 要注意的是, 在这里并没有进行实际的将旧桶数据转移到新桶的过程。数据转移遵循了copy on write(写时复制) 的规则。只有在真正赋值的时候,会选择是否需要进行数据转移。核心逻辑位于函数growWork and evacuate
bucket := hash & bucketMask(h.B)
if h.growing() {
    growWork(t, h, bucket)
}
  • 在进行写时复制的时候,意味着并不是所有的数据都会一次性的进行转移,而只会转移当前需要的这个旧桶。 bucket := hash & bucketMask(h.B)得到了当前新桶所在的位置,而要转移的旧桶的位置位于bucket&h.oldbucketmask() xy [2]evacDst 用于存储要转移到新桶的位置

如果是双倍重建,那么旧桶转移到新桶的位置总是相距旧桶的数量.

如果是等量重建,则简单的直接转移即可

到哪一些新桶.

  • 其中有一个非常重要的原则是:如果此数据计算完 hash 后,hash & bucketMask <= 旧桶的大小 意味着这个数据必须转移到和旧桶位置完全对应的新桶中去.理由是现在当前 key 所在新桶的序号与旧桶是完全相同的。
newbit := h.noldbuckets()
if hash&newbit != 0 {
    useY = 1
}
func evacuate(t *maptype, h *hmap, oldbucket uintptr) {
    ...
    // Unlink the overflow buckets & clear key/elem to help GC.
        if h.flags&oldIterator == 0 && t.bucket.ptrdata != 0 {
            b := add(h.oldbuckets, oldbucket*uintptr(t.bucketsize))
            // Preserve b.tophash because the evacuation
            // state is maintained there.
            ptr := add(b, dataOffset)
            n := uintptr(t.bucketsize) - dataOffset
            memclrHasPointers(ptr, n)
        }
    }

    if oldbucket == h.nevacuate {
        advanceEvacuationMark(h, t, newbit)
    }
}

Map 深入: delete

  • 删除的逻辑在之前介绍过,是比较简单的。
  • 核心逻辑位于func mapdelete(t *maptype, h *hmap, key unsafe.Pointer)
  • 同样需要计算出 hash 的前 8 位、指定的桶等。
  • 同样会一直寻找是否有相同的 key,如果找不到,会一直查找当前桶的溢出桶下去,知道到达末尾...
  • 如果查找到了指定的 key,则会清空数据,hash 位设置为emptyOne. 如果发现后面没有元素,则会设置为emptyRest,并循环向上检查前一个元素是否为空。
for {
    b.tophash[i] = emptyRest
    if i == 0 {
        if b == bOrig {
            break // beginning of initial bucket, we're done.
        }
        // Find previous bucket, continue at its last entry.
        c := b
        for b = bOrig; b.overflow(t) != c; b = b.overflow(t) {
        }
        i = bucketCnt - 1
    } else {
        i--
    }
    if b.tophash[i] != emptyOne {
        break
    }
}
  • delete 代码一览
func mapdelete(t *maptype, h *hmap, key unsafe.Pointer) {
    alg := t.key.alg
    hash := alg.hash(key, uintptr(h.hash0))
    h.flags ^= hashWriting

    bucket := hash & bucketMask(h.B)
    if h.growing() {
        growWork(t, h, bucket)
    }
    b := (*bmap)(add(h.buckets, bucket*uintptr(t.bucketsize)))
    bOrig := b
    top := tophash(hash)
search:
    for ; b != nil; b = b.overflow(t) {
        for i := uintptr(0); i < bucketCnt; i++ {
            if b.tophash[i] != top {
                if b.tophash[i] == emptyRest {
                    break search
                }
                continue
            }
            k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
            k2 := k
            if t.indirectkey() {
                k2 = *((*unsafe.Pointer)(k2))
            }
            if !alg.equal(key, k2) {
                continue
            }
                if t.indirectkey() {
                *(*unsafe.Pointer)(k) = nil
            } else if t.key.ptrdata != 0 {
                memclrHasPointers(k, t.key.size)
            }
            e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
            if t.indirectelem() {
                *(*unsafe.Pointer)(e) = nil
            } else if t.elem.ptrdata != 0 {
                memclrHasPointers(e, t.elem.size)
            } else {
                memclrNoHeapPointers(e, t.elem.size)
            }
            b.tophash[i] = emptyOne
                if i == bucketCnt-1 {
                if b.overflow(t) != nil && b.overflow(t).tophash[0] != emptyRest {
                    goto notLast
                }
            } else {
                if b.tophash[i+1] != emptyRest {
                    goto notLast
                }
            }
            for {
                b.tophash[i] = emptyRest
                if i == 0 {
                    if b == bOrig {
                        break // beginning of initial bucket, we're done.
                    }
                    c := b
                    for b = bOrig; b.overflow(t) != c; b = b.overflow(t) {
                    }
                    i = bucketCnt - 1
                } else {
                    i--
                }
                if b.tophash[i] != emptyOne {
                    break
                }
            }
        notLast:
            h.count--
            break search
        }
    }
    if h.flags&hashWriting == 0 {
        throw("concurrent map writes")
    }
    h.flags &^= hashWriting
}

总结

  • 本文首先介绍了 Go 语言 Map 增删查改的基本用法
  • 接着介绍了 Map 使用中 key 的特性,只有可比较的类型才能够作为 Map 中的 key
  • 接着介绍了 map 禁止并发读写的设计原因,即为了大部分程序的效率而牺牲了小部分程序的安全。
  • 最后我们深入源码介绍了 map 编译和运行时的具体细节。Map 有多种初始化的方式,如果指定了长度 N,在初始化时会生成桶。桶的数量为 log2(N).如果 map 的长度大于了2^4,则会在初始化的时候生成溢出桶。溢出桶的大小为 2^(b-4),b 为桶的大小。
  • 在涉及到访问、赋值、删除操作时,都会首先计算数据的 hash 值,接着简单的&运算计算出数据存储在桶中的位置。接着会根据 hash 的前 8 位与存储在桶中的 hash、key 进行比较,完成最后的赋值与访问操作。如果数据放不下了还会申请放置到溢出桶中
  • 当 Map 超过了负载因子大小会进行双倍重建,溢出桶太大会进行等量重建。数据的转移采取了写时复制的策略,即在用到时才会将旧桶的数据打散放入到新桶中。
  • 因此,可以看出 Map 是简单高效的 kv 存储的利器,它非常快但是却快不到极致。理论上来说,我们总是可以根据我们数据的特点设计出更好的哈希函数以及映射机制。
  • Map 的重建的过程提示我们可以评估放入到 Map 的数据大小,并在初始化时指定
  • Map 在实践中极少成为性能的瓶颈,但是却容易写出并发冲突的程序。这提示我们进行合理的设计以及进行race检查。

  • see you~

更多原创文章干货分享,请关注公众号
  • 加微信实战群请加微信(注明:实战群):gocnio

原创文章,欢迎支持! 关注公众号唯识相链了解更多

需要 登录 后方可回复, 如果你还没有账号请点击这里 注册