压缩的双端队列 QuickList
quicklist 作为list指令的底层数据支持,其实现了一个压缩的队列结构。它并非我们常规意义上的链表,而是将整个链表划分成了两层。
- 第一层使用常规的链表方式,每个节点包含前序和后序指针。
- 第二层使用ziplist 所以我们在进行插入删除操作的时候,实际上是先找到第一层的链表节点,再在ziplist中查找目标节点的。
quicklist的数据结构:
quicklist的创建函数
1/* Create a new quicklist.
2 * Free with quicklistRelease(). */
3quicklist *quicklistCreate(void) {
4 struct quicklist *quicklist;
5
6 quicklist = zmalloc(sizeof(*quicklist));
7 quicklist->head = quicklist->tail = NULL;
8 quicklist->len = 0;
9 quicklist->count = 0;
10 quicklist->compress = 0;
11 quicklist->fill = -2;
12 quicklist->bookmark_count = 0;
13 return quicklist;
14}
可以看到,其只是分配了一个quicklist结构,并初始化对应参数。
quicklist添加数据
我们再来看quicklist是如何将数据添加到链表的首尾的:
quicklistPushHead
1/* Add new entry to head node of quicklist.
2 *
3 * Returns 0 if used existing head.
4 * Returns 1 if new head created. */
5int quicklistPushHead(quicklist *quicklist, void *value, size_t sz) {
6
7 /// 获取 quicklist 的head对象,由图可知,这是一个quicklistNode类型的指针。
8 quicklistNode *orig_head = quicklist->head;
9
10 assert(sz < UINT32_MAX); /* TODO: add support for quicklist nodes that are sds encoded (not zipped) */
11
12 /// 此处的likely 是为了告诉编译器,_quicklistNodeAllowInsert返回真的概率更大。编译器会以此为依据进行分支预测,从而加快执行速度。
13
14 /// 此处用于判定ziplist是否允许插入
15 if (likely(_quicklistNodeAllowInsert(quicklist->head, quicklist->fill, sz))) {
16 /// 允许插入则在ziplist前添加一个数据内容为value,长度为sz的数据。
17 quicklist->head->zl =
18 ziplistPush(quicklist->head->zl, value, sz, ZIPLIST_HEAD);
19 /// quicklist 更新节点,这里做什么我们等下再说
20 quicklistNodeUpdateSz(quicklist->head);
21 } else {
22 /// _quicklistNodeAllowInsert 返回 false 则新建一个quicklistNode节点
23 quicklistNode *node = quicklistCreateNode();
24
25 /// 将数据插入到新节点的ziplist
26 node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD);
27
28 /// 更新quicklist节点
29 quicklistNodeUpdateSz(node);
30
31 /// 将新生成的节点插入到quicklist首节点之前
32 _quicklistInsertNodeBefore(quicklist, quicklist->head, node);
33 }
34
35 /// 增加整个链表的元素个数
36 quicklist->count++;
37
38 /// 增加头节点中保存的元素个数
39 quicklist->head->count++;
40
41 return (orig_head != quicklist->head);
42}
为了完整理解quicklist推入新元素的过程,我们还要看看以下两个函数:
- _quicklistNodeAllowInsert
- _quicklistInsertNodeBefore
_quicklistNodeAllowInsert
1REDIS_STATIC int _quicklistNodeAllowInsert(const quicklistNode *node, const int fill, const size_t sz) {
2 /// 大概率node不为空
3 if (unlikely(!node))
4 return 0;
5
6 /// 还记得ziplist中节点前序遍历偏移量数据是怎么存储的吗?这里计算其存储尺寸
7 int ziplist_overhead;
8 /* size of previous offset */
9 if (sz < 254)
10 ziplist_overhead = 1;
11 else
12 ziplist_overhead = 5;
13
14 /// 计算后续遍历偏移量的尺寸
15 /* size of forward offset */
16 if (sz < 64)
17 ziplist_overhead += 1;
18 else if (likely(sz < 16384))
19 ziplist_overhead += 2;
20 else
21 ziplist_overhead += 5;
22
23 /// 计算ziplist添加节点后所需的内存大小,如果sz编码为integer则该值可能比实际值大
24 /* new_sz overestimates if 'sz' encodes to an integer type */
25 unsigned int new_sz = node->sz + sz + ziplist_overhead;
26
27 if (likely(_quicklistNodeSizeMeetsOptimizationRequirement(new_sz, fill)))
28 return 1;
29 /* when we return 1 above we know that the limit is a size limit (which is
30 * safe, see comments next to optimization_level and SIZE_SAFETY_LIMIT) */
31 else if (!sizeMeetsSafetyLimit(new_sz))
32 return 0;
33 else if ((int)node->count < fill)
34 return 1;
35 else
36 return 0;
37}
这里要说明的是fill这个参数:
- 为正值时该参数指示ziplist最大容纳的元素个数,redis内部给定了上限,32位进程为16383(2^14 - 1)字节,64位进程为65535 (2^16 - 1)字节。
- 为负值时该参数指示ziplist最大容量,即((-fill)-1)表示optimization_level的下标。redis中optimization_level [] = {4096, 8192, 16384, 32768, 65536};
_quicklistNodeSizeMeetsOptimizationRequirement 函数中,当fill
- 为正值时直接返回false
- 为负值时负责检查new_sz是否超过了fill指示的容量。
其次是sizeMeetsSafetyLimit,该宏展开后则是
1else if (!((sz) <= SIZE_SAFETY_LIMIT))
其中SIZE_SAFETY_LIMIT为预定义值,默认取值8192。也就是说单个元素大小超过8192字节的会返回false。这样在quicklistPushHead函数中就会生成一个新节点,并将其插入到新节点中。
_quicklistInsertNodeBefore
1/* Wrappers for node inserting around existing node. */
2REDIS_STATIC void _quicklistInsertNodeBefore(quicklist *quicklist,
3 quicklistNode *old_node,
4 quicklistNode *new_node) {
5 __quicklistInsertNode(quicklist, old_node, new_node, 0);
6}
7
8/* Insert 'new_node' after 'old_node' if 'after' is 1.
9 * Insert 'new_node' before 'old_node' if 'after' is 0.
10 * Note: 'new_node' is *always* uncompressed, so if we assign it to
11 * head or tail, we do not need to uncompress it. */
12REDIS_STATIC void __quicklistInsertNode(quicklist *quicklist,
13 quicklistNode *old_node,
14 quicklistNode *new_node, int after) {
15 if (after) {
16 new_node->prev = old_node;
17 if (old_node) {
18 new_node->next = old_node->next;
19 if (old_node->next)
20 old_node->next->prev = new_node;
21 old_node->next = new_node;
22 }
23 if (quicklist->tail == old_node)
24 quicklist->tail = new_node;
25 } else {
26 new_node->next = old_node;
27 if (old_node) {
28 new_node->prev = old_node->prev;
29 if (old_node->prev)
30 old_node->prev->next = new_node;
31 old_node->prev = new_node;
32 }
33 if (quicklist->head == old_node)
34 quicklist->head = new_node;
35 }
36 /* If this insert creates the only element so far, initialize head/tail. */
37 if (quicklist->len == 0) {
38 quicklist->head = quicklist->tail = new_node;
39 }
40
41 /* Update len first, so in __quicklistCompress we know exactly len */
42 quicklist->len++;
43
44 if (old_node)
45 quicklistCompress(quicklist, old_node);
46}
函数中其实是链表的基本插入的操作,这里不再详细分析,但是该函数最后两行代码才是整个插入操作的关键所在。当old_node不为空时quicklistCompress干了什么?
quicklistCompress
1#define quicklistCompress(_ql, _node) \
2 do { \
3 if ((_node)->recompress) \
4 quicklistCompressNode((_node)); \
5 else \
6 __quicklistCompress((_ql), (_node));\
7 } while (0)
8
9#define quicklistCompressNode(_node) \
10 do { \
11 if ((_node) && (_node)->encoding == QUICKLIST_NODE_ENCODING_RAW) { \
12 __quicklistCompressNode((_node)); \
13 } \
14 } while (0)
15
16/* Force 'quicklist' to meet compression guidelines set by compress depth.
17* The only way to guarantee interior nodes get compressed is to iterate
18* to our "interior" compress depth then compress the next node we find.
19* If compress depth is larger than the entire list, we return immediately. */
20REDIS_STATIC void __quicklistCompress(const quicklist *quicklist,
21 quicklistNode *node) {
22 /* If length is less than our compress depth (from both sides),
23 * we can't compress anything. */
24 /// quicklist->compress 为 0 表示永不压缩
25 /// quicklist节点数小于compress的两倍,则不压缩
26 if (!quicklistAllowsCompression(quicklist) ||
27 quicklist->len < (unsigned int)(quicklist->compress * 2))
28 return;
29
30 /* Iterate until we reach compress depth for both sides of the list.a
31 * Note: because we do length checks at the *top* of this function,
32 * we can skip explicit null checks below. Everything exists. */
33 quicklistNode *forward = quicklist->head;
34 quicklistNode *reverse = quicklist->tail;
35 int depth = 0;
36 int in_depth = 0;
37 while (depth++ < quicklist->compress) {
38 quicklistDecompressNode(forward);
39 quicklistDecompressNode(reverse);
40
41 if (forward == node || reverse == node)
42 in_depth = 1;
43
44 /* We passed into compress depth of opposite side of the quicklist
45 * so there's no need to compress anything and we can exit. */
46 if (forward == reverse || forward->next == reverse)
47 return;
48
49 forward = forward->next;
50 reverse = reverse->prev;
51 }
52
53 if (!in_depth)
54 quicklistCompressNode(node);
55
56 /* At this point, forward and reverse are one node beyond depth */
57 quicklistCompressNode(forward);
58 quicklistCompressNode(reverse);
59}
由压缩的限制条件可以看出,当数据增长到一定量时,quicklist会对部分节点进行压缩以节省内存。所以频繁读写压缩节点是会提升redis负载的。但同时由compress指示了quicklistNode链表首尾不参与压缩的节点数,这也是为什么在循环里做decompress的原因。
此外,当一个节点被解压后,recompress会被置1,直到该节点再被压缩。这个标记主要用在节点插入的时候,简化节点插入前解压,插入后重新压缩的判定。
压缩采用LZF压缩算法,其实就是zip的标准算法之一,感兴趣的可以去看看LZF的算法原理。该算法算是在时间复杂度和压缩率之间比较平衡的一种。
quicklistPushTail
看完quicklistPushHead再看quicklistPushTail就容易理解多了
1/* Add new entry to tail node of quicklist.
2 *
3 * Returns 0 if used existing tail.
4 * Returns 1 if new tail created. */
5int quicklistPushTail(quicklist *quicklist, void *value, size_t sz) {
6 quicklistNode *orig_tail = quicklist->tail;
7 assert(sz < UINT32_MAX); /* TODO: add support for quicklist nodes that are sds encoded (not zipped) */
8 if (likely(
9 _quicklistNodeAllowInsert(quicklist->tail, quicklist->fill, sz))) {
10 quicklist->tail->zl =
11 ziplistPush(quicklist->tail->zl, value, sz, ZIPLIST_TAIL);
12 quicklistNodeUpdateSz(quicklist->tail);
13 } else {
14 quicklistNode *node = quicklistCreateNode();
15 node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_TAIL);
16
17 quicklistNodeUpdateSz(node);
18 _quicklistInsertNodeAfter(quicklist, quicklist->tail, node);
19 }
20 quicklist->count++;
21 quicklist->tail->count++;
22 return (orig_tail != quicklist->tail);
23}
区别只在于quicklistNode取的是tail,以及ziplistPush的最后一个参数改为了ZIPLIST_TAIL。
quicklist 获取列表中的数据
在redis列表中,是如何通过索引来获取对应的值的呢?在redis中是通过quicklistIndex函数完成从index到值的获取的。
1/* Populate 'entry' with the element at the specified zero-based index
2 * where 0 is the head, 1 is the element next to head
3 * and so on. Negative integers are used in order to count
4 * from the tail, -1 is the last element, -2 the penultimate
5 * and so on. If the index is out of range 0 is returned.
6 *
7 * Returns 1 if element found
8 * Returns 0 if element not found */
9int quicklistIndex(const quicklist *quicklist, const long long idx,
10 quicklistEntry *entry) {
11 quicklistNode *n;
12 unsigned long long accum = 0;
13 unsigned long long index;
14 /// 索引支持负数,-1 表示列表中的最后一个元素
15 int forward = idx < 0 ? 0 : 1; /* < 0 -> reverse, 0+ -> forward */
16
17 /// 初始化quicklistEntry
18 initEntry(entry);
19 entry->quicklist = quicklist;
20
21 /// 取出遍历的快速节点,并计算元素在链表中的偏移量。
22 if (!forward) {
23 index = (-idx) - 1;
24 n = quicklist->tail;
25 } else {
26 index = idx;
27 n = quicklist->head;
28 }
29
30 /// 超出列表长度的索引直接返回
31 if (index >= quicklist->count)
32 return 0;
33
34 /// 在快速链表中遍历,因为每一个quicklistNode中都包含多个元素,所以这里先查找数据在哪个一级节点中。
35 while (likely(n)) {
36 if ((accum + n->count) > index) {
37 break;
38 } else {
39 D("Skipping over (%p) %u at accum %lld", (void *)n, n->count,
40 accum);
41 accum += n->count;
42 n = forward ? n->next : n->prev;
43 }
44 }
45
46 if (!n)
47 return 0;
48
49 D("Found node: %p at accum %llu, idx %llu, sub+ %llu, sub- %llu", (void *)n,
50 accum, index, index - accum, (-index) - 1 + accum);
51
52 /// 计算节点在压缩列表中的偏移量
53 entry->node = n;
54 if (forward) {
55 /* forward = normal head-to-tail offset. */
56 entry->offset = index - accum;
57 } else {
58 /* reverse = need negative offset for tail-to-head, so undo
59 * the result of the original if (index < 0) above. */
60 entry->offset = (-index) - 1 + accum;
61 }
62
63 /// 对找到的一级节点进行一次解压,如果不需要解压会立即返回
64 quicklistDecompressNodeForUse(entry->node);
65
66 /// 根据元素索引在压缩列表中(遍历)查找
67 entry->zi = ziplistIndex(entry->node->zl, entry->offset);
68
69 /// 获取找到的数据
70 if (!ziplistGet(entry->zi, &entry->value, &entry->sz, &entry->longval))
71 assert(0); /* This can happen on corrupt ziplist with fake entry count. */
72 /* The caller will use our result, so we don't re-compress here.
73 * The caller can recompress or delete the node as needed. */
74 return 1;
75}
先介绍这个函数是因为后续介绍的quicklist系列函数中很多都需要用到,为了大家能完整理解redis不出现理解的断层所以先介绍一下。
quicklist数据遍历
我们再来看QuickList是如何对元素进行遍历的:
我们之前说过,quicklist除前后N个节点外,其他节点在未访问的时候都处于压缩状态。所以当我们要访问quicklist数据时需要先对压缩的数据进行解压。在redis中,遍历数据需要先构造一个quicklistIter结构。
1typedef struct quicklistIter {
2 const quicklist *quicklist;
3 quicklistNode *current;
4 unsigned char *zi;
5 long offset; /* offset in current ziplist */
6 int direction;
7} quicklistIter;
这个结构在最前面有介绍,这里不再赘述。我们来看看构造这个结构的两个函数及其代码:
1/* Returns a quicklist iterator 'iter'. After the initialization every
2 * call to quicklistNext() will return the next element of the quicklist. */
3quicklistIter *quicklistGetIterator(const quicklist *quicklist, int direction) {
4 quicklistIter *iter;
5
6 iter = zmalloc(sizeof(*iter));
7
8 if (direction == AL_START_HEAD) {
9 iter->current = quicklist->head;
10 iter->offset = 0;
11 } else if (direction == AL_START_TAIL) {
12 iter->current = quicklist->tail;
13 iter->offset = -1;
14 }
15
16 iter->direction = direction;
17 iter->quicklist = quicklist;
18
19 iter->zi = NULL;
20
21 return iter;
22}
23
24/* Initialize an iterator at a specific offset 'idx' and make the iterator
25 * return nodes in 'direction' direction. */
26quicklistIter *quicklistGetIteratorAtIdx(const quicklist *quicklist,
27 const int direction,
28 const long long idx) {
29 quicklistEntry entry;
30
31 if (quicklistIndex(quicklist, idx, &entry)) {
32 quicklistIter *base = quicklistGetIterator(quicklist, direction);
33 base->zi = NULL;
34 base->current = entry.node;
35 base->offset = entry.offset;
36 return base;
37 } else {
38 return NULL;
39 }
40}
迭代器结构中会记录迭代的链表对象quicklist,以及遍历的方向direction。current指向当前访问的quicklistNode节点,还记得这个节点记录的内容吗?其实每个quicklistNode都指向一个ziplist。在quicklistGetIterator中,zi被初始化为NULL,offset根据迭代方向不同分别被初始化成了0和-1,其实应该是ziplist元素下标的意思。但是在quicklistGetIteratorAtIdx函数中,offset被初始化为正确的ziplist偏移量。
在quicklistGetIteratorAtIdx中,调用了我们之前介绍的quicklistIndex函数,其中会对被访问的quicklistNode进行解压操作。
当quicklistIter生成后,我们就可以通过quicklistNext开始遍历列表
1/* Get next element in iterator.
2 *
3 * Note: You must NOT insert into the list while iterating over it.
4 * You *may* delete from the list while iterating using the
5 * quicklistDelEntry() function.
6 * If you insert into the quicklist while iterating, you should
7 * re-create the iterator after your addition.
8 *
9 * iter = quicklistGetIterator(quicklist,<direction>);
10 * quicklistEntry entry;
11 * while (quicklistNext(iter, &entry)) {
12 * if (entry.value)
13 * [[ use entry.value with entry.sz ]]
14 * else
15 * [[ use entry.longval ]]
16 * }
17 *
18 * Populates 'entry' with values for this iteration.
19 * Returns 0 when iteration is complete or if iteration not possible.
20 * If return value is 0, the contents of 'entry' are not valid.
21 */
22int quicklistNext(quicklistIter *iter, quicklistEntry *entry) {
23 /// 初始化entry对象,entry对象提供了具体的元素信息和数据。
24 initEntry(entry);
25
26 if (!iter) {
27 D("Returning because no iter!");
28 return 0;
29 }
30
31 entry->quicklist = iter->quicklist;
32 entry->node = iter->current;
33
34 /// 已经遍历完成的列表就会进这个判断。
35 if (!iter->current) {
36 D("Returning because current node is NULL")
37 return 0;
38 }
39
40 /// 这里是一个函数指针,用来控制ziplist迭代的方向
41 unsigned char *(*nextFn)(unsigned char *, unsigned char *) = NULL;
42 int offset_update = 0;
43
44 if (!iter->zi) {
45 /* If !zi, use current index. */
46 /// zi为空代表当前NODE需要尝试进行解压。
47 quicklistDecompressNodeForUse(iter->current);
48 /// 解压后再根据索引获取元素在ziplist中的偏移量。此处offset并非偏移量,而是索引,-1则会取最后一个元素。
49 iter->zi = ziplistIndex(iter->current->zl, iter->offset);
50 } else {
51 /* else, use existing iterator offset and get prev/next as necessary. */
52 if (iter->direction == AL_START_HEAD) {
53 /// 正向遍历,使用ziplistNext函数
54 nextFn = ziplistNext;
55 offset_update = 1;
56 } else if (iter->direction == AL_START_TAIL) {
57 /// 反向遍历,使用ziplistPrev函数
58 nextFn = ziplistPrev;
59 offset_update = -1;
60 }
61 iter->zi = nextFn(iter->current->zl, iter->zi);
62 /// 更新索引
63 iter->offset += offset_update;
64 }
65
66 entry->zi = iter->zi;
67 entry->offset = iter->offset;
68
69 if (iter->zi) {
70 /* Populate value from existing ziplist position */
71 /// 找到了则返回
72 ziplistGet(entry->zi, &entry->value, &entry->sz, &entry->longval);
73 return 1;
74 } else {
75 /// 未找到,已经不再当前quicklistNode节点了,则去下一个节点找。
76 /* We ran out of ziplist entries.
77 * Pick next node, update offset, then re-run retrieval. */
78 /// 进入下一个节点之前要先把自己进行压缩
79 quicklistCompress(iter->quicklist, iter->current);
80 if (iter->direction == AL_START_HEAD) {
81 /* Forward traversal */
82 D("Jumping to start of next node");
83 iter->current = iter->current->next;
84 iter->offset = 0;
85 } else if (iter->direction == AL_START_TAIL) {
86 /* Reverse traversal */
87 D("Jumping to end of previous node");
88 iter->current = iter->current->prev;
89 iter->offset = -1;
90 }
91 iter->zi = NULL;
92 return quicklistNext(iter, entry);
93 }
94}
我将注释都写在代码里面了,其实redis的注释是非常清晰的,我更多的工作只是做了翻译。
另外就是在遍历quicklist的时候,是不能插入新数据的。遍历时插入新数据会产生不可预料的结果,主要有这几种情况
- 在当前quicklistNode插入数据,可能触发压缩。
- 插入到ziplist中当前元素前方会导致偏移量失效。 但是删除元素函数前的注释中说是可以的,但我认为应该是只能删除迭代的当前元素。否则也可能产生ziplist偏移量失效的情况。 如果在遍历时插入或删除了数据,则一定记得要重新生成迭代对象。
1/* Release iterator.
2 * If we still have a valid current node, then re-encode current node. */
3void quicklistReleaseIterator(quicklistIter *iter) {
4 if (iter->current)
5 quicklistCompress(iter->quicklist, iter->current);
6
7 zfree(iter);
8}
析构迭代对象时也会对当前节点进行一次压缩尝试。
quicklist对应redis的list相关指令,在后续我们分析命令解析的时候再看redis是如何将用户输入的指令转为具体的quicklist调用的。
评论