当网络连接准备好之后redis就会开始等待接收网络数据并处理。
还记得我们在分析createClient这个函数的时候,当conn不为空时的处理吗?
1 if (conn) {
2 connNonBlock(conn);
3 connEnableTcpNoDelay(conn);
4 if (server.tcpkeepalive)
5 connKeepAlive(conn,server.tcpkeepalive);
6 connSetReadHandler(conn, readQueryFromClient);
7 connSetPrivateData(conn, c);
8 }
这里client通过调用connSetReadHandler为数据处理设置了一个回调函数readQueryFromClient,这个函数就是客户端处理网络数据的入口。
readQueryFromClient
1void readQueryFromClient(connection *conn) {
2 client *c = connGetPrivateData(conn);
3 int nread, readlen;
4 size_t qblen;
5
6 /* Check if we want to read from the client later when exiting from
7 * the event loop. This is the case if threaded I/O is enabled. */
8 if (postponeClientRead(c)) return;
9
10 /* Update total number of reads on server */
11 atomicIncr(server.stat_total_reads_processed, 1);
12
13 readlen = PROTO_IOBUF_LEN;
14 /* If this is a multi bulk request, and we are processing a bulk reply
15 * that is large enough, try to maximize the probability that the query
16 * buffer contains exactly the SDS string representing the object, even
17 * at the risk of requiring more read(2) calls. This way the function
18 * processMultiBulkBuffer() can avoid copying buffers to create the
19 * Redis Object representing the argument. */
20 if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
21 && c->bulklen >= PROTO_MBULK_BIG_ARG)
22 {
23 ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf);
24
25 /* Note that the 'remaining' variable may be zero in some edge case,
26 * for example once we resume a blocked client after CLIENT PAUSE. */
27 if (remaining > 0 && remaining < readlen) readlen = remaining;
28 }
29
30 qblen = sdslen(c->querybuf);
31 if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
32 c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
33 nread = connRead(c->conn, c->querybuf+qblen, readlen);
34 if (nread == -1) {
35 if (connGetState(conn) == CONN_STATE_CONNECTED) {
36 return;
37 } else {
38 serverLog(LL_VERBOSE, "Reading from client: %s",connGetLastError(c->conn));
39 freeClientAsync(c);
40 return;
41 }
42 } else if (nread == 0) {
43 serverLog(LL_VERBOSE, "Client closed connection");
44 freeClientAsync(c);
45 return;
46 } else if (c->flags & CLIENT_MASTER) {
47 /* Append the query buffer to the pending (not applied) buffer
48 * of the master. We'll use this buffer later in order to have a
49 * copy of the string applied by the last command executed. */
50 c->pending_querybuf = sdscatlen(c->pending_querybuf,
51 c->querybuf+qblen,nread);
52 }
53
54 sdsIncrLen(c->querybuf,nread);
55 c->lastinteraction = server.unixtime;
56 if (c->flags & CLIENT_MASTER) c->read_reploff += nread;
57 atomicIncr(server.stat_net_input_bytes, nread);
58 if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
59 sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();
60
61 bytes = sdscatrepr(bytes,c->querybuf,64);
62 serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
63 sdsfree(ci);
64 sdsfree(bytes);
65 freeClientAsync(c);
66 return;
67 }
68
69 /* There is more data in the client input buffer, continue parsing it
70 * in case to check if there is a full command to execute. */
71 processInputBuffer(c);
72}
这个函数一开头就碰到这样一条语句
1if (postponeClientRead(c)) return;
这个函数会判断客户端是否满足延迟读取(交由IO线程异步读)的条件,满足则返回1,否则返回0。
postponeClientRead
1int postponeClientRead(client *c) {
2 if (server.io_threads_active &&
3 server.io_threads_do_reads &&
4 !ProcessingEventsWhileBlocked &&
5 !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ|CLIENT_BLOCKED)))
6 {
7 c->flags |= CLIENT_PENDING_READ;
8 listAddNodeHead(server.clients_pending_read,c);
9 return 1;
10 } else {
11 return 0;
12 }
13}
这个判定的含义如下:
- io_threads_active 标记IO线程是否处于激活状态
- io-threads-do-reads 这是一个配置项,指示是否使用io线程来读取和解析redis命令
- ProcessingEventsWhileBlocked 该标记表示服务器正处于长时间阻塞操作期间临时处理事件的状态,此时不能延迟读取
- c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ|CLIENT_BLOCKED) 要求客户端不存在这些标记
- CLIENT_MASTER: 主从模式下由Master创建的客户端连接
- CLIENT_SLAVE: 主从模式下由Slave创建的客户端连接
- CLIENT_PENDING_READ: 客户端已经被标记为异步读的情况下
- CLIENT_BLOCKED: 客户端是否处于阻塞模式,一旦处于阻塞模式,服务器将不再处理该客户端的命令。有多种情况会让客户端进入阻塞模式:
- 由Redis模块发起的阻塞
- 由wait指令发起的阻塞
- 由服务器暂停引发的阻塞

判定成功后会将client加入到server.clients_pending_read链表中等待IO线程处理。
在分析client读取数据前我们先来看以下几个和读缓冲相关的client字段:
- client
- reqtype 标记请求的类型
- PROTO_REQ_INLINE 单行指令
- PROTO_REQ_MULTIBULK 多行指令
- querybuf 累积的指令缓冲
- qb_pos 已读取过的缓冲位置
- reqtype 标记请求的类型
读取数据的过程如下:
- redis 先获取已读入的缓冲长度并保存在qblen中
- 将querybuf的可用空间扩展到readlen所给的大小
- 使用connRead将数据读入querybuf中
- 读成功后更新缓冲区长度
- 更新redis统计数据
- 将已读缓冲送入processInputBuffer中处理。
processInputBuffer
这个函数解析redis协议
1void processInputBuffer(client *c) {
2 /* Keep processing while there is something in the input buffer */
3 while(c->qb_pos < sdslen(c->querybuf)) {
4 /* Immediately abort if the client is in the middle of something. */
5 if (c->flags & CLIENT_BLOCKED) break;
6
7 /* Don't process more buffers from clients that have already pending
8 * commands to execute in c->argv. */
9 if (c->flags & CLIENT_PENDING_COMMAND) break;
10
11 /* Don't process input from the master while there is a busy script
12 * condition on the slave. We want just to accumulate the replication
13 * stream (instead of replying -BUSY like we do with other clients) and
14 * later resume the processing. */
15 if (server.lua_timedout && c->flags & CLIENT_MASTER) break;
16
17 /* CLIENT_CLOSE_AFTER_REPLY closes the connection once the reply is
18 * written to the client. Make sure to not let the reply grow after
19 * this flag has been set (i.e. don't process more commands).
20 *
21 * The same applies for clients we want to terminate ASAP. */
22 if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break;
23
24 /* Determine request type when unknown. */
25 if (!c->reqtype) {
26 if (c->querybuf[c->qb_pos] == '*') {
27 c->reqtype = PROTO_REQ_MULTIBULK;
28 } else {
29 c->reqtype = PROTO_REQ_INLINE;
30 }
31 }
32
33 if (c->reqtype == PROTO_REQ_INLINE) {
34 if (processInlineBuffer(c) != C_OK) break;
35 /* If the Gopher mode and we got zero or one argument, process
36 * the request in Gopher mode. To avoid data race, Redis won't
37 * support Gopher if enable io threads to read queries. */
38 if (server.gopher_enabled && !server.io_threads_do_reads &&
39 ((c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '/') ||
40 c->argc == 0))
41 {
42 processGopherRequest(c);
43 resetClient(c);
44 c->flags |= CLIENT_CLOSE_AFTER_REPLY;
45 break;
46 }
47 } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
48 if (processMultibulkBuffer(c) != C_OK) break;
49 } else {
50 serverPanic("Unknown request type");
51 }
52
53 /* Multibulk processing could see a <= 0 length. */
54 if (c->argc == 0) {
55 resetClient(c);
56 } else {
57 /* If we are in the context of an I/O thread, we can't really
58 * execute the command here. All we can do is to flag the client
59 * as one that needs to process the command. */
60 if (c->flags & CLIENT_PENDING_READ) {
61 c->flags |= CLIENT_PENDING_COMMAND;
62 break;
63 }
64
65 /* We are finally ready to execute the command. */
66 if (processCommandAndResetClient(c) == C_ERR) {
67 /* If the client is no longer valid, we avoid exiting this
68 * loop and trimming the client buffer later. So we return
69 * ASAP in that case. */
70 return;
71 }
72 }
73 }
74
75 /* Trim to pos */
76 if (c->qb_pos) {
77 sdsrange(c->querybuf,c->qb_pos,-1);
78 c->qb_pos = 0;
79 }
80}
由这里我们可以看到Redis协议如何区分两种请求类型
1/* Determine request type when unknown. */
2if (!c->reqtype) {
3 if (c->querybuf[c->qb_pos] == '*') {
4 c->reqtype = PROTO_REQ_MULTIBULK;
5 } else {
6 c->reqtype = PROTO_REQ_INLINE;
7 }
8}
第一个字符为*则是多行协议,否则是单行协议。单行协议由processInlineBuffer处理,多行协议由processMultibulkBuffer处理。
processInlineBuffer
单行协议的处理要简单一些,我们先来看下代码。
1int processInlineBuffer(client *c) {
2 char *newline;
3 int argc, j, linefeed_chars = 1;
4 sds *argv, aux;
5 size_t querylen;
6
7 /* Search for end of line */
8 newline = strchr(c->querybuf+c->qb_pos,'\n');
9
10 /* Nothing to do without a \r\n */
11 if (newline == NULL) {
12 if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE) {
13 addReplyError(c,"Protocol error: too big inline request");
14 setProtocolError("too big inline request",c);
15 }
16 return C_ERR;
17 }
18
19 /* Handle the \r\n case. */
20 if (newline != c->querybuf+c->qb_pos && *(newline-1) == '\r')
21 newline--, linefeed_chars++;
22
23 /* Split the input buffer up to the \r\n */
24 querylen = newline-(c->querybuf+c->qb_pos);
25 aux = sdsnewlen(c->querybuf+c->qb_pos,querylen);
26 argv = sdssplitargs(aux,&argc);
27 sdsfree(aux);
28 if (argv == NULL) {
29 addReplyError(c,"Protocol error: unbalanced quotes in request");
30 setProtocolError("unbalanced quotes in inline request",c);
31 return C_ERR;
32 }
33
34 /* Newline from slaves can be used to refresh the last ACK time.
35 * This is useful for a slave to ping back while loading a big
36 * RDB file. */
37 if (querylen == 0 && getClientType(c) == CLIENT_TYPE_SLAVE)
38 c->repl_ack_time = server.unixtime;
39
40 /* Masters should never send us inline protocol to run actual
41 * commands. If this happens, it is likely due to a bug in Redis where
42 * we got some desynchronization in the protocol, for example
43 * beause of a PSYNC gone bad.
44 *
45 * However the is an exception: masters may send us just a newline
46 * to keep the connection active. */
47 if (querylen != 0 && c->flags & CLIENT_MASTER) {
48 sdsfreesplitres(argv,argc);
49 serverLog(LL_WARNING,"WARNING: Receiving inline protocol from master, master stream corruption? Closing the master connection and discarding the cached master.");
50 setProtocolError("Master using the inline protocol. Desync?",c);
51 return C_ERR;
52 }
53
54 /* Move querybuffer position to the next query in the buffer. */
55 c->qb_pos += querylen+linefeed_chars;
56
57 /* Setup argv array on client structure */
58 if (argc) {
59 if (c->argv) zfree(c->argv);
60 c->argv = zmalloc(sizeof(robj*)*argc);
61 c->argv_len_sum = 0;
62 }
63
64 /* Create redis objects for all arguments. */
65 for (c->argc = 0, j = 0; j < argc; j++) {
66 c->argv[c->argc] = createObject(OBJ_STRING,argv[j]);
67 c->argc++;
68 c->argv_len_sum += sdslen(argv[j]);
69 }
70 zfree(argv);
71 return C_OK;
72}
大体的逻辑如下:
-
使用strchr查找'\n'字符
1newline = strchr(c->querybuf+c->qb_pos,'\n');
-
修正新行位置,redis使用\r\n来作为行结束标志
1if (newline != c->querybuf+c->qb_pos && *(newline-1) == '\r')
2    newline--, linefeed_chars++;
-
将行使用sdssplitargs分割为参数列表并保存在argv数组中
1querylen = newline-(c->querybuf+c->qb_pos);
2aux = sdsnewlen(c->querybuf+c->qb_pos,querylen);
3argv = sdssplitargs(aux,&argc);
-
调整已读指针
1c->qb_pos += querylen+linefeed_chars;
-
将参数数组argv复制到client::argv中,这里之所以没有直接用argv中的数据是因为client::argv的类型并非sds而是robj
1 /* Setup argv array on client structure */
2if (argc) {
3    if (c->argv) zfree(c->argv);
4    c->argv = zmalloc(sizeof(robj*)*argc);
5    c->argv_len_sum = 0;
6}
7
8/* Create redis objects for all arguments. */
9for (c->argc = 0, j = 0; j < argc; j++) {
10    c->argv[c->argc] = createObject(OBJ_STRING,argv[j]);
11    c->argc++;
12    c->argv_len_sum += sdslen(argv[j]);
13}
processMultibulkBuffer
这个函数比较长,我大概捋了一下整个流程。
1int processMultibulkBuffer(client *c) {
2 char *newline = NULL;
3 int ok;
4 long long ll;
5
6 if (c->multibulklen == 0) {
7 /* The client should have been reset */
8 serverAssertWithInfo(c,NULL,c->argc == 0);
9
10 /* Multi bulk length cannot be read without a \r\n */
11 newline = strchr(c->querybuf+c->qb_pos,'\r');
12 if (newline == NULL) {
13 if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE) {
14 addReplyError(c,"Protocol error: too big mbulk count string");
15 setProtocolError("too big mbulk count string",c);
16 }
17 return C_ERR;
18 }
19
20 /* Buffer should also contain \n */
21 if (newline-(c->querybuf+c->qb_pos) > (ssize_t)(sdslen(c->querybuf)-c->qb_pos-2))
22 return C_ERR;
23
24 /* We know for sure there is a whole line since newline != NULL,
25 * so go ahead and find out the multi bulk length. */
26 serverAssertWithInfo(c,NULL,c->querybuf[c->qb_pos] == '*');
27 ok = string2ll(c->querybuf+1+c->qb_pos,newline-(c->querybuf+1+c->qb_pos),&ll);
28 if (!ok || ll > 1024*1024) {
29 addReplyError(c,"Protocol error: invalid multibulk length");
30 setProtocolError("invalid mbulk count",c);
31 return C_ERR;
32 } else if (ll > 10 && authRequired(c)) {
33 addReplyError(c, "Protocol error: unauthenticated multibulk length");
34 setProtocolError("unauth mbulk count", c);
35 return C_ERR;
36 }
37
38 c->qb_pos = (newline-c->querybuf)+2;
39
40 if (ll <= 0) return C_OK;
41
42 c->multibulklen = ll;
43
44 /* Setup argv array on client structure */
45 if (c->argv) zfree(c->argv);
46 c->argv = zmalloc(sizeof(robj*)*c->multibulklen);
47 c->argv_len_sum = 0;
48 }
49
50 serverAssertWithInfo(c,NULL,c->multibulklen > 0);
51 while(c->multibulklen) {
52 /* Read bulk length if unknown */
53 if (c->bulklen == -1) {
54 newline = strchr(c->querybuf+c->qb_pos,'\r');
55 if (newline == NULL) {
56 if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE) {
57 addReplyError(c,
58 "Protocol error: too big bulk count string");
59 setProtocolError("too big bulk count string",c);
60 return C_ERR;
61 }
62 break;
63 }
64
65 /* Buffer should also contain \n */
66 if (newline-(c->querybuf+c->qb_pos) > (ssize_t)(sdslen(c->querybuf)-c->qb_pos-2))
67 break;
68
69 if (c->querybuf[c->qb_pos] != '$') {
70 addReplyErrorFormat(c,
71 "Protocol error: expected '$', got '%c'",
72 c->querybuf[c->qb_pos]);
73 setProtocolError("expected $ but got something else",c);
74 return C_ERR;
75 }
76
77 ok = string2ll(c->querybuf+c->qb_pos+1,newline-(c->querybuf+c->qb_pos+1),&ll);
78 if (!ok || ll < 0 ||
79 (!(c->flags & CLIENT_MASTER) && ll > server.proto_max_bulk_len)) {
80 addReplyError(c,"Protocol error: invalid bulk length");
81 setProtocolError("invalid bulk length",c);
82 return C_ERR;
83 } else if (ll > 16384 && authRequired(c)) {
84 addReplyError(c, "Protocol error: unauthenticated bulk length");
85 setProtocolError("unauth bulk length", c);
86 return C_ERR;
87 }
88
89 c->qb_pos = newline-c->querybuf+2;
90 if (ll >= PROTO_MBULK_BIG_ARG) {
91 /* If we are going to read a large object from network
92 * try to make it likely that it will start at c->querybuf
93 * boundary so that we can optimize object creation
94 * avoiding a large copy of data.
95 *
96 * But only when the data we have not parsed is less than
97 * or equal to ll+2. If the data length is greater than
98 * ll+2, trimming querybuf is just a waste of time, because
99 * at this time the querybuf contains not only our bulk. */
100 if (sdslen(c->querybuf)-c->qb_pos <= (size_t)ll+2) {
101 sdsrange(c->querybuf,c->qb_pos,-1);
102 c->qb_pos = 0;
103 /* Hint the sds library about the amount of bytes this string is
104 * going to contain. */
105 c->querybuf = sdsMakeRoomFor(c->querybuf,ll+2-sdslen(c->querybuf));
106 }
107 }
108 c->bulklen = ll;
109 }
110
111 /* Read bulk argument */
112 if (sdslen(c->querybuf)-c->qb_pos < (size_t)(c->bulklen+2)) {
113 /* Not enough data (+2 == trailing \r\n) */
114 break;
115 } else {
116 /* Optimization: if the buffer contains JUST our bulk element
117 * instead of creating a new object by *copying* the sds we
118 * just use the current sds string. */
119 if (c->qb_pos == 0 &&
120 c->bulklen >= PROTO_MBULK_BIG_ARG &&
121 sdslen(c->querybuf) == (size_t)(c->bulklen+2))
122 {
123 c->argv[c->argc++] = createObject(OBJ_STRING,c->querybuf);
124 c->argv_len_sum += c->bulklen;
125 sdsIncrLen(c->querybuf,-2); /* remove CRLF */
126 /* Assume that if we saw a fat argument we'll see another one
127 * likely... */
128 c->querybuf = sdsnewlen(SDS_NOINIT,c->bulklen+2);
129 sdsclear(c->querybuf);
130 } else {
131 c->argv[c->argc++] =
132 createStringObject(c->querybuf+c->qb_pos,c->bulklen);
133 c->argv_len_sum += c->bulklen;
134 c->qb_pos += c->bulklen+2;
135 }
136 c->bulklen = -1;
137 c->multibulklen--;
138 }
139 }
140
141 /* We're done when c->multibulk == 0 */
142 if (c->multibulklen == 0) return C_OK;
143
144 /* Still not ready to process the command */
145 return C_ERR;
146}
在研究代码之前我们先熟悉一下redis的多行协议格式
1*<参数数量> CR LF
2$<参数1 的字节数量> CR LF
3<参数1 的数据> CR LF
4...
5$<参数N 的字节数量> CR LF
6<参数N 的数据> CR LF
首先我们看到这行代码:
1 if (c->multibulklen == 0) {
multibulklen 在createClient函数中被初始化为了0,新的客户端连接一定会进入到这个分支中。之后
1 /* Multi bulk length cannot be read without a \r\n */
2 newline = strchr(c->querybuf+c->qb_pos,'\r');
3 if (newline == NULL) {
4 if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE) {
5 addReplyError(c,"Protocol error: too big mbulk count string");
6 setProtocolError("too big mbulk count string",c);
7 }
8 return C_ERR;
9 }
10
11 /* Buffer should also contain \n */
12 if (newline-(c->querybuf+c->qb_pos) > (ssize_t)(sdslen(c->querybuf)-c->qb_pos-2))
13 return C_ERR;
查找一行结束的标志,没有找到则报错并返回。找到新行则先判断querybuf是否能装下整个新行包括\r\n。
1 ok = string2ll(c->querybuf+1+c->qb_pos,newline-(c->querybuf+1+c->qb_pos),&ll);
这一行获取了查询命令第一个字符后面紧跟的数值,由此我们可以得知redis的多行请求格式的第一行为
*[count]\r\n
获取后会对string2ll的返回值及查询长度进行校验
1 if (!ok || ll > 1024*1024) {
2 addReplyError(c,"Protocol error: invalid multibulk length");
3 setProtocolError("invalid mbulk count",c);
4 return C_ERR;
5 } else if (ll > 10 && authRequired(c)) {
6 addReplyError(c, "Protocol error: unauthenticated multibulk length");
7 setProtocolError("unauth mbulk count", c);
8 return C_ERR;
9 }
由代码可以看到,多行查询指令的参数个数不能大于1048576,否则会返回"Protocol error: invalid multibulk length"错误。authRequired(c)则检查客户端是否尚未通过认证(ACL):未认证的客户端不允许发送参数个数大于10的请求。
完成这些以后,后面的几行主要完成指令环境初始化的工作
1 c->qb_pos = (newline-c->querybuf)+2;
这里调整读指针,准备读取下一行数据。
1 if (ll <= 0) return C_OK;
2
3 c->multibulklen = ll;
将参数数量保存到multibulklen中待用。
1 /* Setup argv array on client structure */
2 if (c->argv) zfree(c->argv);
3 c->argv = zmalloc(sizeof(robj*)*c->multibulklen);
4 c->argv_len_sum = 0;
初始化argv用于保存参数
之后的代码用于解析协议中的参数
1 while(c->multibulklen) {
2 /* Read bulk length if unknown */
3 if (c->bulklen == -1) {
当解析刚开始的时候c->bulklen的值为-1,这也是在createClient中初始化的。bulklen表示当前解析参数的长度,这个分支就是用于填充bulklen的。
代码比较多我就不贴在这里了。知道逻辑以后对着代码看应该很容易理解,我只把最后的几行代码解释一下:
1 if (ll >= PROTO_MBULK_BIG_ARG) {
2 if (sdslen(c->querybuf)-c->qb_pos <= (size_t)ll+2) {
3 sdsrange(c->querybuf,c->qb_pos,-1);
4 c->qb_pos = 0;
5 /* Hint the sds library about the amount of bytes this string is
6 * going to contain. */
7 c->querybuf = sdsMakeRoomFor(c->querybuf,ll+2-sdslen(c->querybuf));
8 }
9 }
10 c->bulklen = ll;
redis在这里做了一个优化:如果发现参数的长度过大,且querybuf中待处理的数据小于或等于这个对象的大小,则将这个对象的起始位置移动到querybuf的头部。在后面装配argv的时候可以直接使用这部分内存。
bulklen初始化完成后后面就是获取各个argv参数的解析过程
1 if (sdslen(c->querybuf)-c->qb_pos < (size_t)(c->bulklen+2)) {
2 /* Not enough data (+2 == trailing \r\n) */
3 break;
4 }
这里必须要再进行一次判断,因为保存在querybuf中的数据有可能是之前只解析了一半的,或者是第二次循环之后的,此时前面 bulklen == -1 的分支条件不成立。
1 else {
2 /* Optimization: if the buffer contains JUST our bulk element
3 * instead of creating a new object by *copying* the sds we
4 * just use the current sds string. */
5 if (c->qb_pos == 0 &&
6 c->bulklen >= PROTO_MBULK_BIG_ARG &&
7 sdslen(c->querybuf) == (size_t)(c->bulklen+2))
8 {
9 c->argv[c->argc++] = createObject(OBJ_STRING,c->querybuf);
10 c->argv_len_sum += c->bulklen;
11 sdsIncrLen(c->querybuf,-2); /* remove CRLF */
12 /* Assume that if we saw a fat argument we'll see another one
13 * likely... */
14 c->querybuf = sdsnewlen(SDS_NOINIT,c->bulklen+2);
15 sdsclear(c->querybuf);
16 } else {
17 c->argv[c->argc++] =
18 createStringObject(c->querybuf+c->qb_pos,c->bulklen);
19 c->argv_len_sum += c->bulklen;
20 c->qb_pos += c->bulklen+2;
21 }
22 c->bulklen = -1;
23 c->multibulklen--;
24 }
上面代码第一个分支内就是之前说的优化的地方,可以看到createObject将c->querybuf作为robj的初始化参数调用,内部会直接使用这个指针作为robj的值,而不会分配新的内存。之后为querybuf重新生成了一个新的sds字符串。
否则,使用createStringObject创建robj对象并赋值给argv
每一个参数被解析完后会对multibulklen进行减一操作,直到所有参数被解析完后送入命令处理入口进行处理。
无论是processInlineBuffer还是processMultibulkBuffer,只要是返回C_ERR都会跳出processInputBuffer的处理。
至此redis网络数据的处理就讲完了,redis这部分的逻辑还是比较简单的,我这里做个总结。
- 在createClient中通过connSetReadHandler(conn, readQueryFromClient)设置数据处理函数。
- 数据到达时由redis的网络层调用readQueryFromClient
- 使用connRead(c->conn, c->querybuf+qblen, readlen)读取已到达的数据并存入到querybuf中
- 数据读完后调用processInputBuffer对已读数据进行处理。
- 对querybuf进行解析,根据第一个字符是否为'*' 确定指令的类型
- 单行:调用processInlineBuffer处理
- 多行:调用processMultibulkBuffer处理
redis 多行协议格式:
1*<参数数量> CR LF
2$<参数1 的字节数量> CR LF
3<参数1 的数据> CR LF
4...
5$<参数N 的字节数量> CR LF
6<参数N 的数据> CR LF
下一篇我们再来分析redis如何发送网络数据。
评论