skiplist 数据结构
skiplist的数据定义在server.h文件约997行,这个结构比较特别,很奇怪为什么不像其他数据结构一样封装在对应的文件中?
skiplist是zset的实现方式,从名字可以看出其实现的是跳跃表数据结构。跳表的相关数据结构定义如下:
1/* ZSETs use a specialized version of Skiplists */
2typedef struct zskiplistNode {
3 /// 节点名
4 sds ele;
5 /// zset中的分
6 double score;
7 /// 前序指针
8 struct zskiplistNode *backward;
9 /// 后序指针列表,此处是一个数组最大深度默认为32
10 struct zskiplistLevel {
11 struct zskiplistNode *forward;
12 unsigned long span;
13 } level[];
14} zskiplistNode;
15
16typedef struct zskiplist {
17 /// 跳表的首尾节点
18 struct zskiplistNode *header, *tail;
19 /// 跳表的长度
20 unsigned long length;
21 /// 当前最大的level
22 int level;
23} zskiplist;
24
25/// zset 对应的数据结构
26typedef struct zset {
27 dict *dict;
28 zskiplist *zsl;
29} zset;
skiplist 操作函数
下面我们来介绍skiplist的操作函数,具体的代码在t_zset.c中
skiplist的创建与释放
1/* Create a skiplist node with the specified number of levels.
2 * The SDS string 'ele' is referenced by the node after the call. */
3zskiplistNode *zslCreateNode(int level, double score, sds ele) {
4 /// 根据level 分配 zskiplistNode
5 zskiplistNode *zn =
6 zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
7 zn->score = score;
8 zn->ele = ele;
9 return zn;
10}
11
12/* Create a new skiplist. */
13zskiplist *zslCreate(void) {
14 int j;
15 zskiplist *zsl;
16
17 zsl = zmalloc(sizeof(*zsl));
18 zsl->level = 1;
19 /// 创建的新zskiplist长度为0
20 zsl->length = 0;
21 /// 创建一个头节点,且level为最大,这个节点不存任何数据。
22 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
23 /// 对头节点的level字段进行初始化
24 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
25 zsl->header->level[j].forward = NULL;
26 zsl->header->level[j].span = 0;
27 }
28 zsl->header->backward = NULL;
29 zsl->tail = NULL;
30 return zsl;
31}
32
33/* Free the specified skiplist node. The referenced SDS string representation
34 * of the element is freed too, unless node->ele is set to NULL before calling
35 * this function. */
36void zslFreeNode(zskiplistNode *node) {
37 sdsfree(node->ele);
38 zfree(node);
39}
40
41/* Free a whole skiplist. */
42void zslFree(zskiplist *zsl) {
43 zskiplistNode *node = zsl->header->level[0].forward, *next;
44
45 zfree(zsl->header);
46 while(node) {
47 next = node->level[0].forward;
48 zslFreeNode(node);
49 node = next;
50 }
51 zfree(zsl);
52}
插入新数据
skiplist 使用了多级跳表的形式来组织数据,当有新数据被插入的时候redis先根据score来查找插入的位置,找到之后会通过随机的方式决定新节点的level值。
1/* Returns a random level for the new skiplist node we are going to create.
2 * The return value of this function is between 1 and ZSKIPLIST_MAXLEVEL
3 * (both inclusive), with a powerlaw-alike distribution where higher
4 * levels are less likely to be returned. */
5int zslRandomLevel(void) {
6 int level = 1;
7 while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
8 level += 1;
9 return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
10}
由代码可以看到,level从1开始,以每1/4的概率进行累加,且最高不超过ZSKIPLIST_MAXLEVEL。我们可以计算一下每级的生成概率:
等级 | 概率 | 等级 | 概率 |
---|---|---|---|
1 | 100.000% | 9 | 0.390% |
2 | 50.000% | 10 | 0.195% |
3 | 25.000% | 11 | 0.097% |
4 | 12.500% | 12 | 0.048% |
5 | 6.250% | 13 | 0.024% |
6 | 3.120% | 14 | 0.012% |
7 | 1.560% | 15 | 0.006% |
8 | 0.781% | 16 | 0.003% |
到level 达到16的时候生成的概率已经降到了十万分只一,也就是说至少是十万数量级的数据集才会有高度为16的跳表节点。
了解了跳表节点的生成我们再来看插入数据的时候redis是如何做的。
1/* Insert a new node in the skiplist. Assumes the element does not already
2 * exist (up to the caller to enforce that). The skiplist takes ownership
3 * of the passed SDS string 'ele'. */
4zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
5 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
6 unsigned int rank[ZSKIPLIST_MAXLEVEL];
7 int i, level;
8
9 /// 单精度和双精度数据会有一个特殊值NaN,当浮点数除数被除数都为0时产生。另一个特殊值是INF,表示无穷大。
10 serverAssert(!isnan(score));
11 x = zsl->header;
12 /// 从最高的节点开始,依次向低层级节点查找数据的位置。可以看下代码后面的示例图,红色是要插入的目标,红色箭头是查找的路径。
13 for (i = zsl->level-1; i >= 0; i--) {
14 /* store rank that is crossed to reach the insert position */
15 rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
16 /// 判断后续指针,并比较后续节点的score和元素名称。这里的字符串比较是区分大小写的。
17 while (x->level[i].forward &&
18 (x->level[i].forward->score < score ||
19 (x->level[i].forward->score == score &&
20 sdscmp(x->level[i].forward->ele,ele) < 0)))
21 {
22 /// 同时计算跳过的元素个数(计算元素下标),span则是相同高度的节点间包含元素的个数。
23 rank[i] += x->level[i].span;
24 x = x->level[i].forward;
25 }
26 update[i] = x;
27 }
28 /// 此时update数组中保存了待插入节点的各级别链表的前序节点。
29 /// rank数组中则保存了前序节点到更高一级前序节点跳过的元素个数。
30
31 /* we assume the element is not already inside, since we allow duplicated
32 * scores, reinserting the same element should never happen since the
33 * caller of zslInsert() should test in the hash table if the element is
34 * already inside or not. */
35 /// 此处认为所有重复的节点已经被zset的前置检查限制了,此处不会出现相同名字的对象
36 /// 随机一个节点高度(等级)
37 level = zslRandomLevel();
38 /// 初始化高度节点
39 if (level > zsl->level) {
40 /// 此次随机出的高度超出了以往,则更新最大高度,并将rank和update数组中高出的部分都初始化。
41 for (i = zsl->level; i < level; i++) {
42 rank[i] = 0;
43 update[i] = zsl->header;
44 update[i]->level[i].span = zsl->length;
45 }
46 zsl->level = level;
47 }
48
49 /// 根据高度和分值生成节点
50 x = zslCreateNode(level,score,ele);
51 for (i = 0; i < level; i++) {
52 /// 新节点的后续节点设置为前序节点的后续节点
53 x->level[i].forward = update[i]->level[i].forward;
54 /// 此处更新所有前序节点的后续指针为新元素地址
55 update[i]->level[i].forward = x;
56
57 /* update span covered by update[i] as x is inserted here */
58 /// 更新新节点的各level的span
59 x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
60 /// 更新新节点高度及以下的所有前序节点的span
61 update[i]->level[i].span = (rank[0] - rank[i]) + 1;
62 }
63
64 /// 比新节点高度更高的节点的span进行+1处理
65 /* increment span for untouched levels */
66 for (i = level; i < zsl->level; i++) {
67 update[i]->level[i].span++;
68 }
69
70 /// 插入的是头节点,则设置前序节点为空
71 x->backward = (update[0] == zsl->header) ? NULL : update[0];
72 /// 如果新节点的前序节点为空则新节点作为位节点
73 if (x->level[0].forward)
74 x->level[0].forward->backward = x;
75 else
76 zsl->tail = x;
77 /// 元素个数 +1
78 zsl->length++;
79 return x;
80}
skiplist的精华在于其span的计算,这样的结构能在准确定位score的基础上快速计算出对应元素的下标。并且因为整个列表有序,所以可以很容易的取出一个范围内的所有元素。
删除节点
知道如何插入新节点再看节点删除就会容易的多,redis删除skiplist节点有两个函数需要我们去关注一下。
1/* Internal function used by zslDelete, zslDeleteRangeByScore and
2 * zslDeleteRangeByRank. */
3void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
4 int i;
5 /// 根据update数组中保存的所有前序节点,更新其后续节点和span
6 for (i = 0; i < zsl->level; i++) {
7 if (update[i]->level[i].forward == x) {
8 update[i]->level[i].span += x->level[i].span - 1;
9 update[i]->level[i].forward = x->level[i].forward;
10 } else {
11 update[i]->level[i].span -= 1;
12 }
13 }
14 /// 对节点是首尾元素的情况进行修正
15 if (x->level[0].forward) {
16 x->level[0].forward->backward = x->backward;
17 } else {
18 zsl->tail = x->backward;
19 }
20
21 /// 根据情况降低跳表高度
22 while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
23 zsl->level--;
24 zsl->length--;
25}
26
27/* Delete an element with matching score/element from the skiplist.
28 * The function returns 1 if the node was found and deleted, otherwise
29 * 0 is returned.
30 *
31 * If 'node' is NULL the deleted node is freed by zslFreeNode(), otherwise
32 * it is not freed (but just unlinked) and *node is set to the node pointer,
33 * so that it is possible for the caller to reuse the node (including the
34 * referenced SDS string at node->ele). */
35int zslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node) {
36 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
37 int i;
38
39 x = zsl->header;
40 /// 查找目标元素并保存目标元素前所有高度的前序节点,有点像下楼梯!
41 for (i = zsl->level-1; i >= 0; i--) {
42 while (x->level[i].forward &&
43 (x->level[i].forward->score < score ||
44 (x->level[i].forward->score == score &&
45 sdscmp(x->level[i].forward->ele,ele) < 0)))
46 {
47 x = x->level[i].forward;
48 }
49 update[i] = x;
50 }
51
52 /* We may have multiple elements with the same score, what we need
53 * is to find the element with both the right score and object. */
54 x = x->level[0].forward;
55 if (x && score == x->score && sdscmp(x->ele,ele) == 0) {
56 /// 分值和名字都匹配上了则调用节点删除函数
57 zslDeleteNode(zsl, x, update);
58 /// 如果node为空则释放该节点
59 if (!node)
60 zslFreeNode(x);
61 else
62 *node = x;
63 return 1;
64 }
65 return 0; /* not found */
66}
删除skiplist节点没有什么特别的,除了更新span都是常规的链表操作。
skiplist更新score
当score变动的时候redis又是如何做的呢?如果之前的插入删除已经搞明白了,更新的逻辑应该没什么难度。redis在这里有一个优化点,在仅积分有变化而无需变化位置的时候节点是不做任何操作的。当节点需要改变顺序的时候则先删除找到的节点,再插入一个新节点并释放老节点。
1/* Update the score of an element inside the sorted set skiplist.
2 * Note that the element must exist and must match 'score'.
3 * This function does not update the score in the hash table side, the
4 * caller should take care of it.
5 *
6 * Note that this function attempts to just update the node, in case after
7 * the score update, the node would be exactly at the same position.
8 * Otherwise the skiplist is modified by removing and re-adding a new
9 * element, which is more costly.
10 *
11 * The function returns the updated element skiplist node pointer. */
12zskiplistNode *zslUpdateScore(zskiplist *zsl, double curscore, sds ele, double newscore) {
13 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
14 int i;
15
16 /* We need to seek to element to update to start: this is useful anyway,
17 * we'll have to update or remove it. */
18 x = zsl->header;
19 /// 查找目标节点
20 for (i = zsl->level-1; i >= 0; i--) {
21 while (x->level[i].forward &&
22 (x->level[i].forward->score < curscore ||
23 (x->level[i].forward->score == curscore &&
24 sdscmp(x->level[i].forward->ele,ele) < 0)))
25 {
26 x = x->level[i].forward;
27 }
28 update[i] = x;
29 }
30
31 /* Jump to our element: note that this function assumes that the
32 * element with the matching score exists. */
33 x = x->level[0].forward;
34 serverAssert(x && curscore == x->score && sdscmp(x->ele,ele) == 0);
35
36 /* If the node, after the score update, would be still exactly
37 * at the same position, we can just update the score without
38 * actually removing and re-inserting the element in the skiplist. */
39 /// 这里就是上面讲的,是否需要改变位置的判定
40 if ((x->backward == NULL || x->backward->score < newscore) &&
41 (x->level[0].forward == NULL || x->level[0].forward->score > newscore))
42 {
43 x->score = newscore;
44 return x;
45 }
46
47 /* No way to reuse the old node: we need to remove and insert a new
48 * one at a different place. */
49 /// 先删除找到的节点
50 zslDeleteNode(zsl, x, update);
51 /// 插入一个新节点。这里不去复用老节点是因为节点高度无法复用,所以直接插入一个新节点。
52 zskiplistNode *newnode = zslInsert(zsl,newscore,x->ele);
53 /* We reused the old node x->ele SDS string, free the node now
54 * since zslInsert created a new one. */
55 x->ele = NULL;
56 /// 释放老节点的内存。
57 zslFreeNode(x);
58 return newnode;
59}
当在zset中更新score的时候,redis是有大概率需要重新分配节点内存的。但redis本身使用了内存池,所以内存分配的开销是比较低的。
其他skiplist关于range的操作基本上没跑出以上介绍的增删改查的范围,有兴趣的可以直接看代码。对于已经理解以上代码的人看起来应该没有什么难度。
评论