likes
comments
collection
share

算法技巧-跳表(Skip List)(一):介绍、ConcurrentSkipListMap源码分析

作者站长头像
站长
· 阅读数 16

跳表(SkipList,全称跳跃表)是用于有序元素序列快速搜索查找的一个数据结构,跳表是一个随机化的数据结构,实质就是一种可以进行二分查找的有序链表。跳表在原有的有序链表上面增加了多级索引,通过索引来实现快速查找。跳表不仅能提高搜索性能,同时也可以提高插入和删除操作的性能。它在性能上和红黑树,AVL树不相上下,但是跳表的原理非常简单,实现也比红黑树简单很多。

介绍

跳跃表(简称跳表)由美国计算机科学家William Pugh发明于1989年。他在论文《Skip lists: a probabilistic alternative to balanced trees》中详细介绍了跳表的数据结构和插入删除等操作。

图形化探究跳表,先回顾链表。链表的优势就是更高效的插入、删除。痛点就是查询很慢很慢!每次查询都是一种O(n)复杂度的操作。

算法技巧-跳表(Skip List)(一):介绍、ConcurrentSkipListMap源码分析 能不能稍微优化一下,让它稍微跳一跳呢?答案是可以的,我们知道很多算法和数据结构以空间换时间,我们在上面加一层索引,让部分节点在上层能够直接定位到,这样链表的查询时间近乎减少一半。

算法技巧-跳表(Skip List)(一):介绍、ConcurrentSkipListMap源码分析

在查询某个节点的时候,首先会从上一层快速定位节点所在的一个范围,如果找到具体范围向下然后查找代价很小,当然在表的结构设计上会增加一个向下的索引(指针)用来查找确定底层节点。平均查找速度平均为O(n/2)。但是当节点数量很大的时候,它依旧很慢很慢。我们都知道二分查找是每次都能折半的去压缩查找范围,要是有序链表也能这么跳起来那就太完美了。没错跳表就能让链表拥有近乎的接近二分查找的效率的一种数据结构,其原理依然是给上面加若干层索引,优化查找速度。 算法技巧-跳表(Skip List)(一):介绍、ConcurrentSkipListMap源码分析 对于理想的跳表,每向上一层索引节点数量都是下一层的1/2,那么就有二分查找的性能。实际情况时不太可能有这种理想的情况。

特点和优势

  1. 快速查找:跳表支持以对数时间复杂度(O(log n))进行查找操作。相比于普通链表的线性查找(O(n)),跳表通过多层索引实现了快速的二分查找。
  2. 高效插入和删除:跳表在插入和删除操作上也能够达到对数时间复杂度。通过灵活地调整索引层数和节点的连接关系,跳表能够保持平衡并保证有序性。
  3. 简单而高效:相比于平衡二叉树等复杂的数据结构,跳表的实现相对简单,并且在大多数情况下能够提供类似的性能。
  4. 无需自平衡:与自平衡的数据结构(如红黑树)相比,跳表的插入和删除操作无需进行复杂的平衡调整,使得实现和维护更加简单。

跳表的核心思想是通过构建多层索引,每一层索引都是下一层索引的一部分,使得从顶层索引开始,可以快速定位到目标元素的前驱节点。通过这种索引结构,跳表可以在插入、删除和搜索操作中快速地定位到目标位置,从而提高了效率。

实现原理介绍

先基于JDK中java.util.concurrent包中ConcurrentSkipListMap源码,来介绍基于跳表(Skip List)数据结构相关时间

构造函数

  • 默认构造函数
public ConcurrentSkipListMap() {
    this.comparator = null;
    initialize();
}

//初始化,设置head头节点,lever层级为1,后面会讲到HeadIndex
private void initialize() {
    keySet = null;
    entrySet = null;
    values = null;
    descendingMap = null;
    head = new HeadIndex<K,V>(new Node<K,V>(null, BASE_HEADER, null),
                              null, null, 1);
}
  • 待指定比较器
public ConcurrentSkipListMap(Comparator<? super K> comparator) {  
this.comparator = comparator;  
initialize();  
}
  • 带MAP入参
public ConcurrentSkipListMap(Map<? extends K, ? extends V> m) {
    this.comparator = null;
    initialize();
    putAll(m);
}

//AbstractMap中方法,put后面会讲到
public void putAll(Map<? extends K, ? extends V> m) {
    for (Map.Entry<? extends K, ? extends V> e : m.entrySet())
        put(e.getKey(), e.getValue());
}
  • 带有序Map入参
public ConcurrentSkipListMap(SortedMap<K, ? extends V> m) {
    //比较器直接使用有序Map的
    this.comparator = m.comparator();
    initialize();
    buildFromSorted(m);
}

内部类

  • 数据节点:单链表
static final class Node<K,V> {
    final K key;
    // 在删除元素的时候value会指向当前元素本身
    volatile Object value;
    volatile Node<K,V> next;

    /**
     * Creates a new regular node.
     */
    Node(K key, Object value, Node<K,V> next) {
        this.key = key;
        this.value = value;
        this.next = next;
    }

    /**
     * Creates a new marker node. A marker is distinguished by
     * having its value field point to itself.  Marker nodes also
     * have null keys, a fact that is exploited in a few places,
     * but this doesn't distinguish markers from the base-level
     * header node (head.node), which also has a null key.
     */
    Node(Node<K,V> next) {
        this.key = null;
        this.value = this;
        this.next = next;
    }
}
  • 索引节点:索引节点,存储着对应的node值,及向下和向右的索引指针
static class Index<K,V> {
    final Node<K,V> node;
    final Index<K,V> down;
    volatile Index<K,V> right;

    /**
     * Creates index node with given values.
     */
    Index(Node<K,V> node, Index<K,V> down, Index<K,V> right) {
        this.node = node;
        this.down = down;
        this.right = right;
    }
    ...
}
  • 头节点:继承自Index,并扩展一个level字段,用于记录索引的层级
static final class HeadIndex<K,V> extends Index<K,V> {
    final int level;
    HeadIndex(Node<K,V> node, Index<K,V> down, Index<K,V> right, int level) {
        super(node, down, right);
        this.level = level;
    }
}

添加元素

源码分析

put方法入口

public V put(K key, V value) {
    if (value == null)
        throw new NullPointerException();
    return doPut(key, value, false);
}
private V doPut(K key, V value, boolean onlyIfAbsent) {
    Node<K,V> z;             // added node
    if (key == null)
        throw new NullPointerException();
    Comparator<? super K> cmp = comparator;
    //第一步:找到目标节点的位置并插入
    //目标节点是数据节点,也就是最底层的那条链条
    //进入自旋状态
    outer: for (;;) {
        // 寻找目标节点之前最近的一个索引对应的数据节点,存储在b中,b=before 
        // 并把b的下一个数据节点存储在n中,n=next 
        // 为了便于描述,我这里把b叫做当前节点,n叫做下一个节点
        for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
            if (n != null) {
                Object v; int c;
                //f为n的下一个数据节点,也就是b的下一个节点的下一个节点(孙子节点)
                Node<K,V> f = n.next;
                // 如果n不为b的下一个节点 
                // 说明有其它线程修改了数据,则跳出内层循环 
                // 也就是回到了外层循环自旋的位置,从头来过
                if (n != b.next)               // inconsistent read
                    break;
                // 如果n的value值为空,说明该节点已删除,协助删除节点
                if ((v = n.value) == null) {   // n is deleted
                    n.helpDelete(b, f);
                    break;
                }
                // 如果b的值为空或者v等于n,说明b已被删除
                // 这时候n就是marker节点,那b就是被删除的那个
                if (b.value == null || v == n) // b is deleted
                    break;
                // 如果目标key与下一个节点的key大 
                // 说明目标元素所在的位置还在下一个节点的后面
                if ((c = cpr(cmp, key, n.key)) > 0) {
                    b = n;
                    n = f;
                    continue;
                }
                if (c == 0) {
                    if (onlyIfAbsent || n.casValue(v, value)) {
                        @SuppressWarnings("unchecked") V vv = (V)v;
                        return vv;
                    }
                    break; // restart if lost race to replace value
                }
                // else c < 0; fall through
            }

            // 有两种情况会到这里 
            // 一是到链表尾部了,也就是n为null了 
            // 二是找到了目标节点的位置,也就是上面的c<0 
            // 新建目标节点,并赋值给z 
            // 这里把n作为新节点的next 
            // 如果到链表尾部了,n为null,这毫无疑问 
            // 如果c<0,则n的key比目标key大,相妆于在b和n之间插入目标节点z z = new Node<K,V>(key, value, n); 
            // 原子更新b的下一个节点为目标节点z if (!b.casNext(n, z)) 
            // 如果更新失败,说明其它线程先一步修改了值,从头来过 break; 
            // 如果更新成功,跳出自旋状态
            z = new Node<K,V>(key, value, n);
            if (!b.casNext(n, z))
                break;         // restart if lost race to append to b
            break outer;
        }
    }

    //第二步:随机决定是否需要建立索引及其层次,如果需要则建立自上而下的索引
    int rnd = ThreadLocalRandom.nextSecondarySeed();
    // 0x80000001展开为二进制为10000000000000000000000000000001 
    // 只有两头是1 
    // 这里(rnd & 0x80000001) == 0 
    // 相当于排除了负数(负数最高位是1),排除了奇数(奇数最低位是1) 
    // 只有最高位最低位都不为1的数跟0x80000001做&操作才会为0 
    // 也就是正偶数
    if ((rnd & 0x80000001) == 0) { // test highest and lowest bits
        int level = 1, max;
        while (((rnd >>>= 1) & 1) != 0)
            ++level;
        Index<K,V> idx = null;
        HeadIndex<K,V> h = head;
        if (level <= (max = h.level)) {
            // 从第一层开始建立一条竖直的索引链表 
            // 这条链表使用down指针连接起来 
            // 每个索引节点里面都存储着目标节点这个数据节点 
            // 最后idx存储的是这条索引链表的最高层节点
            for (int i = 1; i <= level; ++i)
                idx = new Index<K,V>(z, idx, null);
        }
        else { // try to grow by one level
            level = max + 1; // hold in array and later pick the one to use
            // idxs用于存储目标节点建立的竖起索引的所有索引节点 
            // 其实这里直接使用idx这个最高节点也是可以完成的 
            // 只是用一个数组存储所有节点要方便一些 
            // 注意,这里数组0号位是没有使用的
            @SuppressWarnings("unchecked")Index<K,V>[] idxs =
                (Index<K,V>[])new Index<?,?>[level+1];
            for (int i = 1; i <= level; ++i)
                idxs[i] = idx = new Index<K,V>(z, idx, null);
            // 自旋
            for (;;) {
                h = head;
                int oldLevel = h.level;
                if (level <= oldLevel) // lost race to add level
                    break;
                HeadIndex<K,V> newh = h;
                Node<K,V> oldbase = h.node;
                for (int j = oldLevel+1; j <= level; ++j)
                    newh = new HeadIndex<K,V>(oldbase, newh, idxs[j], j);
                if (casHead(h, newh)) {
                    h = newh;
                    idx = idxs[level = oldLevel];
                    break;
                }
            }
        }
        // find insertion points and splice in
        // 第三步:将新建的索引节点(包含头索引节点)与其它索引节点通过右指针连接在一起
        splice: for (int insertionLevel = level;;) {
            int j = h.level;
            // 从头索引节点开始遍历 
            // 为了方便,这里叫q为当前节点,r为右节点,d为下节点,t为目标节点相应层级的索引
            for (Index<K,V> q = h, r = q.right, t = idx;;) {
                if (q == null || t == null)
                    break splice;
                if (r != null) {
                    Node<K,V> n = r.node;
                    // compare before deletion check avoids needing recheck
                    int c = cpr(cmp, key, n.key);
                    if (n.value == null) {
                        if (!q.unlink(r))
                            break;
                        r = q.right;
                        continue;
                    }
                    if (c > 0) {
                        q = r;
                        r = r.right;
                        continue;
                    }
                }

                if (j == insertionLevel) {
                    if (!q.link(r, t))
                        break; // restart
                    if (t.node.value == null) {
                        findNode(key);
                        break splice;
                    }
                    if (--insertionLevel == 0)
                        break splice;
                }

                if (--j >= insertionLevel && j < level)
                    t = t.down;
                q = q.down;
                r = q.right;
            }
        }
    }
    return null;
}

// 寻找目标节点之前最近的一个索引对应的数据节点
private Node<K,V> findPredecessor(Object key, Comparator<? super K> cmp) {
    // key不能为空
    if (key == null)
        throw new NullPointerException(); // don't postpone errors
    // 自旋
    for (;;) {
        // 从最高层头索引节点开始查找,先向右,再向下
        // 直到找到目标位置之前的那个索引
        for (Index<K,V> q = head, r = q.right, d;;) {
            // 如果右节点不为空
            if (r != null) {
                // 右节点对应的数据节点,为了方便,我们叫右节点的值
                Node<K,V> n = r.node;
                K k = n.key;
                // 如果右节点的value为空
                // 说明其它线程把这个节点标记为删除了
                // 则协助删除
                if (n.value == null) {
                    if (!q.unlink(r))
                        // 如果删除失败
                        // 说明其它线程先删除了,从头来过
                        break;           // restart
                    // 删除之后重新读取右节点
                    r = q.right;         // reread r
                    continue;
                }
                // 如果目标key比右节点还大,继续向右寻找
                if (cpr(cmp, key, k) > 0) {
                    // 往右移
                    q = r;
                    // 重新取右节点
                    r = r.right;
                    continue;
                }
                // 如果c<0,说明不能再往右了
            }
            // 到这里说明当前层级已经到最右了
            // 两种情况:一是r==null,二是c<0
            // 再从下一级开始找

            // 如果没有下一级了,就返回这个索引对应的数据节点
            if ((d = q.down) == null)
                return q.node;

            // 往下移
            q = d;
            // 重新取右节点
            r = d.right;
        }
    }
}

// Node.class中的方法,协助删除元素
void helpDelete(Node<K,V> b, Node<K,V> f) {
    /*
     * Rechecking links and then doing only one of the
     * help-out stages per call tends to minimize CAS
     * interference among helping threads.
     */
    // 这里的调用者this==n,三者关系是b->n->f
    if (f == next && this == b.next) {
        // 将n的值设置为null后,会先把n的下个节点设置为marker节点
        // 这个marker节点的值是它自己
        // 这里如果不是它自己说明marker失败了,重新marker
        if (f == null || f.value != f) // not already marked
            casNext(f, new Node<K,V>(f));
        else
            // marker过了,就把b的下个节点指向marker的下个节点
            b.casNext(this, f.next);
    }
}

// Index.class中的方法,删除succ节点
final boolean unlink(Index<K,V> succ) {
    // 原子更新当前节点指向下一个节点的下一个节点
    // 也就是删除下一个节点
    return node.value != null && casRight(succ, succ.right);
}

// Index.class中的方法,在当前节点与succ之间插入newSucc节点
final boolean link(Index<K,V> succ, Index<K,V> newSucc) {
    // 在当前节点与下一个节点中间插入一个节点
    Node<K,V> n = node;
    // 新节点指向当前节点的下一个节点
    newSucc.right = succ;
    // 原子更新当前节点的下一个节点指向新节点
    return n.value != null && casRight(succ, newSucc);
}

添加元素举例

假设初始链表是这样:

算法技巧-跳表(Skip List)(一):介绍、ConcurrentSkipListMap源码分析

假如,我们现在要插入一个元素9。

(1)寻找目标节点之前最近的一个索引对应的数据节点,在这里也就是找到了5这个数据节点;

(2)从5开始向后遍历,找到目标节点的位置,也就是在8和12之间;

(3)插入9这个元素,Part I 结束;

算法技巧-跳表(Skip List)(一):介绍、ConcurrentSkipListMap源码分析

然后,计算其索引层级,假如是3,也就是level=3。

(1)建立竖直的down索引链表;

(2)超过了现有高度2,还要再增加head索引链的高度;

(3)至此,Part II 结束;

算法技巧-跳表(Skip List)(一):介绍、ConcurrentSkipListMap源码分析

最后,把right指针补齐。

(1)从第3层的head往右找当前层级目标索引的位置;

(2)找到就把目标索引和它前面索引的right指针连上,这里前一个正好是head;

(3)然后前一个索引向下移,这里就是head下移;

(4)再往右找目标索引的位置;

(5)找到了就把right指针连上,这里前一个是3的索引;

(6)然后3的索引下移;

(7)再往右找目标索引的位置;

(8)找到了就把right指针连上,这里前一个是5的索引;

(9)然后5下移,到底了,Part III 结束,整个插入过程结束;

算法技巧-跳表(Skip List)(一):介绍、ConcurrentSkipListMap源码分析

删除元素

源码分析

public V remove(Object key) {
    return doRemove(key, null);
}

final V doRemove(Object key, Object value) {
    // key不为空
    if (key == null)
        throw new NullPointerException();
    Comparator<? super K> cmp = comparator;
    // 自旋
    outer: for (;;) {
        // 寻找目标节点之前的最近的索引节点对应的数据节点
        // 为了方便,这里叫b为当前节点,n为下一个节点,f为下下个节点
        for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
            Object v; int c;
            // 整个链表都遍历完了也没找到目标节点,退出外层循环
            if (n == null)
                break outer;
            // 下下个节点
            Node<K,V> f = n.next;
            // 再次检查
            // 如果n不是b的下一个节点了
            // 说明有其它线程先一步修改了,从头来过
            if (n != b.next)                    // inconsistent read
                break;
            // 如果下个节点的值奕为null了
            // 说明有其它线程标记该元素为删除状态了
            if ((v = n.value) == null) {        // n is deleted
                // 协助删除
                n.helpDelete(b, f);
                break;
            }
            // 如果b的值为空或者v等于n,说明b已被删除
            // 这时候n就是marker节点,那b就是被删除的那个
            if (b.value == null || v == n)      // b is deleted
                break;
            // 如果c<0,说明没找到元素,退出外层循环
            if ((c = cpr(cmp, key, n.key)) < 0)
                break outer;
            // 如果c>0,说明还没找到,继续向右找
            if (c > 0) {
                // 当前节点往后移
                b = n;
                // 下一个节点往后移
                n = f;
                continue;
            }
            // c=0,说明n就是要找的元素
            // 如果value不为空且不等于找到元素的value,不需要删除,退出外层循环
            if (value != null && !value.equals(v))
                break outer;
            // 如果value为空,或者相等
            // 原子标记n的value值为空
            if (!n.casValue(v, null))
                // 如果删除失败,说明其它线程先一步修改了,从头来过
                break;

            // P.S.到了这里n的值肯定是设置成null了

            // 关键!!!!
            // 让n的下一个节点指向一个market节点
            // 这个market节点的key为null,value为marker自己,next为n的下个节点f
            // 或者让b的下一个节点指向下下个节点
            // 注意:这里是或者||,因为两个CAS不能保证都成功,只能一个一个去尝试
            // 这里有两层意思:
            // 一是如果标记market成功,再尝试将b的下个节点指向下下个节点,如果第二步失败了,进入条件,如果成功了就不用进入条件了
            // 二是如果标记market失败了,直接进入条件
            if (!n.appendMarker(f) || !b.casNext(n, f))
                // 通过findNode()重试删除(里面有个helpDelete()方法)
                findNode(key);                  // retry via findNode
            else {
                // 上面两步操作都成功了,才会进入这里,不太好理解,上面两个条件都有非"!"操作
                // 说明节点已经删除了,通过findPredecessor()方法删除索引节点
                // findPredecessor()里面有unlink()操作
                findPredecessor(key, cmp);      // clean index
                // 如果最高层头索引节点没有右节点,则跳表的高度降级
                if (head.right == null)
                    tryReduceLevel();
            }
            // 返回删除的元素值
            @SuppressWarnings("unchecked") V vv = (V)v;
            return vv;
        }
    }
    return null;
}

删除元素举例

假如初始跳表如下图所示,我们要删除9这个元素。

算法技巧-跳表(Skip List)(一):介绍、ConcurrentSkipListMap源码分析

(1)找到9这个数据节点;

(2)把9这个节点的value值设置为null;

(3)在9后面添加一个marker节点,标记9已经删除了;

(4)让8指向12;

(5)把索引节点与它前一个索引的right断开联系;

(6)跳表高度降级;

算法技巧-跳表(Skip List)(一):介绍、ConcurrentSkipListMap源码分析

至于,为什么要有(2)(3)(4)这么多步骤呢,因为多线程下如果直接让8指向12,可以其它线程先一步在9和12间插入了一个元素10呢,这时候就不对了。

所以这里搞了三步来保证多线程下操作的正确性。

如果第(2)步失败了,则直接重试;

如果第(3)或(4)步失败了,因为第(2)步是成功的,则通过helpDelete()不断重试去删除;

其实helpDelete()里面也是不断地重试(3)和(4);

只有这三步都正确完成了,才能说明这个元素彻底被删除了。

这一块结合上面图中的红绿蓝色好好理解一下,一定要想在并发环境中会怎么样。

查找元素

源码分析

public V get(Object key) {
    return doGet(key);
}

private V doGet(Object key) {
    // key不为空
    if (key == null)
        throw new NullPointerException();
    Comparator<? super K> cmp = comparator;
    // 自旋
    outer: for (;;) {
        // 寻找目标节点之前最近的索引对应的数据节点
        // 为了方便,这里叫b为当前节点,n为下个节点,f为下下个节点
        for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
            Object v; int c;
            // 如果链表到头还没找到元素,则跳出外层循环
            if (n == null)
                break outer;
            // 下下个节点
            Node<K,V> f = n.next;
            // 如果不一致读,从头来过
            if (n != b.next)                // inconsistent read
                break;
            // 如果n的值为空,说明节点已被其它线程标记为删除
            if ((v = n.value) == null) {    // n is deleted
                // 协助删除,再重试
                n.helpDelete(b, f);
                break;
            }
            // 如果b的值为空或者v等于n,说明b已被删除
            // 这时候n就是marker节点,那b就是被删除的那个
            if (b.value == null || v == n)  // b is deleted
                break;
            // 如果c==0,说明找到了元素,就返回元素值
            if ((c = cpr(cmp, key, n.key)) == 0) {
                @SuppressWarnings("unchecked") V vv = (V)v;
                return vv;
            }
            // 如果c<0,说明没找到元素
            if (c < 0)
                break outer;
            // 如果c>0,说明还没找到,继续寻找
            // 当前节点往后移
            b = n;
            // 下一个节点往后移
            n = f;
        }
    }
    return null;
}

查找元素举例

假如有如下图所示这个跳表,我们要查找9这个元素,它走过的路径是怎样的呢?可能跟你相像的不一样。。

算法技巧-跳表(Skip List)(一):介绍、ConcurrentSkipListMap源码分析

(1)寻找目标节点之前最近的一个索引对应的数据节点,这里就是5;

(2)从5开始往后遍历,经过8,到9;

(3)找到了返回;

整个路径如下图所示:

算法技巧-跳表(Skip List)(一):介绍、ConcurrentSkipListMap源码分析

是不是很操蛋?

为啥不从9的索引直接过来呢?

从我实际打断点调试来看确实是按照上图的路径来走的。

我猜测可能是因为findPredecessor()这个方法是插入、删除、查找元素多个方法共用的,在单链表中插入和删除元素是需要记录前一个元素的,而查找并不需要,这里为了兼容三者使得编码相对简单一点,所以就使用了同样的逻辑,而没有单独对查找元素进行优化。

参考 1.高频面试数据结构——跳表 2.死磕 java集合之ConcurrentSkipListMap源码分析——发现个bug