Redis 源码分析(五)quicklist

压缩的双端队列 QuickList

quicklist 作为list指令的底层数据支持,其实现了一个压缩的队列结构。它并非我们常规意义上的链表,而是将整个链表划分成了两层。

  • 第一层使用常规的链表方式,每个节点包含前序和后序指针。
  • 第二层使用ziplist 所以我们在进行插入删除操作的时候,实际上是先找到第一层的链表节点,再在ziplist中查找目标节点的。

quicklist的数据结构:

QuickList

quicklist的创建函数

 1/* Create a new quicklist.
 2 * Free with quicklistRelease(). */
 3quicklist *quicklistCreate(void) {
 4    struct quicklist *quicklist;
 5
 6    quicklist = zmalloc(sizeof(*quicklist));
 7    quicklist->head = quicklist->tail = NULL;
 8    quicklist->len = 0;
 9    quicklist->count = 0;
10    quicklist->compress = 0;
11    quicklist->fill = -2;
12    quicklist->bookmark_count = 0;
13    return quicklist;
14}

可以看到,其只是分配了一个quicklist结构,并初始化对应参数。

quicklist添加数据

我们再来看quicklist是如何将数据添加到链表的首尾的:

quicklistPushHead

 1/* Add new entry to head node of quicklist.
 2 *
 3 * Returns 0 if used existing head.
 4 * Returns 1 if new head created. */
 5int quicklistPushHead(quicklist *quicklist, void *value, size_t sz) {
 6    
 7    /// 获取 quicklist 的head对象,由图可知,这是一个quicklistNode类型的指针。
 8    quicklistNode *orig_head = quicklist->head;
 9
10    assert(sz < UINT32_MAX); /* TODO: add support for quicklist nodes that are sds encoded (not zipped) */
11
12    /// 此处的likely 是为了告诉编译器,_quicklistNodeAllowInsert返回真的概率更大。编译器会以此为依据进行分支预测,从而加快执行速度。
13
14    /// 此处用于判定ziplist是否允许插入
15    if (likely(_quicklistNodeAllowInsert(quicklist->head, quicklist->fill, sz))) {
16        /// 允许插入则在ziplist前添加一个数据内容为value,长度为sz的数据。
17        quicklist->head->zl =
18            ziplistPush(quicklist->head->zl, value, sz, ZIPLIST_HEAD);
19        /// quicklist 更新节点,这里做什么我们等下再说
20        quicklistNodeUpdateSz(quicklist->head);
21    } else {
22        /// _quicklistNodeAllowInsert 返回 false 则新建一个quicklistNode节点
23        quicklistNode *node = quicklistCreateNode();
24
25        /// 将数据插入到新节点的ziplist
26        node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD);
27
28        /// 更新quicklist节点
29        quicklistNodeUpdateSz(node);
30
31        /// 将新生成的节点插入到quicklist首节点之前
32        _quicklistInsertNodeBefore(quicklist, quicklist->head, node);
33    }
34
35    /// 增加整个链表的元素个数
36    quicklist->count++;
37
38    /// 增加头节点中保存的元素个数
39    quicklist->head->count++;
40
41    return (orig_head != quicklist->head);
42}

为了完整理解quicklist推入新元素的过程,我们还要看看以下两个函数:

  • _quicklistNodeAllowInsert
  • _quicklistInsertNodeBefore

_quicklistNodeAllowInsert

 1REDIS_STATIC int _quicklistNodeAllowInsert(const quicklistNode *node, const int fill, const size_t sz) {
 2    /// 大概率node不为空
 3    if (unlikely(!node))
 4        return 0;
 5
 6    /// 还记得ziplist中节点前序遍历偏移量数据是怎么存储的吗?这里计算其存储尺寸
 7    int ziplist_overhead;
 8    /* size of previous offset */
 9    if (sz < 254)
10        ziplist_overhead = 1;
11    else
12        ziplist_overhead = 5;
13
14    /// 计算后续遍历偏移量的尺寸
15    /* size of forward offset */
16    if (sz < 64)
17        ziplist_overhead += 1;
18    else if (likely(sz < 16384))
19        ziplist_overhead += 2;
20    else
21        ziplist_overhead += 5;
22
23    /// 计算ziplist添加节点后所需的内存大小,如果sz编码为integer则该值可能比实际值大
24    /* new_sz overestimates if 'sz' encodes to an integer type */
25    unsigned int new_sz = node->sz + sz + ziplist_overhead;
26
27    if (likely(_quicklistNodeSizeMeetsOptimizationRequirement(new_sz, fill)))
28        return 1;
29    /* when we return 1 above we know that the limit is a size limit (which is
30    * safe, see comments next to optimization_level and SIZE_SAFETY_LIMIT) */
31    else if (!sizeMeetsSafetyLimit(new_sz))
32        return 0;
33    else if ((int)node->count < fill)
34        return 1;
35    else
36        return 0;
37}

这里要说明的是fill这个参数:

  • 为正值时该参数指示ziplist最大容纳的元素个数,redis内部给定了上限,32位进程为16383(2^14 - 1)字节,64位进程为65535 (2^16 - 1)字节。
  • 为负值时该参数指示ziplist最大容量,即((-fill)-1)表示optimization_level的下标。redis中optimization_level [] = {4096, 8192, 16384, 32768, 65536};

_quicklistNodeSizeMeetsOptimizationRequirement 函数中,当fill

  • 为正值时直接返回false
  • 为负值时负责检查new_sz是否超过了fill指示的容量。

其次是sizeMeetsSafetyLimit,该宏展开后则是

1else if (!((sz) <= SIZE_SAFETY_LIMIT))

其中SIZE_SAFETY_LIMIT为预定义值,默认取值8192。也就是说单个元素大小超过8192字节的会返回false。这样在quicklistPushHead函数中就会生成一个新节点,并将其插入到新节点中。


_quicklistInsertNodeBefore

 1/* Wrappers for node inserting around existing node. */
 2REDIS_STATIC void _quicklistInsertNodeBefore(quicklist *quicklist,
 3                                            quicklistNode *old_node,
 4                                            quicklistNode *new_node) {
 5    __quicklistInsertNode(quicklist, old_node, new_node, 0);
 6}
 7
 8/* Insert 'new_node' after 'old_node' if 'after' is 1.
 9    * Insert 'new_node' before 'old_node' if 'after' is 0.
10    * Note: 'new_node' is *always* uncompressed, so if we assign it to
11    *       head or tail, we do not need to uncompress it. */
12REDIS_STATIC void __quicklistInsertNode(quicklist *quicklist,
13                                        quicklistNode *old_node,
14                                        quicklistNode *new_node, int after) {
15    if (after) {
16        new_node->prev = old_node;
17        if (old_node) {
18            new_node->next = old_node->next;
19            if (old_node->next)
20                old_node->next->prev = new_node;
21            old_node->next = new_node;
22        }
23        if (quicklist->tail == old_node)
24            quicklist->tail = new_node;
25    } else {
26        new_node->next = old_node;
27        if (old_node) {
28            new_node->prev = old_node->prev;
29            if (old_node->prev)
30                old_node->prev->next = new_node;
31            old_node->prev = new_node;
32        }
33        if (quicklist->head == old_node)
34            quicklist->head = new_node;
35    }
36    /* If this insert creates the only element so far, initialize head/tail. */
37    if (quicklist->len == 0) {
38        quicklist->head = quicklist->tail = new_node;
39    }
40
41    /* Update len first, so in __quicklistCompress we know exactly len */
42    quicklist->len++;
43
44    if (old_node)
45        quicklistCompress(quicklist, old_node);
46}

函数中其实是链表的基本插入的操作,这里不再详细分析,但是该函数最后两行代码才是整个插入操作的关键所在。当old_node不为空时quicklistCompress干了什么?


quicklistCompress

 1#define quicklistCompress(_ql, _node)           \
 2    do {                                        \
 3        if ((_node)->recompress)                \
 4            quicklistCompressNode((_node));     \
 5        else                                    \
 6            __quicklistCompress((_ql), (_node));\
 7    } while (0)
 8
 9#define quicklistCompressNode(_node)                                           \
10    do {                                                                       \
11        if ((_node) && (_node)->encoding == QUICKLIST_NODE_ENCODING_RAW) {     \
12            __quicklistCompressNode((_node));                                  \
13        }                                                                      \
14    } while (0)
15
16/* Force 'quicklist' to meet compression guidelines set by compress depth.
17* The only way to guarantee interior nodes get compressed is to iterate
18* to our "interior" compress depth then compress the next node we find.
19* If compress depth is larger than the entire list, we return immediately. */
20REDIS_STATIC void __quicklistCompress(const quicklist *quicklist,
21                                    quicklistNode *node) {
22    /* If length is less than our compress depth (from both sides),
23    * we can't compress anything. */
24    /// quicklist->compress 为 0 表示永不压缩
25    /// quicklist节点数小于compress的两倍,则不压缩
26    if (!quicklistAllowsCompression(quicklist) ||
27        quicklist->len < (unsigned int)(quicklist->compress * 2))
28        return;
29
30    /* Iterate until we reach compress depth for both sides of the list.a
31        * Note: because we do length checks at the *top* of this function,
32        *       we can skip explicit null checks below. Everything exists. */
33    quicklistNode *forward = quicklist->head;
34    quicklistNode *reverse = quicklist->tail;
35    int depth = 0;
36    int in_depth = 0;
37    while (depth++ < quicklist->compress) {
38        quicklistDecompressNode(forward);
39        quicklistDecompressNode(reverse);
40
41        if (forward == node || reverse == node)
42            in_depth = 1;
43
44        /* We passed into compress depth of opposite side of the quicklist
45        * so there's no need to compress anything and we can exit. */
46        if (forward == reverse || forward->next == reverse)
47            return;
48
49        forward = forward->next;
50        reverse = reverse->prev;
51    }
52
53    if (!in_depth)
54        quicklistCompressNode(node);
55
56    /* At this point, forward and reverse are one node beyond depth */
57    quicklistCompressNode(forward);
58    quicklistCompressNode(reverse);
59}

由压缩的限制条件可以看出,当数据增长到一定量时,quicklist会对部分节点进行压缩以节省内存。所以频繁读写压缩节点是会提升redis负载的。但同时由compress指示了quicklistNode链表首尾不参与压缩的节点数,这也是为什么在循环里做decompress的原因。


此外,当一个节点被解压后,recompress会被置1,直到该节点再被压缩。这个标记主要用在节点插入的时候,简化节点插入前解压,插入后重新压缩的判定。


压缩采用LZF压缩算法,其实就是zip的标准算法之一,感兴趣的可以去看看LZF的算法原理。该算法算是在时间复杂度和压缩率之间比较平衡的一种。

quicklistPushTail


看完quicklistPushHead再看quicklistPushTail就容易理解多了

 1/* Add new entry to tail node of quicklist.
 2 *
 3 * Returns 0 if used existing tail.
 4 * Returns 1 if new tail created. */
 5int quicklistPushTail(quicklist *quicklist, void *value, size_t sz) {
 6    quicklistNode *orig_tail = quicklist->tail;
 7    assert(sz < UINT32_MAX); /* TODO: add support for quicklist nodes that are sds encoded (not zipped) */
 8    if (likely(
 9            _quicklistNodeAllowInsert(quicklist->tail, quicklist->fill, sz))) {
10        quicklist->tail->zl =
11            ziplistPush(quicklist->tail->zl, value, sz, ZIPLIST_TAIL);
12        quicklistNodeUpdateSz(quicklist->tail);
13    } else {
14        quicklistNode *node = quicklistCreateNode();
15        node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_TAIL);
16
17        quicklistNodeUpdateSz(node);
18        _quicklistInsertNodeAfter(quicklist, quicklist->tail, node);
19    }
20    quicklist->count++;
21    quicklist->tail->count++;
22    return (orig_tail != quicklist->tail);
23}

区别只在于quicklistNode取的是tail,以及ziplistPush的最后一个参数改为了ZIPLIST_TAIL。

quicklist 获取列表中的数据

在redis列表中,是如何通过索引来获取对应的值的呢?在redis中是通过quicklistIndex函数完成从index到值的获取的。

 1/* Populate 'entry' with the element at the specified zero-based index
 2 * where 0 is the head, 1 is the element next to head
 3 * and so on. Negative integers are used in order to count
 4 * from the tail, -1 is the last element, -2 the penultimate
 5 * and so on. If the index is out of range 0 is returned.
 6 *
 7 * Returns 1 if element found
 8 * Returns 0 if element not found */
 9int quicklistIndex(const quicklist *quicklist, const long long idx,
10                   quicklistEntry *entry) {
11    quicklistNode *n;
12    unsigned long long accum = 0;
13    unsigned long long index;
14    /// 索引支持负数,-1 表示列表中的最后一个元素
15    int forward = idx < 0 ? 0 : 1; /* < 0 -> reverse, 0+ -> forward */
16
17    /// 初始化quicklistEntry
18    initEntry(entry);
19    entry->quicklist = quicklist;
20
21    /// 取出遍历的快速节点,并计算元素在链表中的偏移量。
22    if (!forward) {
23        index = (-idx) - 1;
24        n = quicklist->tail;
25    } else {
26        index = idx;
27        n = quicklist->head;
28    }
29
30    /// 超出列表长度的索引直接返回
31    if (index >= quicklist->count)
32        return 0;
33
34    /// 在快速链表中遍历,因为每一个quicklistNode中都包含多个元素,所以这里先查找数据在哪个一级节点中。
35    while (likely(n)) {
36        if ((accum + n->count) > index) {
37            break;
38        } else {
39            D("Skipping over (%p) %u at accum %lld", (void *)n, n->count,
40              accum);
41            accum += n->count;
42            n = forward ? n->next : n->prev;
43        }
44    }
45
46    if (!n)
47        return 0;
48
49    D("Found node: %p at accum %llu, idx %llu, sub+ %llu, sub- %llu", (void *)n,
50      accum, index, index - accum, (-index) - 1 + accum);
51
52    /// 计算节点在压缩列表中的偏移量
53    entry->node = n;
54    if (forward) {
55        /* forward = normal head-to-tail offset. */
56        entry->offset = index - accum;
57    } else {
58        /* reverse = need negative offset for tail-to-head, so undo
59         * the result of the original if (index < 0) above. */
60        entry->offset = (-index) - 1 + accum;
61    }
62
63    /// 对找到的一级节点进行一次解压,如果不需要解压会立即返回
64    quicklistDecompressNodeForUse(entry->node);
65
66    /// 根据元素索引在压缩列表中(遍历)查找
67    entry->zi = ziplistIndex(entry->node->zl, entry->offset);
68
69    /// 获取找到的数据
70    if (!ziplistGet(entry->zi, &entry->value, &entry->sz, &entry->longval))
71        assert(0); /* This can happen on corrupt ziplist with fake entry count. */
72    /* The caller will use our result, so we don't re-compress here.
73     * The caller can recompress or delete the node as needed. */
74    return 1;
75}

先介绍这个函数是因为后续介绍的quicklist系列函数中很多都需要用到,为了大家能完整理解redis不出现理解的断层所以先介绍一下。

quicklist数据遍历

我们再来看QuickList是如何对元素进行遍历的:

我们之前说过,quicklist除前后N个节点外,其他节点在未访问的时候都处于压缩状态。所以当我们要访问quicklist数据时需要先对压缩的数据进行解压。在redis中,遍历数据需要先构造一个quicklistIter结构。

1typedef struct quicklistIter {
2    const quicklist *quicklist;
3    quicklistNode *current;
4    unsigned char *zi;
5    long offset; /* offset in current ziplist */
6    int direction;
7} quicklistIter;

这个结构在最前面有介绍,这里不再赘述。我们来看看构造这个结构的两个函数及其代码:

 1/* Returns a quicklist iterator 'iter'. After the initialization every
 2 * call to quicklistNext() will return the next element of the quicklist. */
 3quicklistIter *quicklistGetIterator(const quicklist *quicklist, int direction) {
 4    quicklistIter *iter;
 5
 6    iter = zmalloc(sizeof(*iter));
 7
 8    if (direction == AL_START_HEAD) {
 9        iter->current = quicklist->head;
10        iter->offset = 0;
11    } else if (direction == AL_START_TAIL) {
12        iter->current = quicklist->tail;
13        iter->offset = -1;
14    }
15
16    iter->direction = direction;
17    iter->quicklist = quicklist;
18
19    iter->zi = NULL;
20
21    return iter;
22}
23
24/* Initialize an iterator at a specific offset 'idx' and make the iterator
25 * return nodes in 'direction' direction. */
26quicklistIter *quicklistGetIteratorAtIdx(const quicklist *quicklist,
27                                         const int direction,
28                                         const long long idx) {
29    quicklistEntry entry;
30
31    if (quicklistIndex(quicklist, idx, &entry)) {
32        quicklistIter *base = quicklistGetIterator(quicklist, direction);
33        base->zi = NULL;
34        base->current = entry.node;
35        base->offset = entry.offset;
36        return base;
37    } else {
38        return NULL;
39    }
40}

迭代器结构中会记录迭代的链表对象quicklist,以及遍历的方向direction。current指向当前访问的quicklistNode节点,还记得这个节点记录的内容吗?其实每个quicklistNode都指向一个ziplist。在quicklistGetIterator中,zi被初始化为NULL,offset根据迭代方向不同分别被初始化成了0和-1,其实应该是ziplist元素下标的意思。但是在quicklistGetIteratorAtIdx函数中,offset被初始化为正确的ziplist偏移量。

在quicklistGetIteratorAtIdx中,调用了我们之前介绍的quicklistIndex函数,其中会对被访问的quicklistNode进行解压操作。

当quicklistIter生成后,我们就可以通过quicklistNext开始遍历列表

 1/* Get next element in iterator.
 2 *
 3 * Note: You must NOT insert into the list while iterating over it.
 4 * You *may* delete from the list while iterating using the
 5 * quicklistDelEntry() function.
 6 * If you insert into the quicklist while iterating, you should
 7 * re-create the iterator after your addition.
 8 *
 9 * iter = quicklistGetIterator(quicklist,<direction>);
10 * quicklistEntry entry;
11 * while (quicklistNext(iter, &entry)) {
12 *     if (entry.value)
13 *          [[ use entry.value with entry.sz ]]
14 *     else
15 *          [[ use entry.longval ]]
16 * }
17 *
18 * Populates 'entry' with values for this iteration.
19 * Returns 0 when iteration is complete or if iteration not possible.
20 * If return value is 0, the contents of 'entry' are not valid.
21 */
22int quicklistNext(quicklistIter *iter, quicklistEntry *entry) {
23    /// 初始化entry对象,entry对象提供了具体的元素信息和数据。
24    initEntry(entry);
25
26    if (!iter) {
27        D("Returning because no iter!");
28        return 0;
29    }
30
31    entry->quicklist = iter->quicklist;
32    entry->node = iter->current;
33
34    /// 已经遍历完成的列表就会进这个判断。
35    if (!iter->current) {
36        D("Returning because current node is NULL")
37        return 0;
38    }
39
40    /// 这里是一个函数指针,用来控制ziplist迭代的方向
41    unsigned char *(*nextFn)(unsigned char *, unsigned char *) = NULL;
42    int offset_update = 0;
43
44    if (!iter->zi) {
45        /* If !zi, use current index. */
46        /// zi为空代表当前NODE需要尝试进行解压。
47        quicklistDecompressNodeForUse(iter->current);
48        /// 解压后再根据索引获取元素在ziplist中的偏移量。此处offset并非偏移量,而是索引,-1则会取最后一个元素。
49        iter->zi = ziplistIndex(iter->current->zl, iter->offset);
50    } else {
51        /* else, use existing iterator offset and get prev/next as necessary. */
52        if (iter->direction == AL_START_HEAD) {
53            /// 正向遍历,使用ziplistNext函数
54            nextFn = ziplistNext;
55            offset_update = 1;
56        } else if (iter->direction == AL_START_TAIL) {
57            /// 反向遍历,使用ziplistPrev函数
58            nextFn = ziplistPrev;
59            offset_update = -1;
60        }
61        iter->zi = nextFn(iter->current->zl, iter->zi);
62        /// 更新索引
63        iter->offset += offset_update;
64    }
65
66    entry->zi = iter->zi;
67    entry->offset = iter->offset;
68
69    if (iter->zi) {
70        /* Populate value from existing ziplist position */
71        /// 找到了则返回
72        ziplistGet(entry->zi, &entry->value, &entry->sz, &entry->longval);
73        return 1;
74    } else {
75        /// 未找到,已经不再当前quicklistNode节点了,则去下一个节点找。
76        /* We ran out of ziplist entries.
77         * Pick next node, update offset, then re-run retrieval. */
78        /// 进入下一个节点之前要先把自己进行压缩
79        quicklistCompress(iter->quicklist, iter->current);
80        if (iter->direction == AL_START_HEAD) {
81            /* Forward traversal */
82            D("Jumping to start of next node");
83            iter->current = iter->current->next;
84            iter->offset = 0;
85        } else if (iter->direction == AL_START_TAIL) {
86            /* Reverse traversal */
87            D("Jumping to end of previous node");
88            iter->current = iter->current->prev;
89            iter->offset = -1;
90        }
91        iter->zi = NULL;
92        return quicklistNext(iter, entry);
93    }
94}

我将注释都写在代码里面了,其实redis的注释是非常清晰的,我更多的工作只是做了翻译。

另外就是在遍历quicklist的时候,是不能插入新数据的。遍历时插入新数据会产生不可预料的结果,主要有这几种情况

  • 在当前quicklistNode插入数据,可能触发压缩。
  • 插入到ziplist中当前元素前方会导致偏移量失效。 但是删除元素函数前的注释中说是可以的,但我认为应该是只能删除迭代的当前元素。否则也可能产生ziplist偏移量失效的情况。 如果在遍历时插入或删除了数据,则一定记得要重新生成迭代对象。
1/* Release iterator.
2 * If we still have a valid current node, then re-encode current node. */
3void quicklistReleaseIterator(quicklistIter *iter) {
4    if (iter->current)
5        quicklistCompress(iter->quicklist, iter->current);
6
7    zfree(iter);
8}

析构迭代对象时也会对当前节点进行一次压缩尝试。

quicklist对应redis的list相关指令,在后续我们分析命令解析的时候再看redis是如何将用户输入的指令转为具体的quicklist调用的。

评论