Redis 源码分析(六)skiplist

skiplist 数据结构

skiplist的数据定义在server.h文件约997行,这个结构比较特别,很奇怪为什么不像其他数据结构一样封装在对应的文件中?

skiplist是zset的实现方式,从名字可以看出其实现的是跳跃表数据结构。跳表的相关数据结构定义如下:

 1/* ZSETs use a specialized version of Skiplists */
 2typedef struct zskiplistNode {
 3    /// 节点名
 4    sds ele;
 5    /// zset中的分
 6    double score;
 7    /// 前序指针
 8    struct zskiplistNode *backward;
 9    /// 后序指针列表,此处是一个数组最大深度默认为32
10    struct zskiplistLevel {
11        struct zskiplistNode *forward;
12        unsigned long span;
13    } level[];
14} zskiplistNode;
15
16typedef struct zskiplist {
17    /// 跳表的首尾节点
18    struct zskiplistNode *header, *tail;
19    /// 跳表的长度
20    unsigned long length;
21    /// 当前最大的level
22    int level;
23} zskiplist;
24
25/// zset 对应的数据结构
26typedef struct zset {
27    dict *dict;
28    zskiplist *zsl;
29} zset;

skiplist 操作函数

下面我们来介绍skiplist的操作函数,具体的代码在t_zset.c中

skiplist的创建与释放

 1/* Create a skiplist node with the specified number of levels.
 2 * The SDS string 'ele' is referenced by the node after the call. */
 3zskiplistNode *zslCreateNode(int level, double score, sds ele) {
 4    /// 根据level 分配 zskiplistNode
 5    zskiplistNode *zn =
 6        zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
 7    zn->score = score;
 8    zn->ele = ele;
 9    return zn;
10}
11
12/* Create a new skiplist. */
13zskiplist *zslCreate(void) {
14    int j;
15    zskiplist *zsl;
16
17    zsl = zmalloc(sizeof(*zsl));
18    zsl->level = 1;
19    /// 创建的新zskiplist长度为0
20    zsl->length = 0;
21    /// 创建一个头节点,且level为最大,这个节点不存任何数据。
22    zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
23    /// 对头节点的level字段进行初始化
24    for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
25        zsl->header->level[j].forward = NULL;
26        zsl->header->level[j].span = 0;
27    }
28    zsl->header->backward = NULL;
29    zsl->tail = NULL;
30    return zsl;
31}
32
33/* Free the specified skiplist node. The referenced SDS string representation
34 * of the element is freed too, unless node->ele is set to NULL before calling
35 * this function. */
36void zslFreeNode(zskiplistNode *node) {
37    sdsfree(node->ele);
38    zfree(node);
39}
40
41/* Free a whole skiplist. */
42void zslFree(zskiplist *zsl) {
43    zskiplistNode *node = zsl->header->level[0].forward, *next;
44
45    zfree(zsl->header);
46    while(node) {
47        next = node->level[0].forward;
48        zslFreeNode(node);
49        node = next;
50    }
51    zfree(zsl);
52}

插入新数据

skiplist 使用了多级跳表的形式来组织数据,当有新数据被插入的时候redis先根据score来查找插入的位置,找到之后会通过随机的方式决定新节点的level值。

 1/* Returns a random level for the new skiplist node we are going to create.
 2 * The return value of this function is between 1 and ZSKIPLIST_MAXLEVEL
 3 * (both inclusive), with a powerlaw-alike distribution where higher
 4 * levels are less likely to be returned. */
 5int zslRandomLevel(void) {
 6    int level = 1;
 7    while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
 8        level += 1;
 9    return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
10}

由代码可以看到,level从1开始,以每1/4的概率进行累加,且最高不超过ZSKIPLIST_MAXLEVEL。我们可以计算一下每级的生成概率:

等级 概率 等级 概率
1 100.000% 9 0.390%
2 50.000% 10 0.195%
3 25.000% 11 0.097%
4 12.500% 12 0.048%
5 6.250% 13 0.024%
6 3.120% 14 0.012%
7 1.560% 15 0.006%
8 0.781% 16 0.003%

到level 达到16的时候生成的概率已经降到了十万分只一,也就是说至少是十万数量级的数据集才会有高度为16的跳表节点。

了解了跳表节点的生成我们再来看插入数据的时候redis是如何做的。

 1/* Insert a new node in the skiplist. Assumes the element does not already
 2 * exist (up to the caller to enforce that). The skiplist takes ownership
 3 * of the passed SDS string 'ele'. */
 4zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
 5    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
 6    unsigned int rank[ZSKIPLIST_MAXLEVEL];
 7    int i, level;
 8
 9    /// 单精度和双精度数据会有一个特殊值NaN,当浮点数除数被除数都为0时产生。另一个特殊值是INF,表示无穷大。
10    serverAssert(!isnan(score));
11    x = zsl->header;
12    /// 从最高的节点开始,依次向低层级节点查找数据的位置。可以看下代码后面的示例图,红色是要插入的目标,红色箭头是查找的路径。
13    for (i = zsl->level-1; i >= 0; i--) {
14        /* store rank that is crossed to reach the insert position */
15        rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
16        /// 判断后续指针,并比较后续节点的score和元素名称。这里的字符串比较是区分大小写的。
17        while (x->level[i].forward &&
18                (x->level[i].forward->score < score ||
19                    (x->level[i].forward->score == score &&
20                    sdscmp(x->level[i].forward->ele,ele) < 0)))
21        {
22            /// 同时计算跳过的元素个数(计算元素下标),span则是相同高度的节点间包含元素的个数。
23            rank[i] += x->level[i].span;
24            x = x->level[i].forward;
25        }
26        update[i] = x;
27    }
28    /// 此时update数组中保存了待插入节点的各级别链表的前序节点。
29    /// rank数组中则保存了前序节点到更高一级前序节点跳过的元素个数。
30    
31    /* we assume the element is not already inside, since we allow duplicated
32     * scores, reinserting the same element should never happen since the
33     * caller of zslInsert() should test in the hash table if the element is
34     * already inside or not. */
35    /// 此处认为所有重复的节点已经被zset的前置检查限制了,此处不会出现相同名字的对象
36    /// 随机一个节点高度(等级)
37    level = zslRandomLevel();
38    /// 初始化高度节点
39    if (level > zsl->level) {
40        /// 此次随机出的高度超出了以往,则更新最大高度,并将rank和update数组中高出的部分都初始化。
41        for (i = zsl->level; i < level; i++) {
42            rank[i] = 0;
43            update[i] = zsl->header;
44            update[i]->level[i].span = zsl->length;
45        }
46        zsl->level = level;
47    }
48
49    /// 根据高度和分值生成节点
50    x = zslCreateNode(level,score,ele);
51    for (i = 0; i < level; i++) {
52        /// 新节点的后续节点设置为前序节点的后续节点
53        x->level[i].forward = update[i]->level[i].forward;
54        /// 此处更新所有前序节点的后续指针为新元素地址
55        update[i]->level[i].forward = x;
56
57        /* update span covered by update[i] as x is inserted here */
58        /// 更新新节点的各level的span
59        x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
60        /// 更新新节点高度及以下的所有前序节点的span
61        update[i]->level[i].span = (rank[0] - rank[i]) + 1;
62    }
63
64    /// 比新节点高度更高的节点的span进行+1处理
65    /* increment span for untouched levels */
66    for (i = level; i < zsl->level; i++) {
67        update[i]->level[i].span++;
68    }
69
70    /// 插入的是头节点,则设置前序节点为空
71    x->backward = (update[0] == zsl->header) ? NULL : update[0];
72    /// 如果新节点的前序节点为空则新节点作为位节点
73    if (x->level[0].forward)
74        x->level[0].forward->backward = x;
75    else
76        zsl->tail = x;
77    /// 元素个数 +1
78    zsl->length++;
79    return x;
80}

skiplist的精华在于其span的计算,这样的结构能在准确定位score的基础上快速计算出对应元素的下标。并且因为整个列表有序,所以可以很容易的取出一个范围内的所有元素。

删除节点

知道如何插入新节点再看节点删除就会容易的多,redis删除skiplist节点有两个函数需要我们去关注一下。

 1/* Internal function used by zslDelete, zslDeleteRangeByScore and
 2 * zslDeleteRangeByRank. */
 3void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
 4    int i;
 5    /// 根据update数组中保存的所有前序节点,更新其后续节点和span
 6    for (i = 0; i < zsl->level; i++) {
 7        if (update[i]->level[i].forward == x) {
 8            update[i]->level[i].span += x->level[i].span - 1;
 9            update[i]->level[i].forward = x->level[i].forward;
10        } else {
11            update[i]->level[i].span -= 1;
12        }
13    }
14    /// 对节点是首尾元素的情况进行修正
15    if (x->level[0].forward) {
16        x->level[0].forward->backward = x->backward;
17    } else {
18        zsl->tail = x->backward;
19    }
20
21    /// 根据情况降低跳表高度
22    while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
23        zsl->level--;
24    zsl->length--;
25}
26
27/* Delete an element with matching score/element from the skiplist.
28 * The function returns 1 if the node was found and deleted, otherwise
29 * 0 is returned.
30 *
31 * If 'node' is NULL the deleted node is freed by zslFreeNode(), otherwise
32 * it is not freed (but just unlinked) and *node is set to the node pointer,
33 * so that it is possible for the caller to reuse the node (including the
34 * referenced SDS string at node->ele). */
35int zslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node) {
36    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
37    int i;
38
39    x = zsl->header;
40    /// 查找目标元素并保存目标元素前所有高度的前序节点,有点像下楼梯!
41    for (i = zsl->level-1; i >= 0; i--) {
42        while (x->level[i].forward &&
43                (x->level[i].forward->score < score ||
44                    (x->level[i].forward->score == score &&
45                     sdscmp(x->level[i].forward->ele,ele) < 0)))
46        {
47            x = x->level[i].forward;
48        }
49        update[i] = x;
50    }
51
52    /* We may have multiple elements with the same score, what we need
53     * is to find the element with both the right score and object. */
54    x = x->level[0].forward;
55    if (x && score == x->score && sdscmp(x->ele,ele) == 0) {
56        /// 分值和名字都匹配上了则调用节点删除函数
57        zslDeleteNode(zsl, x, update);
58        /// 如果node为空则释放该节点
59        if (!node)
60            zslFreeNode(x);
61        else
62            *node = x;
63        return 1;
64    }
65    return 0; /* not found */
66}

删除skiplist节点没有什么特别的,除了更新span都是常规的链表操作。

skiplist更新score

当score变动的时候redis又是如何做的呢?如果之前的插入删除已经搞明白了,更新的逻辑应该没什么难度。redis在这里有一个优化点,在仅积分有变化而无需变化位置的时候节点是不做任何操作的。当节点需要改变顺序的时候则先删除找到的节点,再插入一个新节点并释放老节点。

 1/* Update the score of an element inside the sorted set skiplist.
 2 * Note that the element must exist and must match 'score'.
 3 * This function does not update the score in the hash table side, the
 4 * caller should take care of it.
 5 *
 6 * Note that this function attempts to just update the node, in case after
 7 * the score update, the node would be exactly at the same position.
 8 * Otherwise the skiplist is modified by removing and re-adding a new
 9 * element, which is more costly.
10 *
11 * The function returns the updated element skiplist node pointer. */
12zskiplistNode *zslUpdateScore(zskiplist *zsl, double curscore, sds ele, double newscore) {
13    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
14    int i;
15
16    /* We need to seek to element to update to start: this is useful anyway,
17     * we'll have to update or remove it. */
18    x = zsl->header;
19    /// 查找目标节点
20    for (i = zsl->level-1; i >= 0; i--) {
21        while (x->level[i].forward &&
22                (x->level[i].forward->score < curscore ||
23                    (x->level[i].forward->score == curscore &&
24                     sdscmp(x->level[i].forward->ele,ele) < 0)))
25        {
26            x = x->level[i].forward;
27        }
28        update[i] = x;
29    }
30
31    /* Jump to our element: note that this function assumes that the
32     * element with the matching score exists. */
33    x = x->level[0].forward;
34    serverAssert(x && curscore == x->score && sdscmp(x->ele,ele) == 0);
35
36    /* If the node, after the score update, would be still exactly
37     * at the same position, we can just update the score without
38     * actually removing and re-inserting the element in the skiplist. */
39    /// 这里就是上面讲的,是否需要改变位置的判定
40    if ((x->backward == NULL || x->backward->score < newscore) &&
41        (x->level[0].forward == NULL || x->level[0].forward->score > newscore))
42    {
43        x->score = newscore;
44        return x;
45    }
46
47    /* No way to reuse the old node: we need to remove and insert a new
48     * one at a different place. */
49    /// 先删除找到的节点
50    zslDeleteNode(zsl, x, update);
51    /// 插入一个新节点。这里不去复用老节点是因为节点高度无法复用,所以直接插入一个新节点。
52    zskiplistNode *newnode = zslInsert(zsl,newscore,x->ele);
53    /* We reused the old node x->ele SDS string, free the node now
54     * since zslInsert created a new one. */
55    x->ele = NULL;
56    /// 释放老节点的内存。
57    zslFreeNode(x);
58    return newnode;
59}

当在zset中更新score的时候,redis是有大概率需要重新分配节点内存的。但redis本身使用了内存池,所以内存分配的开销是比较低的。

其他skiplist关于range的操作基本上没跑出以上介绍的增删改查的范围,有兴趣的可以直接看代码。对于已经理解以上代码的人看起来应该没有什么难度。

评论