Processing Network Data

Once a network connection has been set up, Redis waits for data to arrive on it and processes that data.

Remember what createClient does when conn is not NULL, from our earlier analysis of that function?

if (conn) {
    connNonBlock(conn);
    connEnableTcpNoDelay(conn);
    if (server.tcpkeepalive)
        connKeepAlive(conn,server.tcpkeepalive);
    connSetReadHandler(conn, readQueryFromClient);
    connSetPrivateData(conn, c);
}

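Here the client calls connSetReadHandler to register readQueryFromClient as the callback for incoming data; this function is the entry point through which a client's network data is processed. connSetPrivateData stores the client pointer on the connection, so the callback can get it back with connGetPrivateData.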
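To make the callback-plus-private-data pattern concrete, here is a minimal C sketch. The mini_conn and mini_client types and every mini_* function are hypothetical stand-ins invented for illustration, not Redis's connection API; the point is only that the connection stores a handler and a back-pointer, and the handler uses the back-pointer to recover its client, as readQueryFromClient does with connGetPrivateData.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical, stripped-down stand-ins for Redis's connection and
 * client types; the real ones carry far more state. */
typedef struct mini_conn mini_conn;
typedef void (*conn_handler)(mini_conn *conn);

struct mini_conn {
    conn_handler read_handler; /* called when the socket becomes readable */
    void *private_data;        /* back-pointer to the owning client */
};

typedef struct {
    mini_conn *conn;
    int reads;                 /* how many times the read handler ran */
} mini_client;

static void mini_set_read_handler(mini_conn *conn, conn_handler h) {
    conn->read_handler = h;
}

static void mini_set_private_data(mini_conn *conn, void *data) {
    conn->private_data = data;
}

/* Plays the role of readQueryFromClient: recover the client from the
 * connection, then do the real work (here it only counts calls). */
static void on_readable(mini_conn *conn) {
    mini_client *c = conn->private_data;
    assert(c != NULL && c->conn == conn);
    c->reads++;
}

/* Simulates the event loop noticing that the fd is readable. */
static void mini_fire_readable(mini_conn *conn) {
    if (conn->read_handler) conn->read_handler(conn);
}
```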

readQueryFromClient

void readQueryFromClient(connection *conn) {
    client *c = connGetPrivateData(conn);
    int nread, readlen;
    size_t qblen;

    /* Check if we want to read from the client later when exiting from
     * the event loop. This is the case if threaded I/O is enabled. */
    if (postponeClientRead(c)) return;

    /* Update total number of reads on server */
    atomicIncr(server.stat_total_reads_processed, 1);

    readlen = PROTO_IOBUF_LEN;
    /* If this is a multi bulk request, and we are processing a bulk reply
     * that is large enough, try to maximize the probability that the query
     * buffer contains exactly the SDS string representing the object, even
     * at the risk of requiring more read(2) calls. This way the function
     * processMultiBulkBuffer() can avoid copying buffers to create the
     * Redis Object representing the argument. */
    if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
        && c->bulklen >= PROTO_MBULK_BIG_ARG)
    {
        ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf);

        /* Note that the 'remaining' variable may be zero in some edge case,
         * for example once we resume a blocked client after CLIENT PAUSE. */
        if (remaining > 0 && remaining < readlen) readlen = remaining;
    }

    qblen = sdslen(c->querybuf);
    if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
    c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
    nread = connRead(c->conn, c->querybuf+qblen, readlen);
    if (nread == -1) {
        if (connGetState(conn) == CONN_STATE_CONNECTED) {
            return;
        } else {
            serverLog(LL_VERBOSE, "Reading from client: %s",connGetLastError(c->conn));
            freeClientAsync(c);
            return;
        }
    } else if (nread == 0) {
        serverLog(LL_VERBOSE, "Client closed connection");
        freeClientAsync(c);
        return;
    } else if (c->flags & CLIENT_MASTER) {
        /* Append the query buffer to the pending (not applied) buffer
         * of the master. We'll use this buffer later in order to have a
         * copy of the string applied by the last command executed. */
        c->pending_querybuf = sdscatlen(c->pending_querybuf,
                                        c->querybuf+qblen,nread);
    }

    sdsIncrLen(c->querybuf,nread);
    c->lastinteraction = server.unixtime;
    if (c->flags & CLIENT_MASTER) c->read_reploff += nread;
    atomicIncr(server.stat_net_input_bytes, nread);
    if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
        sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();

        bytes = sdscatrepr(bytes,c->querybuf,64);
        serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
        sdsfree(ci);
        sdsfree(bytes);
        freeClientAsync(c);
        return;
    }

    /* There is more data in the client input buffer, continue parsing it
     * in case to check if there is a full command to execute. */
    processInputBuffer(c);
}

The very first thing this function does is the following check:

if (postponeClientRead(c)) return;

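postponeClientRead decides whether threaded reads (reads performed by the I/O threads) are enabled and whether this client qualifies for them. It returns 1 if so, in which case readQueryFromClient returns immediately and the actual read is deferred; otherwise it returns 0 and the read happens right here.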

postponeClientRead

int postponeClientRead(client *c) {
    if (server.io_threads_active &&
        server.io_threads_do_reads &&
        !ProcessingEventsWhileBlocked &&
        !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ|CLIENT_BLOCKED)))
    {
        c->flags |= CLIENT_PENDING_READ;
        listAddNodeHead(server.clients_pending_read,c);
        return 1;
    } else {
        return 0;
    }
}

The conditions mean the following:

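  • io_threads_active: whether the I/O threads are currently active (a flag, not a count of threads)
  • io_threads_do_reads: set from the io-threads-do-reads config option; tells Redis whether the I/O threads should also read and parse client commands (by default they only handle writes)
  • ProcessingEventsWhileBlocked: set while Redis is handling events in the middle of a long blocking operation (for example while loading the dataset or while a slow script is running); threaded reads are not used in that state
  • c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ|CLIENT_BLOCKED): the client must have none of these flags
    • CLIENT_MASTER: in a master/replica setup, this client is the connection from our master (i.e. this server is a replica)
    • CLIENT_SLAVE: in a master/replica setup, this client is a replica connected to this server
    • CLIENT_PENDING_READ: the client has already been queued for a threaded read. This is also why, when an I/O thread later calls readQueryFromClient for this client, the check fails and the read actually takes place.
    • CLIENT_BLOCKED: the client is blocked; while it is blocked, the server does not process further commands from it. A client can become blocked in several ways, for example:
      • blocking commands such as BLPOP
      • a block initiated by a Redis module
      • the WAIT command
      • a server pause (CLIENT PAUSE)

If all the conditions hold, the client is flagged CLIENT_PENDING_READ and added to the head of the server.clients_pending_read list, where it waits to be handled by the I/O threads.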

Before looking at how the client reads data, here are a few client fields related to the read buffer:

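  • client
    • reqtype: the type of the current request (0 until it has been determined for the current request)
      • PROTO_REQ_INLINE: inline (single-line) command
      • PROTO_REQ_MULTIBULK: multibulk (multi-line) command
    • querybuf: the buffer in which incoming command data accumulates
    • qb_pos: the position in querybuf up to which data has already been parsed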

Reading proceeds as follows:

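  1. Redis takes the length of the data already in the buffer and saves it in qblen.
  2. It grows querybuf so that at least readlen bytes of free space are available. readlen is normally PROTO_IOBUF_LEN, but while a large bulk argument (>= PROTO_MBULK_BIG_ARG) is being received it is cut down to the bytes still missing from that argument, so that querybuf ends up holding exactly that argument and its trailing \r\n.
  3. It calls connRead to read data into querybuf, right after the existing qblen bytes. If nread is -1 while the connection is still connected, there is simply nothing to read yet and the function returns; any other read error, or nread == 0 (the client closed the connection), causes the client to be freed asynchronously.
  4. After a successful read, it updates the buffer length with sdsIncrLen.
  5. It updates statistics and bookkeeping (lastinteraction, stat_net_input_bytes, and read_reploff for a master client), and closes the client if querybuf has grown beyond server.client_max_querybuf_len.
  6. It passes the buffered data to processInputBuffer for processing.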
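As a rough sketch of steps 1 to 4, the code below appends whatever read(2) returns to the tail of a growable buffer. mini_buf, buf_make_room and buf_read_from are hypothetical names; the sketch uses a plain heap buffer with realloc in place of sds, and a raw file descriptor in place of Redis's connection layer.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>

/* Hypothetical growable byte buffer standing in for the sds querybuf. */
typedef struct {
    char *data;
    size_t len;   /* bytes already buffered (qblen) */
    size_t cap;   /* bytes allocated */
} mini_buf;

/* Like sdsMakeRoomFor: ensure at least `need` free bytes after len. */
static int buf_make_room(mini_buf *b, size_t need) {
    if (b->cap - b->len >= need) return 0;
    size_t newcap = (b->len + need) * 2;
    char *p = realloc(b->data, newcap);
    if (p == NULL) return -1;
    b->data = p;
    b->cap = newcap;
    return 0;
}

/* Steps 1-4: remember qblen, grow, read into the tail, bump the length.
 * Returns what read(2) returned: >0 bytes, 0 on EOF, -1 on error. */
static ssize_t buf_read_from(mini_buf *b, int fd, size_t readlen) {
    assert(b != NULL && fd >= 0);
    size_t qblen = b->len;
    if (buf_make_room(b, readlen) != 0) return -1;
    ssize_t nread = read(fd, b->data + qblen, readlen);
    if (nread > 0) b->len += (size_t)nread;   /* like sdsIncrLen */
    return nread;
}
```

Feeding it from a pipe shows two reads accumulating in the same buffer, and a read returning 0 once the write end is closed, which is the case readQueryFromClient logs as "Client closed connection".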

processInputBuffer

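This function parses Redis protocol requests out of querybuf one at a time, and executes each command once it has been fully parsed: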

void processInputBuffer(client *c) {
    /* Keep processing while there is something in the input buffer */
    while(c->qb_pos < sdslen(c->querybuf)) {
        /* Immediately abort if the client is in the middle of something. */
        if (c->flags & CLIENT_BLOCKED) break;

        /* Don't process more buffers from clients that have already pending
         * commands to execute in c->argv. */
        if (c->flags & CLIENT_PENDING_COMMAND) break;

        /* Don't process input from the master while there is a busy script
         * condition on the slave. We want just to accumulate the replication
         * stream (instead of replying -BUSY like we do with other clients) and
         * later resume the processing. */
        if (server.lua_timedout && c->flags & CLIENT_MASTER) break;

        /* CLIENT_CLOSE_AFTER_REPLY closes the connection once the reply is
         * written to the client. Make sure to not let the reply grow after
         * this flag has been set (i.e. don't process more commands).
         *
         * The same applies for clients we want to terminate ASAP. */
        if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break;

        /* Determine request type when unknown. */
        if (!c->reqtype) {
            if (c->querybuf[c->qb_pos] == '*') {
                c->reqtype = PROTO_REQ_MULTIBULK;
            } else {
                c->reqtype = PROTO_REQ_INLINE;
            }
        }

        if (c->reqtype == PROTO_REQ_INLINE) {
            if (processInlineBuffer(c) != C_OK) break;
            /* If the Gopher mode and we got zero or one argument, process
             * the request in Gopher mode. To avoid data race, Redis won't
             * support Gopher if enable io threads to read queries. */
            if (server.gopher_enabled && !server.io_threads_do_reads &&
                ((c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '/') ||
                  c->argc == 0))
            {
                processGopherRequest(c);
                resetClient(c);
                c->flags |= CLIENT_CLOSE_AFTER_REPLY;
                break;
            }
        } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
            if (processMultibulkBuffer(c) != C_OK) break;
        } else {
            serverPanic("Unknown request type");
        }

        /* Multibulk processing could see a <= 0 length. */
        if (c->argc == 0) {
            resetClient(c);
        } else {
            /* If we are in the context of an I/O thread, we can't really
             * execute the command here. All we can do is to flag the client
             * as one that needs to process the command. */
            if (c->flags & CLIENT_PENDING_READ) {
                c->flags |= CLIENT_PENDING_COMMAND;
                break;
            }

            /* We are finally ready to execute the command. */
            if (processCommandAndResetClient(c) == C_ERR) {
                /* If the client is no longer valid, we avoid exiting this
                 * loop and trimming the client buffer later. So we return
                 * ASAP in that case. */
                return;
            }
        }
    }

    /* Trim to pos */
    if (c->qb_pos) {
        sdsrange(c->querybuf,c->qb_pos,-1);
        c->qb_pos = 0;
    }
}
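The loop-then-trim structure of processInputBuffer can be sketched as below. To keep it short, the hypothetical consume_line treats every '\n'-terminated line as one command; what the sketch shows is how pos (qb_pos) advances while complete commands are available, how an incomplete command stops the loop, and how the consumed prefix is cut off at the end, as sdsrange does.

```c
#include <assert.h>
#include <string.h>

/* Hypothetical query buffer; a fixed array is enough for the sketch. */
typedef struct {
    char buf[256];
    size_t len;    /* bytes buffered */
    size_t pos;    /* like qb_pos: bytes already consumed */
    int commands;  /* complete commands seen so far */
} mini_query;

/* Consume one '\n'-terminated line starting at pos; return 0 if it is
 * still incomplete (the real parsers return C_ERR in that case). */
static int consume_line(mini_query *q) {
    const char *start = q->buf + q->pos;
    const char *nl = memchr(start, '\n', q->len - q->pos);
    if (nl == NULL) return 0;
    q->pos += (size_t)(nl - start) + 1;
    q->commands++;
    return 1;
}

/* Loop while unconsumed bytes remain, stop on an incomplete command,
 * then drop the consumed prefix so pos goes back to 0
 * (what sdsrange(querybuf, qb_pos, -1) does). */
static void process_input(mini_query *q) {
    assert(q->pos <= q->len);
    while (q->pos < q->len) {
        if (!consume_line(q)) break;
    }
    if (q->pos) {
        memmove(q->buf, q->buf + q->pos, q->len - q->pos);
        q->len -= q->pos;
        q->pos = 0;
    }
}
```

With "PING\r\nPING\r\nPI" in the buffer, two commands are consumed and the partial "PI" is kept at the front, waiting for the next read.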

This part shows how the Redis protocol distinguishes the two request types:

/* Determine request type when unknown. */
if (!c->reqtype) {
    if (c->querybuf[c->qb_pos] == '*') {
        c->reqtype = PROTO_REQ_MULTIBULK;
    } else {
        c->reqtype = PROTO_REQ_INLINE;
    }
}

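If the first character is '*', the request uses the multibulk protocol; otherwise it is an inline request. Inline requests are handled by processInlineBuffer and multibulk requests by processMultibulkBuffer.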

processInlineBuffer

Inline requests are the simpler of the two, so let's look at that code first.

int processInlineBuffer(client *c) {
    char *newline;
    int argc, j, linefeed_chars = 1;
    sds *argv, aux;
    size_t querylen;

    /* Search for end of line */
    newline = strchr(c->querybuf+c->qb_pos,'\n');

    /* Nothing to do without a \r\n */
    if (newline == NULL) {
        if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE) {
            addReplyError(c,"Protocol error: too big inline request");
            setProtocolError("too big inline request",c);
        }
        return C_ERR;
    }

    /* Handle the \r\n case. */
    if (newline != c->querybuf+c->qb_pos && *(newline-1) == '\r')
        newline--, linefeed_chars++;

    /* Split the input buffer up to the \r\n */
    querylen = newline-(c->querybuf+c->qb_pos);
    aux = sdsnewlen(c->querybuf+c->qb_pos,querylen);
    argv = sdssplitargs(aux,&argc);
    sdsfree(aux);
    if (argv == NULL) {
        addReplyError(c,"Protocol error: unbalanced quotes in request");
        setProtocolError("unbalanced quotes in inline request",c);
        return C_ERR;
    }

    /* Newline from slaves can be used to refresh the last ACK time.
     * This is useful for a slave to ping back while loading a big
     * RDB file. */
    if (querylen == 0 && getClientType(c) == CLIENT_TYPE_SLAVE)
        c->repl_ack_time = server.unixtime;

    /* Masters should never send us inline protocol to run actual
     * commands. If this happens, it is likely due to a bug in Redis where
     * we got some desynchronization in the protocol, for example
     * because of a PSYNC gone bad.
     *
     * However there is an exception: masters may send us just a newline
     * to keep the connection active. */
    if (querylen != 0 && c->flags & CLIENT_MASTER) {
        sdsfreesplitres(argv,argc);
        serverLog(LL_WARNING,"WARNING: Receiving inline protocol from master, master stream corruption? Closing the master connection and discarding the cached master.");
        setProtocolError("Master using the inline protocol. Desync?",c);
        return C_ERR;
    }

    /* Move querybuffer position to the next query in the buffer. */
    c->qb_pos += querylen+linefeed_chars;

    /* Setup argv array on client structure */
    if (argc) {
        if (c->argv) zfree(c->argv);
        c->argv = zmalloc(sizeof(robj*)*argc);
        c->argv_len_sum = 0;
    }

    /* Create redis objects for all arguments. */
    for (c->argc = 0, j = 0; j < argc; j++) {
        c->argv[c->argc] = createObject(OBJ_STRING,argv[j]);
        c->argc++;
        c->argv_len_sum += sdslen(argv[j]);
    }
    zfree(argv);
    return C_OK;
}

The overall logic is as follows:

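  1. Use strchr to find the '\n' character:

    newline = strchr(c->querybuf+c->qb_pos,'\n');

  2. Adjust the end-of-line position. A request line may end in \r\n or just \n; if the '\n' is preceded by '\r', newline is moved back one byte and two terminator characters are counted instead of one:

    if (newline != c->querybuf+c->qb_pos && *(newline-1) == '\r')
        newline--, linefeed_chars++;

  3. Split the line into an argument list with sdssplitargs (which also understands quoted strings) and store it in the temporary argv array:

    querylen = newline-(c->querybuf+c->qb_pos);
    aux = sdsnewlen(c->querybuf+c->qb_pos,querylen);
    argv = sdssplitargs(aux,&argc);

  4. Advance the read position past the line and its terminator:

    c->qb_pos += querylen+linefeed_chars;

  5. Move the arguments into c->argv. The sds strings in argv cannot be stored there as-is, because c->argv holds robj pointers rather than sds strings, so each sds is wrapped in a string object with createObject (the string data itself is not copied); the temporary argv array is freed afterwards:

    /* Setup argv array on client structure */
    if (argc) {
        if (c->argv) zfree(c->argv);
        c->argv = zmalloc(sizeof(robj*)*argc);
        c->argv_len_sum = 0;
    }

    /* Create redis objects for all arguments. */
    for (c->argc = 0, j = 0; j < argc; j++) {
        c->argv[c->argc] = createObject(OBJ_STRING,argv[j]);
        c->argc++;
        c->argv_len_sum += sdslen(argv[j]);
    }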
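Steps 1 to 4 can be sketched in C as follows. parse_inline is a hypothetical, simplified stand-in for processInlineBuffer: it splits only on spaces and tabs (no quote or escape handling, which sdssplitargs does provide), copies arguments into fixed-size arrays instead of creating robj objects, and truncates arguments that are too long for those arrays.

```c
#include <assert.h>
#include <string.h>

#define MAX_ARGS 8
#define MAX_ARG_LEN 64

/* Returns argc (0 for an empty line), or -1 when no '\n' has arrived
 * yet. On success *pos is moved past the line and its terminator. */
static int parse_inline(const char *buf, size_t len, size_t *pos,
                        char out[MAX_ARGS][MAX_ARG_LEN]) {
    assert(*pos <= len);
    const char *start = buf + *pos;
    const char *newline = memchr(start, '\n', len - *pos);  /* step 1 */
    if (newline == NULL) return -1;

    size_t linefeed_chars = 1;
    if (newline != start && newline[-1] == '\r') {          /* step 2 */
        newline--;
        linefeed_chars++;
    }

    size_t querylen = (size_t)(newline - start);            /* step 3 */
    int argc = 0;
    size_t i = 0;
    while (i < querylen && argc < MAX_ARGS) {
        while (i < querylen && (start[i] == ' ' || start[i] == '\t')) i++;
        size_t begin = i;
        while (i < querylen && start[i] != ' ' && start[i] != '\t') i++;
        if (i > begin) {
            size_t n = i - begin;
            if (n >= MAX_ARG_LEN) n = MAX_ARG_LEN - 1;  /* sketch-only limit */
            memcpy(out[argc], start + begin, n);
            out[argc][n] = '\0';
            argc++;
        }
    }

    *pos += querylen + linefeed_chars;                      /* step 4 */
    return argc;
}
```

Both terminators are accepted: "SET  key value\r\n" yields three arguments and advances pos by 16 bytes, while "GET key\n" advances it by 8.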
    

processMultibulkBuffer

This function is fairly long, so here is my rough walk-through of its flow.

int processMultibulkBuffer(client *c) {
    char *newline = NULL;
    int ok;
    long long ll;

    if (c->multibulklen == 0) {
        /* The client should have been reset */
        serverAssertWithInfo(c,NULL,c->argc == 0);

        /* Multi bulk length cannot be read without a \r\n */
        newline = strchr(c->querybuf+c->qb_pos,'\r');
        if (newline == NULL) {
            if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE) {
                addReplyError(c,"Protocol error: too big mbulk count string");
                setProtocolError("too big mbulk count string",c);
            }
            return C_ERR;
        }

        /* Buffer should also contain \n */
        if (newline-(c->querybuf+c->qb_pos) > (ssize_t)(sdslen(c->querybuf)-c->qb_pos-2))
            return C_ERR;

        /* We know for sure there is a whole line since newline != NULL,
         * so go ahead and find out the multi bulk length. */
        serverAssertWithInfo(c,NULL,c->querybuf[c->qb_pos] == '*');
        ok = string2ll(c->querybuf+1+c->qb_pos,newline-(c->querybuf+1+c->qb_pos),&ll);
        if (!ok || ll > 1024*1024) {
            addReplyError(c,"Protocol error: invalid multibulk length");
            setProtocolError("invalid mbulk count",c);
            return C_ERR;
        } else if (ll > 10 && authRequired(c)) {
            addReplyError(c, "Protocol error: unauthenticated multibulk length");
            setProtocolError("unauth mbulk count", c);
            return C_ERR;
        }

        c->qb_pos = (newline-c->querybuf)+2;

        if (ll <= 0) return C_OK;

        c->multibulklen = ll;

        /* Setup argv array on client structure */
        if (c->argv) zfree(c->argv);
        c->argv = zmalloc(sizeof(robj*)*c->multibulklen);
        c->argv_len_sum = 0;
    }

    serverAssertWithInfo(c,NULL,c->multibulklen > 0);
    while(c->multibulklen) {
        /* Read bulk length if unknown */
        if (c->bulklen == -1) {
            newline = strchr(c->querybuf+c->qb_pos,'\r');
            if (newline == NULL) {
                if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE) {
                    addReplyError(c,
                        "Protocol error: too big bulk count string");
                    setProtocolError("too big bulk count string",c);
                    return C_ERR;
                }
                break;
            }

            /* Buffer should also contain \n */
            if (newline-(c->querybuf+c->qb_pos) > (ssize_t)(sdslen(c->querybuf)-c->qb_pos-2))
                break;

            if (c->querybuf[c->qb_pos] != '$') {
                addReplyErrorFormat(c,
                    "Protocol error: expected '$', got '%c'",
                    c->querybuf[c->qb_pos]);
                setProtocolError("expected $ but got something else",c);
                return C_ERR;
            }

            ok = string2ll(c->querybuf+c->qb_pos+1,newline-(c->querybuf+c->qb_pos+1),&ll);
            if (!ok || ll < 0 ||
                (!(c->flags & CLIENT_MASTER) && ll > server.proto_max_bulk_len)) {
                addReplyError(c,"Protocol error: invalid bulk length");
                setProtocolError("invalid bulk length",c);
                return C_ERR;
            } else if (ll > 16384 && authRequired(c)) {
                addReplyError(c, "Protocol error: unauthenticated bulk length");
                setProtocolError("unauth bulk length", c);
                return C_ERR;
            }

            c->qb_pos = newline-c->querybuf+2;
            if (ll >= PROTO_MBULK_BIG_ARG) {
                /* If we are going to read a large object from network
                 * try to make it likely that it will start at c->querybuf
                 * boundary so that we can optimize object creation
                 * avoiding a large copy of data.
                 *
                 * But only when the data we have not parsed is less than
                 * or equal to ll+2. If the data length is greater than
                 * ll+2, trimming querybuf is just a waste of time, because
                 * at this time the querybuf contains not only our bulk. */
                if (sdslen(c->querybuf)-c->qb_pos <= (size_t)ll+2) {
                    sdsrange(c->querybuf,c->qb_pos,-1);
                    c->qb_pos = 0;
                    /* Hint the sds library about the amount of bytes this string is
                     * going to contain. */
                    c->querybuf = sdsMakeRoomFor(c->querybuf,ll+2-sdslen(c->querybuf));
                }
            }
            c->bulklen = ll;
        }

        /* Read bulk argument */
        if (sdslen(c->querybuf)-c->qb_pos < (size_t)(c->bulklen+2)) {
            /* Not enough data (+2 == trailing \r\n) */
            break;
        } else {
            /* Optimization: if the buffer contains JUST our bulk element
             * instead of creating a new object by *copying* the sds we
             * just use the current sds string. */
            if (c->qb_pos == 0 &&
                c->bulklen >= PROTO_MBULK_BIG_ARG &&
                sdslen(c->querybuf) == (size_t)(c->bulklen+2))
            {
                c->argv[c->argc++] = createObject(OBJ_STRING,c->querybuf);
                c->argv_len_sum += c->bulklen;
                sdsIncrLen(c->querybuf,-2); /* remove CRLF */
                /* Assume that if we saw a fat argument we'll see another one
                 * likely... */
                c->querybuf = sdsnewlen(SDS_NOINIT,c->bulklen+2);
                sdsclear(c->querybuf);
            } else {
                c->argv[c->argc++] =
                    createStringObject(c->querybuf+c->qb_pos,c->bulklen);
                c->argv_len_sum += c->bulklen;
                c->qb_pos += c->bulklen+2;
            }
            c->bulklen = -1;
            c->multibulklen--;
        }
    }

    /* We're done when c->multibulk == 0 */
    if (c->multibulklen == 0) return C_OK;

    /* Still not ready to process the command */
    return C_ERR;
}

Before digging into the code, let's first get familiar with the format of Redis's multibulk protocol:

*<number of arguments> CR LF
$<number of bytes in argument 1> CR LF
<argument 1 data> CR LF
...
$<number of bytes in argument N> CR LF
<argument N data> CR LF
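To see the format concretely, the hypothetical helper below encodes an argument list the way a client would send it; for example, SET key value becomes *3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n. For brevity it takes NUL-terminated C strings, whereas the protocol itself is length-prefixed and can carry arbitrary bytes.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical helper: serialize argv in the multibulk format above.
 * Returns the encoded length (out is NUL-terminated), or 0 if out is
 * too small. */
static size_t encode_multibulk(int argc, const char **argv,
                               char *out, size_t cap) {
    assert(argc >= 0 && out != NULL && cap > 0);
    int n = snprintf(out, cap, "*%d\r\n", argc);          /* *<count>\r\n */
    if (n < 0 || (size_t)n >= cap) return 0;
    size_t used = (size_t)n;
    for (int j = 0; j < argc; j++) {
        size_t alen = strlen(argv[j]);
        n = snprintf(out + used, cap - used, "$%zu\r\n", alen); /* $<len>\r\n */
        if (n < 0 || (size_t)n >= cap - used) return 0;
        used += (size_t)n;
        if (alen + 2 >= cap - used) return 0;   /* data, \r\n and the NUL */
        memcpy(out + used, argv[j], alen);       /* <data>\r\n */
        memcpy(out + used + alen, "\r\n", 2);
        used += alen + 2;
        out[used] = '\0';
    }
    return used;
}
```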

First, look at this line:

if (c->multibulklen == 0) {

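multibulklen is initialized to 0 in createClient, and it is back to 0 once a command has been fully parsed, so every new command (not only the first one on a new connection) enters this branch. Next comes: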

/* Multi bulk length cannot be read without a \r\n */
newline = strchr(c->querybuf+c->qb_pos,'\r');
if (newline == NULL) {
    if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE) {
        addReplyError(c,"Protocol error: too big mbulk count string");
        setProtocolError("too big mbulk count string",c);
    }
    return C_ERR;
}

/* Buffer should also contain \n */
if (newline-(c->querybuf+c->qb_pos) > (ssize_t)(sdslen(c->querybuf)-c->qb_pos-2))
    return C_ERR;

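This looks for the end of the line. If no '\r' has arrived yet, the function returns C_ERR, meaning the command is not complete yet; it only reports a protocol error when the unterminated data already exceeds PROTO_INLINE_MAX_SIZE. If '\r' is found, it also checks that the buffer holds at least one more byte after it (the '\n'), i.e. that the whole line, including \r\n, has arrived.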

ok = string2ll(c->querybuf+1+c->qb_pos,newline-(c->querybuf+1+c->qb_pos),&ll);

This line parses the number that immediately follows the first character of the request, so the first line of a multibulk request has the form

*[count]\r\n

The return value of string2ll and the parsed count are then validated:

if (!ok || ll > 1024*1024) {
    addReplyError(c,"Protocol error: invalid multibulk length");
    setProtocolError("invalid mbulk count",c);
    return C_ERR;
} else if (ll > 10 && authRequired(c)) {
    addReplyError(c, "Protocol error: unauthenticated multibulk length");
    setProtocolError("unauth mbulk count", c);
    return C_ERR;
}

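As the code shows, a multibulk request may have at most 1048576 (1024*1024) arguments; a larger count, or one that string2ll cannot parse, produces the "Protocol error: invalid multibulk length" error. authRequired(c) checks whether the client still has to authenticate: such a client may send at most 10 arguments, otherwise it gets "Protocol error: unauthenticated multibulk length".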

With these checks done, the next few lines set up the state for the command:

c->qb_pos = (newline-c->querybuf)+2;

This moves the read position past the \r\n, to the start of the next line.

if (ll <= 0) return C_OK;

c->multibulklen = ll;

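If the count is <= 0, the function returns C_OK right away with no arguments (processInputBuffer then just resets the client, since argc == 0); otherwise the argument count is saved in multibulklen for the steps that follow.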

/* Setup argv array on client structure */
if (c->argv) zfree(c->argv);
c->argv = zmalloc(sizeof(robj*)*c->multibulklen);
c->argv_len_sum = 0;

This allocates argv to hold the arguments.

The code that follows parses the arguments themselves:

while(c->multibulklen) {
    /* Read bulk length if unknown */
    if (c->bulklen == -1) {

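When parsing begins, c->bulklen is -1; this value is also set in createClient. bulklen holds the length of the argument currently being parsed, and this branch is what fills it in: it reads the "$<length>\r\n" line, checks that it starts with '$', and validates the length (it must be non-negative, may not exceed server.proto_max_bulk_len unless the client is our master, and may not exceed 16384 for a client that has not yet authenticated).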

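There is a fair amount of code here, so I won't repeat it; once the logic is clear, it is easy to follow in the full listing above. I'll only explain the last few lines: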

if (ll >= PROTO_MBULK_BIG_ARG) {
    if (sdslen(c->querybuf)-c->qb_pos <= (size_t)ll+2) {
        sdsrange(c->querybuf,c->qb_pos,-1);
        c->qb_pos = 0;
        /* Hint the sds library about the amount of bytes this string is
         * going to contain. */
        c->querybuf = sdsMakeRoomFor(c->querybuf,ll+2-sdslen(c->querybuf));
    }
}
c->bulklen = ll;

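Redis applies an optimization here: if the argument is large (ll >= PROTO_MBULK_BIG_ARG) and the unparsed data left in querybuf is no more than this argument plus its trailing \r\n, the unparsed bytes are moved to the start of querybuf and room is reserved for all ll+2 bytes. When argv is filled in later, that memory can then be used directly as the argument, without a copy.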
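The idea behind this optimization can be sketched outside of sds. In the hypothetical C code below, prepare_big_arg slides the unparsed bytes to offset 0 and reserves ll+2 bytes (as sdsrange and sdsMakeRoomFor do), and steal_buffer later hands the whole allocation over as the argument and gives the buffer a fresh allocation, mirroring createObject(OBJ_STRING,c->querybuf) followed by sdsnewlen. Redis only does this for arguments of at least PROTO_MBULK_BIG_ARG bytes; the sketch skips that threshold so it can be exercised with small inputs.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical heap buffer standing in for the sds querybuf. */
typedef struct {
    char *data;
    size_t len;  /* bytes buffered */
    size_t cap;  /* bytes allocated */
    size_t pos;  /* like qb_pos */
} big_buf;

/* Called after the "$<ll>\r\n" header has been consumed. If only (part
 * of) this argument is left unparsed, slide it to offset 0 (like sdsrange)
 * and reserve ll+2 bytes (like sdsMakeRoomFor). Returns 1 if prepared,
 * 0 if the buffer holds more than this argument, -1 if allocation fails. */
static int prepare_big_arg(big_buf *b, size_t ll) {
    assert(b->pos <= b->len);
    size_t unparsed = b->len - b->pos;
    if (unparsed > ll + 2) return 0;
    memmove(b->data, b->data + b->pos, unparsed);
    b->len = unparsed;
    b->pos = 0;
    if (b->cap < ll + 2) {
        char *p = realloc(b->data, ll + 2);
        if (p == NULL) return -1;
        b->data = p;
        b->cap = ll + 2;
    }
    return 1;
}

/* Once exactly ll+2 bytes sit at offset 0, hand the allocation over as
 * the argument instead of copying it, and give the buffer a new one. */
static char *steal_buffer(big_buf *b, size_t ll) {
    if (b->pos != 0 || b->len != ll + 2) return NULL;
    char *arg = b->data;
    arg[ll] = '\0';                /* drop the trailing \r\n */
    b->data = malloc(ll + 2);      /* expect another big argument */
    b->len = 0;
    b->cap = b->data != NULL ? ll + 2 : 0;
    return arg;
}
```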

Once bulklen is known, what follows is the parsing of each argv argument:

if (sdslen(c->querybuf)-c->qb_pos < (size_t)(c->bulklen+2)) {
    /* Not enough data (+2 == trailing \r\n) */
    break;
}

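This check is needed on its own because the bulk length may have been parsed in an earlier call, or earlier in this loop, while the argument data itself had not fully arrived. In that case bulklen != -1, so the branch above is skipped, and we still have to verify that bulklen+2 bytes are available in querybuf.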

else {
    /* Optimization: if the buffer contains JUST our bulk element
     * instead of creating a new object by *copying* the sds we
     * just use the current sds string. */
    if (c->qb_pos == 0 &&
        c->bulklen >= PROTO_MBULK_BIG_ARG &&
        sdslen(c->querybuf) == (size_t)(c->bulklen+2))
    {
        c->argv[c->argc++] = createObject(OBJ_STRING,c->querybuf);
        c->argv_len_sum += c->bulklen;
        sdsIncrLen(c->querybuf,-2); /* remove CRLF */
        /* Assume that if we saw a fat argument we'll see another one
         * likely... */
        c->querybuf = sdsnewlen(SDS_NOINIT,c->bulklen+2);
        sdsclear(c->querybuf);
    } else {
        c->argv[c->argc++] =
            createStringObject(c->querybuf+c->qb_pos,c->bulklen);
        c->argv_len_sum += c->bulklen;
        c->qb_pos += c->bulklen+2;
    }
    c->bulklen = -1;
    c->multibulklen--;
}

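The first branch above is the optimization mentioned earlier: createObject is called with c->querybuf as the value of the new robj, and it keeps that pointer as the object's value instead of allocating new memory and copying the data. The trailing \r\n is then trimmed with sdsIncrLen(c->querybuf,-2), and querybuf is given a brand-new sds string, preallocated to bulklen+2 bytes on the assumption that another large argument is likely to follow.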

Otherwise, createStringObject copies the bytes into a new robj, which is stored in argv.

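Each time an argument has been parsed, multibulklen is decremented by one; once all the arguments have been parsed, the function returns C_OK and processInputBuffer hands the command to the command-processing entry point (processCommandAndResetClient).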
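Putting the pieces together, here is a simplified, incremental multibulk parser in C. mb_state, mb_parse, mb_reset and the MB_* constants are hypothetical names; the fields mirror multibulklen and bulklen, and the parser keeps its state across calls, so a command split over several reads resumes where it stopped, just as processMultibulkBuffer does with the client fields. It leaves out the authentication limits and the big-argument optimization, uses a fixed bulk-length cap equal to the default proto-max-bulk-len (512 MB), and stores at most 16 arguments.

```c
#include <assert.h>
#include <limits.h>
#include <stdlib.h>
#include <string.h>

#define MB_MAX_COUNT (1024 * 1024)        /* same cap as the "*<count>" check */
#define MB_MAX_BULK (512LL * 1024 * 1024) /* default proto-max-bulk-len */
#define MB_MAX_ARGS 16                    /* sketch-only limit */

enum { MB_DONE = 0, MB_NEED_MORE = 1, MB_ERROR = -1 };

/* Hypothetical parser state mirroring the client fields. */
typedef struct {
    long long multibulklen; /* args still to read; 0 = expect "*<n>" line */
    long long bulklen;      /* current arg length; -1 = expect "$<n>" line */
    int argc;
    char *argv[MB_MAX_ARGS];
} mb_state;

/* Parse a decimal integer in [p, end), like string2ll; 0 if malformed. */
static int parse_ll(const char *p, const char *end, long long *out) {
    int neg = 0;
    long long v = 0;
    if (p < end && *p == '-') { neg = 1; p++; }
    if (p == end) return 0;
    for (; p < end; p++) {
        if (*p < '0' || *p > '9') return 0;
        if (v > (LLONG_MAX - 9) / 10) return 0; /* overflow guard */
        v = v * 10 + (*p - '0');
    }
    *out = neg ? -v : v;
    return 1;
}

/* Read a complete "<prefix><number>\r\n" line at *pos. */
static int read_len_line(const char *buf, size_t len, size_t *pos,
                         char prefix, long long *out) {
    const char *start = buf + *pos;
    const char *cr = memchr(start, '\r', len - *pos);
    if (cr == NULL || (size_t)(cr - buf) + 1 >= len) return MB_NEED_MORE;
    if (*start != prefix || !parse_ll(start + 1, cr, out)) return MB_ERROR;
    *pos = (size_t)(cr - buf) + 2; /* skip "\r\n" */
    return MB_DONE;
}

static int mb_parse(mb_state *s, const char *buf, size_t len, size_t *pos) {
    long long ll;
    int rc;
    if (s->multibulklen == 0) {
        rc = read_len_line(buf, len, pos, '*', &ll);
        if (rc != MB_DONE) return rc;
        if (ll > MB_MAX_COUNT || ll > MB_MAX_ARGS) return MB_ERROR;
        s->argc = 0;
        s->bulklen = -1;
        if (ll <= 0) return MB_DONE; /* empty command, as in Redis */
        s->multibulklen = ll;
    }
    while (s->multibulklen) {
        if (s->bulklen == -1) {
            rc = read_len_line(buf, len, pos, '$', &ll);
            if (rc != MB_DONE) return rc;
            if (ll < 0 || ll > MB_MAX_BULK) return MB_ERROR;
            s->bulklen = ll;
        }
        /* The length may be known from an earlier call while the data is
         * still incomplete, so check again (+2 for the trailing \r\n). */
        if (len - *pos < (size_t)s->bulklen + 2) return MB_NEED_MORE;
        char *arg = malloc((size_t)s->bulklen + 1);
        if (arg == NULL) return MB_ERROR;
        memcpy(arg, buf + *pos, (size_t)s->bulklen);
        arg[s->bulklen] = '\0';
        s->argv[s->argc++] = arg;
        *pos += (size_t)s->bulklen + 2;
        s->bulklen = -1;
        s->multibulklen--;
    }
    return MB_DONE;
}

/* Free parsed arguments and get ready for the next command. */
static void mb_reset(mb_state *s) {
    for (int j = 0; j < s->argc; j++) free(s->argv[j]);
    s->argc = 0;
    s->multibulklen = 0;
    s->bulklen = -1;
}
```

Feeding "*2\r\n$4\r\nECHO\r\n$5\r\nhello\r\n" in two pieces shows the resumption: the first call stops with bulklen already set to 4, and the second call picks up from there without re-reading the header lines.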

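Whenever processInlineBuffer or processMultibulkBuffer returns C_ERR, processInputBuffer breaks out of its loop. C_ERR covers two cases: the command is not complete yet and more data must be read, or a protocol error occurred, in which case setProtocolError has already marked the client to be closed after the reply.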

That covers how Redis processes incoming network data. The logic in this part of Redis is fairly simple; here is a summary:

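  1. In createClient, connSetReadHandler(conn, readQueryFromClient) installs the data-processing callback.
  2. When data arrives, Redis's networking layer calls readQueryFromClient.
  3. connRead(c->conn, c->querybuf+qblen, readlen) reads the data that has arrived and appends it to querybuf.
  4. After reading, processInputBuffer processes the buffered data.
  5. While parsing querybuf, the request type is determined by whether the first character is '*':
    • inline: handled by processInlineBuffer
    • multibulk: handled by processMultibulkBuffer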

The Redis multibulk protocol format:

*<number of arguments> CR LF
$<number of bytes in argument 1> CR LF
<argument 1 data> CR LF
...
$<number of bytes in argument N> CR LF
<argument N data> CR LF

In the next article we'll look at how Redis sends network data.
