


1    if (conn) {
2        connNonBlock(conn);
3        connEnableTcpNoDelay(conn);
4        if (server.tcpkeepalive)
5            connKeepAlive(conn,server.tcpkeepalive);
6        connSetReadHandler(conn, readQueryFromClient);
7        connSetPrivateData(conn, c);
8    }



 /* Read handler for a client connection: pull newly arrived bytes into
  * c->querybuf and hand them over to processInputBuffer().
  *
  * conn: the readable connection; the client is taken from its private data.
  *
  * Flow, as visible in this listing:
  *  - returns at once when postponeClientRead() reports that the read was
  *    deferred;
  *  - reads at most 'readlen' bytes (PROTO_IOBUF_LEN, or fewer when what is
  *    left of a big bulk argument is smaller than that) at the end of the
  *    query buffer;
  *  - a read result of -1 on a connection that is no longer in
  *    CONN_STATE_CONNECTED, or a read result of 0, schedules the client for
  *    release through freeClientAsync();
  *  - for clients flagged CLIENT_MASTER the new bytes are also appended to
  *    c->pending_querybuf and counted in c->read_reploff;
  *  - a query buffer longer than server.client_max_querybuf_len gets the
  *    client logged and released instead of parsed. */
 1void readQueryFromClient(connection *conn) {
 2    client *c = connGetPrivateData(conn);
 3    int nread, readlen;
 4    size_t qblen;
 6    /* Check if we want to read from the client later when exiting from
 7     * the event loop. This is the case if threaded I/O is enabled. */
 8    if (postponeClientRead(c)) return;
10    /* Update total number of reads on server */
11    atomicIncr(server.stat_total_reads_processed, 1);
13    readlen = PROTO_IOBUF_LEN;
14    /* If this is a multi bulk request, and we are processing a bulk reply
15     * that is large enough, try to maximize the probability that the query
16     * buffer contains exactly the SDS string representing the object, even
17     * at the risk of requiring more read(2) calls. This way the function
18     * processMultibulkBuffer() can avoid copying buffers to create the
19     * Redis Object representing the argument. */
20    if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
21        && c->bulklen >= PROTO_MBULK_BIG_ARG)
22    {
23        ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf);
25        /* Note that the 'remaining' variable may be zero in some edge case,
26         * for example once we resume a blocked client after CLIENT PAUSE. */
27        if (remaining > 0 && remaining < readlen) readlen = remaining;
28    }
30    qblen = sdslen(c->querybuf);
31    if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
32    c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
33    nread = connRead(c->conn, c->querybuf+qblen, readlen);
34    if (nread == -1) {
          /* NOTE(review): presumably a transient "no data yet" condition
           * while the connection is still up - confirm against connRead(). */
35        if (connGetState(conn) == CONN_STATE_CONNECTED) {
36            return;
37        } else {
38            serverLog(LL_VERBOSE, "Reading from client: %s",connGetLastError(c->conn));
39            freeClientAsync(c);
40            return;
41        }
42    } else if (nread == 0) {
43        serverLog(LL_VERBOSE, "Client closed connection");
44        freeClientAsync(c);
45        return;
46    } else if (c->flags & CLIENT_MASTER) {
47        /* Append the query buffer to the pending (not applied) buffer
48         * of the master. We'll use this buffer later in order to have a
49         * copy of the string applied by the last command executed. */
50        c->pending_querybuf = sdscatlen(c->pending_querybuf,
51                                        c->querybuf+qblen,nread);
52    }
      /* connRead() stored the bytes past the old length 'qblen', in the room
       * reserved by sdsMakeRoomFor(); account for them in the length now. */
54    sdsIncrLen(c->querybuf,nread);
55    c->lastinteraction = server.unixtime;
56    if (c->flags & CLIENT_MASTER) c->read_reploff += nread;
57    atomicIncr(server.stat_net_input_bytes, nread);
58    if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
59        sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();
61        bytes = sdscatrepr(bytes,c->querybuf,64);
62        serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
63        sdsfree(ci);
64        sdsfree(bytes);
65        freeClientAsync(c);
66        return;
67    }
69    /* There is more data in the client input buffer, continue parsing it
70     * in case to check if there is a full command to execute. */
 71     processInputBuffer(c);


 /* Decide whether the read for client 'c' is postponed instead of being
  * performed right away by the caller.
  *
  * Returns 1 when the read is postponed: as a side effect the client is
  * flagged CLIENT_PENDING_READ and pushed at the head of
  * server.clients_pending_read. Returns 0 otherwise, and the client is left
  * untouched.
  *
  * The read is postponed only when all of the following hold:
  *  - server.io_threads_active and server.io_threads_do_reads are both set;
  *  - ProcessingEventsWhileBlocked is not set;
  *  - the client has none of the CLIENT_MASTER, CLIENT_SLAVE,
  *    CLIENT_PENDING_READ, CLIENT_BLOCKED flags.
  *
  * NOTE(review): listing line 5 (the flag test) and the closing brace were
  * missing from this listing, which left a dangling '&&' in front of '{'.
  * The flag test is restored from the four client flags enumerated in the
  * notes that accompany this function - confirm against the Redis version
  * being described. */
 1int postponeClientRead(client *c) {
 2    if (server.io_threads_active &&
 3        server.io_threads_do_reads &&
 4        !ProcessingEventsWhileBlocked &&
 5        !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ|CLIENT_BLOCKED)))
 6    {
 7        c->flags |= CLIENT_PENDING_READ;
 8        listAddNodeHead(server.clients_pending_read,c);
 9        return 1;
10    } else {
11        return 0;
12    }
13}


  • io_threads_active 标记IO线程当前是否处于激活状态(这是一个开关标志,而不是线程数量)
  • io-threads-do-reads 这是一个配置项,指示是否使用io线程来读取和解析redis命令
  • ProcessingEventsWhileBlocked 这个标记用来判断是否在一个长阻塞的情况下
    • CLIENT_MASTER: 主从模式下由Master创建的客户端连接
    • CLIENT_SLAVE: 主从模式下由Slave创建的客户端连接
    • CLIENT_PENDING_READ: 客户端已经被标记为异步读的情况下
    • CLIENT_BLOCKED: 客户端是否处于阻塞模式,一旦处于阻塞模式,服务器将不再处理该客户端的命令。有多种情况会让客户端处于阻塞模式:
      • 由Redis模块发起的阻塞
      • 由wait指令发起的阻塞
      • 由服务器暂停引发的阻塞

判定成功后会将client加入到server.clients_pending_read链表中等待IO线程处理。


  • client
    • reqtype 标记请求的类型
      • PROTO_REQ_INLINE 单行指令
      • PROTO_REQ_MULTIBULK 多行指令
    • querybuf 累积的指令缓冲
    • qb_pos 已读取过的缓冲位置


  1. redis 先获取已读入的缓冲长度并保存在qblen中
  2. 将querybuf的可用空间扩展到readlen所给的大小
  3. 使用connRead将数据读入querybuf中
  4. 读成功后更新缓冲区长度
  5. 更新redis统计数据
  6. 将已读缓冲送入processInputBuffer中处理。



 /* Parse, and where possible execute, the requests accumulated in
  * c->querybuf, starting at offset c->qb_pos.
  *
  * The loop runs while unparsed bytes remain and stops early when the
  * client is blocked, already has a pending command, is the master while
  * server.lua_timedout is set, or is flagged to be closed.
  *
  * For each request the type is chosen from the first unparsed byte ('*'
  * selects PROTO_REQ_MULTIBULK, anything else PROTO_REQ_INLINE) and the
  * matching parser is called; a parser result other than C_OK ends the
  * loop. A parsed request with argc == 0 only resets the client. Otherwise
  * the command is executed with processCommandAndResetClient(), unless the
  * client carries CLIENT_PENDING_READ, in which case it is only flagged
  * CLIENT_PENDING_COMMAND.
  *
  * After the loop the consumed prefix of the buffer is dropped and qb_pos is
  * reset to 0. That trimming is skipped when processCommandAndResetClient()
  * returns C_ERR, because the function returns immediately in that case. */
 1void processInputBuffer(client *c) {
 2    /* Keep processing while there is something in the input buffer */
 3    while(c->qb_pos < sdslen(c->querybuf)) {
 4        /* Immediately abort if the client is in the middle of something. */
 5        if (c->flags & CLIENT_BLOCKED) break;
 7        /* Don't process more buffers from clients that have already pending
 8         * commands to execute in c->argv. */
 9        if (c->flags & CLIENT_PENDING_COMMAND) break;
11        /* Don't process input from the master while there is a busy script
12         * condition on the slave. We want just to accumulate the replication
13         * stream (instead of replying -BUSY like we do with other clients) and
14         * later resume the processing. */
15        if (server.lua_timedout && c->flags & CLIENT_MASTER) break;
17        /* CLIENT_CLOSE_AFTER_REPLY closes the connection once the reply is
18         * written to the client. Make sure to not let the reply grow after
19         * this flag has been set (i.e. don't process more commands).
20         *
21         * The same applies for clients we want to terminate ASAP. */
22        if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break;
24        /* Determine request type when unknown. */
25        if (!c->reqtype) {
26            if (c->querybuf[c->qb_pos] == '*') {
27                c->reqtype = PROTO_REQ_MULTIBULK;
28            } else {
29                c->reqtype = PROTO_REQ_INLINE;
30            }
31        }
33        if (c->reqtype == PROTO_REQ_INLINE) {
34            if (processInlineBuffer(c) != C_OK) break;
35            /* If the Gopher mode and we got zero or one argument, process
36             * the request in Gopher mode. To avoid data race, Redis won't
37             * support Gopher if enable io threads to read queries. */
38            if (server.gopher_enabled && !server.io_threads_do_reads &&
39                ((c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '/') ||
40                  c->argc == 0))
41            {
42                processGopherRequest(c);
43                resetClient(c);
44                c->flags |= CLIENT_CLOSE_AFTER_REPLY;
45                break;
46            }
47        } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
48            if (processMultibulkBuffer(c) != C_OK) break;
49        } else {
50            serverPanic("Unknown request type");
51        }
53        /* Multibulk processing could see a <= 0 length. */
54        if (c->argc == 0) {
55            resetClient(c);
56        } else {
57            /* If we are in the context of an I/O thread, we can't really
58             * execute the command here. All we can do is to flag the client
59             * as one that needs to process the command. */
60            if (c->flags & CLIENT_PENDING_READ) {
61                c->flags |= CLIENT_PENDING_COMMAND;
62                break;
63            }
65            /* We are finally ready to execute the command. */
66            if (processCommandAndResetClient(c) == C_ERR) {
67                /* If the client is no longer valid, we avoid exiting this
68                 * loop and trimming the client buffer later. So we return
69                 * ASAP in that case. */
70                return;
71            }
72        }
73    }
75    /* Trim to pos */
76    if (c->qb_pos) {
77        sdsrange(c->querybuf,c->qb_pos,-1);
78        c->qb_pos = 0;
79    }


 /* Parse one inline (single line) request from c->querybuf, starting at
  * c->qb_pos, into c->argv / c->argc.
  *
  * Returns C_OK when a complete line was consumed: c->qb_pos is moved past
  * the line and its terminator ("\n" or "\r\n"), and every token produced by
  * sdssplitargs() is wrapped into a string object stored in c->argv. An
  * empty line yields argc == 0.
  *
  * Returns C_ERR when:
  *  - no '\n' is in the buffer yet (more data is needed); if the unparsed
  *    part already exceeds PROTO_INLINE_MAX_SIZE a protocol error is also
  *    raised;
  *  - sdssplitargs() returns NULL (reported as unbalanced quotes);
  *  - a non-empty inline line arrives from a client flagged CLIENT_MASTER.
  *
  * The token strings are passed to createObject() and only the token array
  * itself is released with zfree() at the end - presumably the objects take
  * over the strings; confirm against createObject(). */
 1int processInlineBuffer(client *c) {
 2    char *newline;
 3    int argc, j, linefeed_chars = 1;
 4    sds *argv, aux;
 5    size_t querylen;
 7    /* Search for end of line */
 8    newline = strchr(c->querybuf+c->qb_pos,'\n');
10    /* Nothing to do without a \r\n */
11    if (newline == NULL) {
12        if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE) {
13            addReplyError(c,"Protocol error: too big inline request");
14            setProtocolError("too big inline request",c);
15        }
16        return C_ERR;
17    }
19    /* Handle the \r\n case. */
20    if (newline != c->querybuf+c->qb_pos && *(newline-1) == '\r')
21        newline--, linefeed_chars++;
23    /* Split the input buffer up to the \r\n */
24    querylen = newline-(c->querybuf+c->qb_pos);
25    aux = sdsnewlen(c->querybuf+c->qb_pos,querylen);
26    argv = sdssplitargs(aux,&argc);
27    sdsfree(aux);
28    if (argv == NULL) {
29        addReplyError(c,"Protocol error: unbalanced quotes in request");
30        setProtocolError("unbalanced quotes in inline request",c);
31        return C_ERR;
32    }
34    /* Newline from slaves can be used to refresh the last ACK time.
35     * This is useful for a slave to ping back while loading a big
36     * RDB file. */
37    if (querylen == 0 && getClientType(c) == CLIENT_TYPE_SLAVE)
38        c->repl_ack_time = server.unixtime;
40    /* Masters should never send us inline protocol to run actual
41     * commands. If this happens, it is likely due to a bug in Redis where
42     * we got some desynchronization in the protocol, for example
43     * because of a PSYNC gone bad.
44     *
45     * However there is an exception: masters may send us just a newline
46     * to keep the connection active. */
47    if (querylen != 0 && c->flags & CLIENT_MASTER) {
48        sdsfreesplitres(argv,argc);
49        serverLog(LL_WARNING,"WARNING: Receiving inline protocol from master, master stream corruption? Closing the master connection and discarding the cached master.");
50        setProtocolError("Master using the inline protocol. Desync?",c);
51        return C_ERR;
52    }
54    /* Move querybuffer position to the next query in the buffer. */
55    c->qb_pos += querylen+linefeed_chars;
57    /* Setup argv array on client structure */
58    if (argc) {
59        if (c->argv) zfree(c->argv);
60        c->argv = zmalloc(sizeof(robj*)*argc);
61        c->argv_len_sum = 0;
62    }
64    /* Create redis objects for all arguments. */
65    for (c->argc = 0, j = 0; j < argc; j++) {
66        c->argv[c->argc] = createObject(OBJ_STRING,argv[j]);
67        c->argc++;
68        c->argv_len_sum += sdslen(argv[j]);
69    }
70    zfree(argv);
71    return C_OK;


  1. 使用strchr查找'\n'字符

    1newline = strchr(c->querybuf+c->qb_pos,'\n');
  2. 修正新行位置,redis使用\r\n来作为行结束标志

    1if (newline != c->querybuf+c->qb_pos && *(newline-1) == '\r')
    2    newline--, linefeed_chars++;
  3. 将行使用sdssplitargs分割为参数列表并保存在argv数组中

    1querylen = newline-(c->querybuf+c->qb_pos);
    2aux = sdsnewlen(c->querybuf+c->qb_pos,querylen);
    3argv = sdssplitargs(aux,&argc);
  4. 调整已读指针

    1c->qb_pos += querylen+linefeed_chars;
  5. 将参数数组argv复制到client::argv中,这里之所以没有直接用argv中的数据是因为client::argv的类型并非sds而是robj

     1    /* Setup argv array on client structure */
     2if (argc) {
     3    if (c->argv) zfree(c->argv);
     4    c->argv = zmalloc(sizeof(robj*)*argc);
     5    c->argv_len_sum = 0;
     8/* Create redis objects for all arguments. */
     9for (c->argc = 0, j = 0; j < argc; j++) {
    10    c->argv[c->argc] = createObject(OBJ_STRING,argv[j]);
    11    c->argc++;
    12    c->argv_len_sum += sdslen(argv[j]);



  /* Incrementally parse one multi bulk request ("*<count>" followed by
   * "$<len>" + payload for every argument) from c->querybuf into c->argv.
   *
   * Parsing state survives between calls in the client structure:
   *  - c->multibulklen: number of arguments still to read (0 = the count
   *    line has not been parsed yet);
   *  - c->bulklen: length of the argument being read (-1 = its "$<len>"
   *    line has not been parsed yet);
   *  - c->qb_pos: offset of the first unparsed byte.
   *
   * Returns C_OK when the request is complete (c->multibulklen reached 0),
   * or when the count is <= 0, in which case no argument is read. Returns
   * C_ERR both when more data is needed and when a protocol error was
   * raised through setProtocolError().
   *
   * Limits enforced here: a count above 1024*1024 is invalid, and above 10
   * is refused while authRequired(c) is true; a bulk length that is negative
   * or (for clients not flagged CLIENT_MASTER) above
   * server.proto_max_bulk_len is invalid, and above 16384 is refused while
   * authRequired(c) is true.
   *
   * For arguments of at least PROTO_MBULK_BIG_ARG bytes the buffer is
   * trimmed so the payload starts at offset 0; when the buffer then holds
   * exactly payload + CRLF, the buffer itself becomes the argument object
   * and a fresh query buffer is allocated, avoiding a copy. */
  1int processMultibulkBuffer(client *c) {
  2    char *newline = NULL;
  3    int ok;
  4    long long ll;
  6    if (c->multibulklen == 0) {
  7        /* The client should have been reset */
  8        serverAssertWithInfo(c,NULL,c->argc == 0);
 10        /* Multi bulk length cannot be read without a \r\n */
 11        newline = strchr(c->querybuf+c->qb_pos,'\r');
 12        if (newline == NULL) {
 13            if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE) {
 14                addReplyError(c,"Protocol error: too big mbulk count string");
 15                setProtocolError("too big mbulk count string",c);
 16            }
 17            return C_ERR;
 18        }
 20        /* Buffer should also contain \n */
 21        if (newline-(c->querybuf+c->qb_pos) > (ssize_t)(sdslen(c->querybuf)-c->qb_pos-2))
 22            return C_ERR;
 24        /* We know for sure there is a whole line since newline != NULL,
 25         * so go ahead and find out the multi bulk length. */
 26        serverAssertWithInfo(c,NULL,c->querybuf[c->qb_pos] == '*');
 27        ok = string2ll(c->querybuf+1+c->qb_pos,newline-(c->querybuf+1+c->qb_pos),&ll);
 28        if (!ok || ll > 1024*1024) {
 29            addReplyError(c,"Protocol error: invalid multibulk length");
 30            setProtocolError("invalid mbulk count",c);
 31            return C_ERR;
 32        } else if (ll > 10 && authRequired(c)) {
 33            addReplyError(c, "Protocol error: unauthenticated multibulk length");
 34            setProtocolError("unauth mbulk count", c);
 35            return C_ERR;
 36        }
 38        c->qb_pos = (newline-c->querybuf)+2;
 40        if (ll <= 0) return C_OK;
 42        c->multibulklen = ll;
 44        /* Setup argv array on client structure */
 45        if (c->argv) zfree(c->argv);
 46        c->argv = zmalloc(sizeof(robj*)*c->multibulklen);
 47        c->argv_len_sum = 0;
 48    }
 50    serverAssertWithInfo(c,NULL,c->multibulklen > 0);
 51    while(c->multibulklen) {
 52        /* Read bulk length if unknown */
 53        if (c->bulklen == -1) {
 54            newline = strchr(c->querybuf+c->qb_pos,'\r');
 55            if (newline == NULL) {
 56                if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE) {
 57                    addReplyError(c,
 58                        "Protocol error: too big bulk count string");
 59                    setProtocolError("too big bulk count string",c);
 60                    return C_ERR;
 61                }
 62                break;
 63            }
 65            /* Buffer should also contain \n */
 66            if (newline-(c->querybuf+c->qb_pos) > (ssize_t)(sdslen(c->querybuf)-c->qb_pos-2))
 67                break;
 69            if (c->querybuf[c->qb_pos] != '$') {
 70                addReplyErrorFormat(c,
 71                    "Protocol error: expected '$', got '%c'",
 72                    c->querybuf[c->qb_pos]);
 73                setProtocolError("expected $ but got something else",c);
 74                return C_ERR;
 75            }
 77            ok = string2ll(c->querybuf+c->qb_pos+1,newline-(c->querybuf+c->qb_pos+1),&ll);
 78            if (!ok || ll < 0 ||
 79                (!(c->flags & CLIENT_MASTER) && ll > server.proto_max_bulk_len)) {
 80                addReplyError(c,"Protocol error: invalid bulk length");
 81                setProtocolError("invalid bulk length",c);
 82                return C_ERR;
 83            } else if (ll > 16384 && authRequired(c)) {
 84                addReplyError(c, "Protocol error: unauthenticated bulk length");
 85                setProtocolError("unauth bulk length", c);
 86                return C_ERR;
 87            }
 89            c->qb_pos = newline-c->querybuf+2;
 90            if (ll >= PROTO_MBULK_BIG_ARG) {
 91                /* If we are going to read a large object from network
 92                 * try to make it likely that it will start at c->querybuf
 93                 * boundary so that we can optimize object creation
 94                 * avoiding a large copy of data.
 95                 *
 96                 * But only when the data we have not parsed is less than
 97                 * or equal to ll+2. If the data length is greater than
 98                 * ll+2, trimming querybuf is just a waste of time, because
 99                 * at this time the querybuf contains not only our bulk. */
100                if (sdslen(c->querybuf)-c->qb_pos <= (size_t)ll+2) {
101                    sdsrange(c->querybuf,c->qb_pos,-1);
102                    c->qb_pos = 0;
103                    /* Hint the sds library about the amount of bytes this string is
104                     * going to contain. */
105                    c->querybuf = sdsMakeRoomFor(c->querybuf,ll+2-sdslen(c->querybuf));
106                }
107            }
108            c->bulklen = ll;
109        }
111        /* Read bulk argument */
112        if (sdslen(c->querybuf)-c->qb_pos < (size_t)(c->bulklen+2)) {
113            /* Not enough data (+2 == trailing \r\n) */
114            break;
115        } else {
116            /* Optimization: if the buffer contains JUST our bulk element
117             * instead of creating a new object by *copying* the sds we
118             * just use the current sds string. */
119            if (c->qb_pos == 0 &&
120                c->bulklen >= PROTO_MBULK_BIG_ARG &&
121                sdslen(c->querybuf) == (size_t)(c->bulklen+2))
122            {
123                c->argv[c->argc++] = createObject(OBJ_STRING,c->querybuf);
124                c->argv_len_sum += c->bulklen;
125                sdsIncrLen(c->querybuf,-2); /* remove CRLF */
126                /* Assume that if we saw a fat argument we'll see another one
127                 * likely... */
128                c->querybuf = sdsnewlen(SDS_NOINIT,c->bulklen+2);
129                sdsclear(c->querybuf);
130            } else {
131                c->argv[c->argc++] =
132                    createStringObject(c->querybuf+c->qb_pos,c->bulklen);
133                c->argv_len_sum += c->bulklen;
134                c->qb_pos += c->bulklen+2;
135            }
136            c->bulklen = -1;
137            c->multibulklen--;
138        }
139    }
141    /* We're done when c->multibulklen == 0 */
142    if (c->multibulklen == 0) return C_OK;
144    /* Still not ready to process the command */
145    return C_ERR;


1    if (c->multibulklen == 0) {

multibulklen 在createClient函数中被初始化为了0,新的客户端连接一定会进入到这个分支中。之后先在缓冲区中查找'\r',确认参数个数所在的那一行已经完整到达:

 1        /* Multi bulk length cannot be read without a \r\n */
 2        newline = strchr(c->querybuf+c->qb_pos,'\r');
 3        if (newline == NULL) {
 4            if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE) {
 5                addReplyError(c,"Protocol error: too big mbulk count string");
 6                setProtocolError("too big mbulk count string",c);
 7            }
 8            return C_ERR;
 9        }
11        /* Buffer should also contain \n */
12        if (newline-(c->querybuf+c->qb_pos) > (ssize_t)(sdslen(c->querybuf)-c->qb_pos-2))
13            return C_ERR;


1        ok = string2ll(c->querybuf+1+c->qb_pos,newline-(c->querybuf+1+c->qb_pos),&ll);




1        if (!ok || ll > 1024*1024) {
2            addReplyError(c,"Protocol error: invalid multibulk length");
3            setProtocolError("invalid mbulk count",c);
4            return C_ERR;
5        } else if (ll > 10 && authRequired(c)) {
6            addReplyError(c, "Protocol error: unauthenticated multibulk length");
7            setProtocolError("unauth mbulk count", c);
8            return C_ERR;
9        }

由代码可以看到,多行查询指令的参数个数不能大于1048576,否则会返回“Protocol error: invalid multibulk length”错误。此外,如果authRequired(c)为真(即该客户端还需要通过认证),参数个数不能超过10,否则会返回“Protocol error: unauthenticated multibulk length”错误。


1        c->qb_pos = (newline-c->querybuf)+2;


1        if (ll <= 0) return C_OK;
3        c->multibulklen = ll;


1        /* Setup argv array on client structure */
2        if (c->argv) zfree(c->argv);
3        c->argv = zmalloc(sizeof(robj*)*c->multibulklen);
4        c->argv_len_sum = 0;



1    while(c->multibulklen) {
2        /* Read bulk length if unknown */
3        if (c->bulklen == -1) {



 1            if (ll >= PROTO_MBULK_BIG_ARG) {
 2                if (sdslen(c->querybuf)-c->qb_pos <= (size_t)ll+2) {
 3                    sdsrange(c->querybuf,c->qb_pos,-1);
 4                    c->qb_pos = 0;
 5                    /* Hint the sds library about the amount of bytes this string is
 6                     * going to contain. */
 7                    c->querybuf = sdsMakeRoomFor(c->querybuf,ll+2-sdslen(c->querybuf));
 8                }
 9            }
10            c->bulklen = ll;



1        if (sdslen(c->querybuf)-c->qb_pos < (size_t)(c->bulklen+2)) {
2            /* Not enough data (+2 == trailing \r\n) */
3            break;
4        }

这里必须要再进行一次判断,因为保存在querybuf中的数据包有可能是之前只解析了一半的,或者是第二次循环之后的,此时之前的bulklen == -1的条件不成立。

 1        else {
 2            /* Optimization: if the buffer contains JUST our bulk element
 3             * instead of creating a new object by *copying* the sds we
 4             * just use the current sds string. */
 5            if (c->qb_pos == 0 &&
 6                c->bulklen >= PROTO_MBULK_BIG_ARG &&
 7                sdslen(c->querybuf) == (size_t)(c->bulklen+2))
 8            {
 9                c->argv[c->argc++] = createObject(OBJ_STRING,c->querybuf);
10                c->argv_len_sum += c->bulklen;
11                sdsIncrLen(c->querybuf,-2); /* remove CRLF */
12                /* Assume that if we saw a fat argument we'll see another one
13                 * likely... */
14                c->querybuf = sdsnewlen(SDS_NOINIT,c->bulklen+2);
15                sdsclear(c->querybuf);
16            } else {
17                c->argv[c->argc++] =
18                    createStringObject(c->querybuf+c->qb_pos,c->bulklen);
19                c->argv_len_sum += c->bulklen;
20                c->qb_pos += c->bulklen+2;
21            }
22            c->bulklen = -1;
23            c->multibulklen--;
24        }






  1. 在createClient中通过connSetReadHandler(conn, readQueryFromClient)设置数据处理函数。
  2. 数据到达时由redis的网络层调用readQueryFromClient
  3. 使用connRead(c->conn, c->querybuf+qblen, readlen)读取已到达的数据并存入到querybuf中
  4. 数据读完后调用processInputBuffer对已读数据进行处理。
  5. 对querybuf进行解析,根据第一个字符是否为'*' 确定指令的类型
    • 单行:调用processInlineBuffer处理
    • 多行:调用processMultibulkBuffer处理

redis 多行协议格式:

