接受网络连接

处理连接请求

还记的我们在讲服务器启动的时候在initServer里面为socket设置Accept事件处理函数的地方吗?

 1    /* Create an event handler for accepting new connections in TCP and Unix
 2     * domain sockets. */
 3    if (createSocketAcceptHandler(&server.ipfd, acceptTcpHandler) != C_OK) {
 4        serverPanic("Unrecoverable error creating TCP socket accept handler.");
 5    }
 6    if (createSocketAcceptHandler(&server.tlsfd, acceptTLSHandler) != C_OK) {
 7        serverPanic("Unrecoverable error creating TLS socket accept handler.");
 8    }
 9    if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
10        acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event.");
11

当redis检测到有新的Socket连接上来时就会调用acceptTcpHandler处理连接创建的流程。SSL加密的连接则会送到acceptTLSHandler中处理。我们先来简单看一下createSocketAcceptHandler函数的调用,这个会在下个章节详细去讲,这里我们先简单了解一下。

createSocketAcceptHandler

 1/* Create an event handler for accepting new connections in TCP or TLS domain sockets.
 2 * This works atomically for all socket fds */
 3int createSocketAcceptHandler(socketFds *sfd, aeFileProc *accept_handler) {
 4    int j;
 5
 6    for (j = 0; j < sfd->count; j++) {
 7        if (aeCreateFileEvent(server.el, sfd->fd[j], AE_READABLE, accept_handler,NULL) == AE_ERR) {
 8            /* Rollback */
 9            for (j = j-1; j >= 0; j--) aeDeleteFileEvent(server.el, sfd->fd[j], AE_READABLE);
10            return C_ERR;
11        }
12    }
13    return C_OK;
14}
15
16int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
17        aeFileProc *proc, void *clientData)
18{
19    if (fd >= eventLoop->setsize) {
20        errno = ERANGE;
21        return AE_ERR;
22    }
23    aeFileEvent *fe = &eventLoop->events[fd];
24
25    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
26        return AE_ERR;
27    fe->mask |= mask;
28    if (mask & AE_READABLE) fe->rfileProc = proc;
29    if (mask & AE_WRITABLE) fe->wfileProc = proc;
30    fe->clientData = clientData;
31    if (fd > eventLoop->maxfd)
32        eventLoop->maxfd = fd;
33    return AE_OK;
34}

首先,我们可以看到createSocketAcceptHandler函数里其实是对server.ipfd这个结构中的fd字段进行了遍历。遍历的长度为count字段保存的值。而在之前对ListenToPort函数的分析我们知道,这里保存的是所有的监听Socket。

1aeCreateFileEvent(server.el, sfd->fd[j], AE_READABLE, accept_handler,NULL)

这个调用里面server.el是redis的事件循环对象,具体的运作方式我们下一章再详细讲。这里主要看第三、第四个参数,分别是AE_READABLE和accept_handler。这两个参数的含义是指当sfd->fd[j]指代的Socket上有读事件的时候事件被激活,并交给accept_handler函数处理。

从aeCreateFileEvent函数中我们可以看到,redis将fd作为下标从eventLoop->events中取出了一个aeFileEvent 对象,保存设置的mask(逻辑或表示将传入的mask与fe->mask进行合并)并根据fe->mask指定位的值设置rfileProc和wfileProc两个函数指针。

aeApiAddEvent

那么aeApiAddEvent是做什么的?

 1static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
 2    aeApiState *state = eventLoop->apidata;
 3    struct epoll_event ee = {0}; /* avoid valgrind warning */
 4    /* If the fd was already monitored for some event, we need a MOD
 5     * operation. Otherwise we need an ADD operation. */
 6    int op = eventLoop->events[fd].mask == AE_NONE ?
 7            EPOLL_CTL_ADD : EPOLL_CTL_MOD;
 8
 9    ee.events = 0;
10    mask |= eventLoop->events[fd].mask; /* Merge old events */
11    if (mask & AE_READABLE) ee.events |= EPOLLIN;
12    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
13    ee.data.fd = fd;
14    if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
15    return 0;
16}

这个函数最终的目的是调用epoll_ctl将fd添加到epoll队列中。这样当fd(Listen Socket)有事件产生(收到TCP SYN)时epoll被唤醒,并将fd和ee抛出来供后续逻辑处理。

也就是说新上来的连接最终会被acceptTcpHandler进行处理。

acceptTcpHandler

 1void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
 2    int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
 3    char cip[NET_IP_STR_LEN];
 4    UNUSED(el);
 5    UNUSED(mask);
 6    UNUSED(privdata);
 7
 8    while(max--) {
 9        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
10        if (cfd == ANET_ERR) {
11            if (errno != EWOULDBLOCK)
12                serverLog(LL_WARNING,
13                    "Accepting client connection: %s", server.neterr);
14            return;
15        }
16        anetCloexec(cfd);
17        serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
18        acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
19    }
20}

当连接到达后,acceptTcpHandler会被调用,并将监听的socket 传入。因为epoll为边缘触发模式,所以这里会根据max值进行多次处理,以寻求更大的处理效率,直到系统的连接队列中没有更多的连接请求,此时anetTcpAccept会返回ANET_ERR。

连接建立后redis会调用anetCloexec,使redis在fork后能自动关闭这些无效的文件描述符。之后通过调用acceptCommonHandler创建客户端对象并将客户端与链接进行双向绑定。

anetTcpAccept

 1static int anetGenericAccept(char *err, int s, struct sockaddr *sa, socklen_t *len) {
 2    int fd;
 3    while(1) {
 4        fd = accept(s,sa,len);
 5        if (fd == -1) {
 6            if (errno == EINTR)
 7                continue;
 8            else {
 9                anetSetError(err, "accept: %s", strerror(errno));
10                return ANET_ERR;
11            }
12        }
13        break;
14    }
15    return fd;
16}
17
18int anetTcpAccept(char *err, int s, char *ip, size_t ip_len, int *port) {
19    int fd;
20    struct sockaddr_storage sa;
21    socklen_t salen = sizeof(sa);
22    if ((fd = anetGenericAccept(err,s,(struct sockaddr*)&sa,&salen)) == -1)
23        return ANET_ERR;
24
25    if (sa.ss_family == AF_INET) {
26        struct sockaddr_in *s = (struct sockaddr_in *)&sa;
27        if (ip) inet_ntop(AF_INET,(void*)&(s->sin_addr),ip,ip_len);
28        if (port) *port = ntohs(s->sin_port);
29    } else {
30        struct sockaddr_in6 *s = (struct sockaddr_in6 *)&sa;
31        if (ip) inet_ntop(AF_INET6,(void*)&(s->sin6_addr),ip,ip_len);
32        if (port) *port = ntohs(s->sin6_port);
33    }
34    return fd;
35}

anetTcpAccept很好理解,其使用accept函数接受了listen 端口上来的链接请求。并返回了连接的套接字,此外还将连接的ip和端口号进行了解析。

acceptCommonHandler

 1#define MAX_ACCEPTS_PER_CALL 1000
 2static void acceptCommonHandler(connection *conn, int flags, char *ip) {
 3    client *c;
 4    char conninfo[100];
 5    UNUSED(ip);
 6
 7    if (connGetState(conn) != CONN_STATE_ACCEPTING) {
 8        serverLog(LL_VERBOSE,
 9            "Accepted client connection in error state: %s (conn: %s)",
10            connGetLastError(conn),
11            connGetInfo(conn, conninfo, sizeof(conninfo)));
12        connClose(conn);
13        return;
14    }
15
16    /* Limit the number of connections we take at the same time.
17     *
18     * Admission control will happen before a client is created and connAccept()
19     * called, because we don't want to even start transport-level negotiation
20     * if rejected. */
21    if (listLength(server.clients) + getClusterConnectionsCount()
22        >= server.maxclients)
23    {
24        char *err;
25        if (server.cluster_enabled)
26            err = "-ERR max number of clients + cluster "
27                  "connections reached\r\n";
28        else
29            err = "-ERR max number of clients reached\r\n";
30
31        /* That's a best effort error message, don't check write errors.
32         * Note that for TLS connections, no handshake was done yet so nothing
33         * is written and the connection will just drop. */
34        if (connWrite(conn,err,strlen(err)) == -1) {
35            /* Nothing to do, Just to avoid the warning... */
36        }
37        server.stat_rejected_conn++;
38        connClose(conn);
39        return;
40    }
41
42    /* Create connection and client */
43    if ((c = createClient(conn)) == NULL) {
44        serverLog(LL_WARNING,
45            "Error registering fd event for the new client: %s (conn: %s)",
46            connGetLastError(conn),
47            connGetInfo(conn, conninfo, sizeof(conninfo)));
48        connClose(conn); /* May be already closed, just ignore errors */
49        return;
50    }
51
52    /* Last chance to keep flags */
53    c->flags |= flags;
54
55    /* Initiate accept.
56     *
57     * Note that connAccept() is free to do two things here:
58     * 1. Call clientAcceptHandler() immediately;
59     * 2. Schedule a future call to clientAcceptHandler().
60     *
61     * Because of that, we must do nothing else afterwards.
62     */
63    if (connAccept(conn, clientAcceptHandler) == C_ERR) {
64        char conninfo[100];
65        if (connGetState(conn) == CONN_STATE_ERROR)
66            serverLog(LL_WARNING,
67                    "Error accepting a client connection: %s (conn: %s)",
68                    connGetLastError(conn), connGetInfo(conn, conninfo, sizeof(conninfo)));
69        freeClient(connGetPrivateData(conn));
70        return;
71    }
72}

这个函数稍微有点长,但也不用害怕,我大致给出这个函数的流程。

graph TD
    conn{{"connGetState(conn) != CONN_STATE_ACCEPTING"}} -->|Y| Return
    conn -->|N| IfClientCount
    IfClientCount{{达到最大客户端连接数}} -->|Y| connWrite["发送连接失败消息"]
    connWrite --> IncRejected["增加拒绝次数"]
    IncRejected --> connClose1["关闭连接"]
    connClose1 --> Return
    IfClientCount -->|N| createClient
    createClient{{创建客户端对象 createClient}} -->|Fail| connClose2[connClose]
    connClose2 --> Return
    createClient -->|Success| connAccept
    connAccept{{"初始化连接"}} -->|Fail| freeClient["释放客户端和连接"]
    freeClient --> Return
    connAccept -->|Success| Return

这样看起来是不是清楚了很多呢?我们继续对其中调用的函数进行讲解。

connCreateAcceptedSocket

acceptTcpHandler在调用connCreateAcceptSocket创建connection对象之后,将这个新建的connection传入acceptCommon做最后的初始化工作。我们先来看看connection的定义

 1struct connection {
 2    /// 连接类型,这是个回调函数的集合,封装了这个connection同步读写、异步读写、等回调。可以认为这个结构描述了connection的行为。
 3    ConnectionType *type;
 4    /// 连接的状态包含 取值我注释在上面的枚举定义中。
 5    ConnectionState state;
 6    /// 
 7    short int flags;
 8    /// 引用的操作数,这里值还在epoll等待事件未返回的操作数量。
 9    short int refs;
10    /// 最后一次错误的错误码,调用connSocketGetLastError时返回
11    int last_errno;
12    /// 自定义的用户数据,这里用来保存client对象指针
13    void *private_data;
14    /// 连接事件处理函数,初始化的时候是置空的。
15    ConnectionCallbackFunc conn_handler;
16    /// 写事件处理函数,初始化的时候是置空的。
17    ConnectionCallbackFunc write_handler;
18    /// 读事件处理函数,初始化的时候是置空的。
19    ConnectionCallbackFunc read_handler;
20    /// Socket 描述符
21    int fd;
22};
classDiagram
    class connection {
        +type : ConnectionType *
        +state : ConnectionState
        +int flags : short
        +int refs : short
        +last_errno : int
        +private_data : void *
        +conn_handler : ConnectionCallbackFunc
        +write_handler : ConnectionCallbackFunc
        +read_handler : ConnectionCallbackFunc
        +fd : int
    }

    class ConnectionType {
        +ae_handler(struct aeEventLoop *el, int fd, void *clientData, int mask) void 
        +connect(struct connection *conn, const char *addr, int port, const char *source_addr, ConnectionCallbackFunc connect_handler) int 
        +write(struct connection *conn, const void *data, size_t data_len) int 
        +read(struct connection *conn, void *buf, size_t buf_len) int 
        +close(struct connection *conn) void 
        +accept(struct connection *conn, ConnectionCallbackFunc accept_handler) int 
        +set_write_handler(struct connection *conn, ConnectionCallbackFunc handler, int barrier) int 
        +set_read_handler(struct connection *conn, ConnectionCallbackFunc handler) int 
        +get_last_error(struct connection *conn) const char *
        +blocking_connect(struct connection *conn, const char *addr, int port, long long timeout) int 
        +sync_write(struct connection *conn, char *ptr, ssize_t size, long long timeout) ssize_t 
        +sync_read(struct connection *conn, char *ptr, ssize_t size, long long timeout) ssize_t 
        +sync_readline(struct connection *conn, char *ptr, ssize_t size, long long timeout) ssize_t 
        +get_type(struct connection *conn) int 
    }

    class ConnectionState {
        CONN_STATE_NONE 0
        CONN_STATE_CONNECTING
        CONN_STATE_ACCEPTING
        CONN_STATE_CONNECTED
        CONN_STATE_CLOSED
        CONN_STATE_ERROR
    }
    connection o-- ConnectionType: *type
    connection o-- ConnectionState: state

创建socket connection非常简单,redis先生成connection结构并将全部内存置为0,之后设置socket Type 为CT_Socket,这是个ConnectionType类型的全局变量,并将其中的所有变量初始化为了Socket 相关的函数。

 1ConnectionType CT_Socket = {
 2    .ae_handler = connSocketEventHandler,
 3    .close = connSocketClose,
 4    .write = connSocketWrite,
 5    .read = connSocketRead,
 6    .accept = connSocketAccept,
 7    .connect = connSocketConnect,
 8    .set_write_handler = connSocketSetWriteHandler,
 9    .set_read_handler = connSocketSetReadHandler,
10    .get_last_error = connSocketGetLastError,
11    .blocking_connect = connSocketBlockingConnect,
12    .sync_write = connSocketSyncWrite,
13    .sync_read = connSocketSyncRead,
14    .sync_readline = connSocketSyncReadLine,
15    .get_type = connSocketGetType
16};
17
18connection *connCreateSocket() {
19    connection *conn = zcalloc(sizeof(connection));
20    conn->type = &CT_Socket;
21    conn->fd = -1;
22
23    return conn;
24}

我们先知道这些函数能做什么即可,后面会讲这些Socket 函数运行的细节。

函数 原型 作用
connSocketEventHandler void connSocketEventHandler(struct aeEventLoop *el, int fd, void *clientData, int mask) Socket 读写事件的入口函数用于读写事件分派
connSocketClose void connSocketClose(connection *conn) 关闭连接会话
connSocketWrite int connSocketWrite(connection *conn, const void *data, size_t data_len) 写数据到连接上
connSocketRead int connSocketRead(connection *conn, void *buf, size_t buf_len) 从连接上读数据
connSocketAccept int connSocketAccept(connection *conn, ConnectionCallbackFunc accept_handler) 接受客户端连接请求
connSocketConnect int connSocketConnect(connection *conn, const char *addr, int port, const char *src_addr,
ConnectionCallbackFunc connect_handler) 连接到指定地址及端口号
connSocketSetWriteHandler int connSocketSetWriteHandler(connection *conn, ConnectionCallbackFunc func, int barrier) 设置Socket上的写事件回调
connSocketSetReadHandler int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc func) 设置Socket上的读事件回调
connSocketGetLastError const char *connSocketGetLastError(connection *conn) 获取连接错误描述,错误被保存在connection::last_errno上
connSocketBlockingConnect int connSocketBlockingConnect(connection *conn, const char *addr, int port, long long timeout) 以阻塞方式连接到指定地址及端口
connSocketSyncWrite ssize_t connSocketSyncWrite(connection *conn, char *ptr, ssize_t size, long long timeout) 同步写数据到连接上
connSocketSyncRead ssize_t connSocketSyncRead(connection *conn, char *ptr, ssize_t size, long long timeout) 从连接上同步读数据
connSocketSyncReadLine ssize_t connSocketSyncReadLine(connection *conn, char *ptr, ssize_t size, long long timeout) 从连接上同步读一行数据
connSocketGetTyp int connSocketGetType(connection *conn) 获取Socket类型现在总是会返回 CONN_TYPE_SOCKET

connGetState

1    int connGetState(connection *conn) {
2        return conn->state;
3    }

这个函数只是简单的返回了conn::state中保存的值

createClient

 1client *createClient(connection *conn) {
 2    client *c = zmalloc(sizeof(client));
 3
 4    /* passing NULL as conn it is possible to create a non connected client.
 5     * This is useful since all the commands needs to be executed
 6     * in the context of a client. When commands are executed in other
 7     * contexts (for instance a Lua script) we need a non connected client. */
 8    if (conn) {
 9        connNonBlock(conn);
10        connEnableTcpNoDelay(conn);
11        if (server.tcpkeepalive)
12            connKeepAlive(conn,server.tcpkeepalive);
13        connSetReadHandler(conn, readQueryFromClient);
14        connSetPrivateData(conn, c);
15    }
16
17    selectDb(c,0);
18    uint64_t client_id;
19    atomicGetIncr(server.next_client_id, client_id, 1);
20    c->id = client_id;
21    c->resp = 2;
22    c->conn = conn;
23    c->name = NULL;
24    c->bufpos = 0;
25    c->qb_pos = 0;
26    c->querybuf = sdsempty();
27    c->pending_querybuf = sdsempty();
28    c->querybuf_peak = 0;
29    c->reqtype = 0;
30    c->argc = 0;
31    c->argv = NULL;
32    c->argv_len_sum = 0;
33    c->original_argc = 0;
34    c->original_argv = NULL;
35    c->cmd = c->lastcmd = NULL;
36    c->multibulklen = 0;
37    c->bulklen = -1;
38    c->sentlen = 0;
39    c->flags = 0;
40    c->ctime = c->lastinteraction = server.unixtime;
41    clientSetDefaultAuth(c);
42    c->replstate = REPL_STATE_NONE;
43    c->repl_put_online_on_ack = 0;
44    c->reploff = 0;
45    c->read_reploff = 0;
46    c->repl_ack_off = 0;
47    c->repl_ack_time = 0;
48    c->repl_last_partial_write = 0;
49    c->slave_listening_port = 0;
50    c->slave_addr = NULL;
51    c->slave_capa = SLAVE_CAPA_NONE;
52    c->reply = listCreate();
53    c->reply_bytes = 0;
54    c->obuf_soft_limit_reached_time = 0;
55    listSetFreeMethod(c->reply,freeClientReplyValue);
56    listSetDupMethod(c->reply,dupClientReplyValue);
57    c->btype = BLOCKED_NONE;
58    c->bpop.timeout = 0;
59    c->bpop.keys = dictCreate(&objectKeyHeapPointerValueDictType,NULL);
60    c->bpop.target = NULL;
61    c->bpop.xread_group = NULL;
62    c->bpop.xread_consumer = NULL;
63    c->bpop.xread_group_noack = 0;
64    c->bpop.numreplicas = 0;
65    c->bpop.reploffset = 0;
66    c->woff = 0;
67    c->watched_keys = listCreate();
68    c->pubsub_channels = dictCreate(&objectKeyPointerValueDictType,NULL);
69    c->pubsub_patterns = listCreate();
70    c->peerid = NULL;
71    c->sockname = NULL;
72    c->client_list_node = NULL;
73    c->paused_list_node = NULL;
74    c->client_tracking_redirection = 0;
75    c->client_tracking_prefixes = NULL;
76    c->client_cron_last_memory_usage = 0;
77    c->client_cron_last_memory_type = CLIENT_TYPE_NORMAL;
78    c->auth_callback = NULL;
79    c->auth_callback_privdata = NULL;
80    c->auth_module = NULL;
81    listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid);
82    listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
83    if (conn) linkClient(c);
84    initClientMultiState(c);
85    return c;
86}

这个函数初始进入后先对conn做了判断,当Conn有效时做了以下几件事情:

  1. 将connection设置为非阻塞。(这里和accept时的设置重复了,不知道有什么意义)
  2. 将connection设置为不使用nagle算法降低数据延迟。nagle算法会在系统缓冲满或与上一次发送间隔超过40ms时才真正发送数据。redis显然不希望有这个延迟存在。
  3. 检查server.tcpkeepalive选项设置,大于 0 则设置Socket使用KeepAlive。
    • 设置TCP_KEEPIDLE为tcpkeepalive的值
    • 设置TCP_KEEPINTVL为tcpkeepalive/3
    • 设置TCP_KEEPCNT为3

    keepalive是socket的保活机制,linux默认7200秒没有数据则发送保活报文,若没有回应则间隔TCP_KEEPINTVL秒后再发一次。若TCP_KEEPCNT次都没有收到响应则认为连接中断。 这个设置是否开启被 linux 宏控制。

  4. 设置读数据处理函数为 readQueryFromClient 实际是设置了conn::read_handler的值
  5. 将生成的client对象指针设置给conn::privateData

从代码上看createClient允许生成一个没有connection的client对象,比如module模块中用于执行redis命令的client对象。

之后通过

1selectDB(c,0)

设置默认的redisDB,实际的设置代码如下

1int selectDb(client *c, int id) {
2    if (id < 0 || id >= server.dbnum)
3        return C_ERR;
4    c->db = &server.db[id];
5    return C_OK;
6}

这个实际上是让client::db指向了server.db[id],server的DB数量是由databases选项设置的,默认是16个。

1atomicGetIncr(server.next_client_id, client_id, 1);

这里的代码为新生成的client分配了client_id一个64位整型值,这个id会在linkClient函数中被用到。 剩下的都是client结构的的初始化代码,直到最后两行。

1if (conn) linkClient(c);
2initClientMultiState(c);

我们先来看linkClient(c);

1void linkClient(client *c) {
2    listAddNodeTail(server.clients,c);
3    /* Note that we remember the linked list node where the client is stored,
4     * this way removing the client in unlinkClient() will not require
5     * a linear scan, but just a constant time operation. */
6    c->client_list_node = listLast(server.clients);
7    uint64_t id = htonu64(c->id);
8    raxInsert(server.clients_index,(unsigned char*)&id,sizeof(id),c,NULL);
9}

函数里面做了两件事情:

  1. 将client对象添加到server.clients链表的尾部,并将client::client_list_node指针指向server.clients自己所属的节点。这是为了在client释放时能快速从server.clients中摘除自己。
  2. 调用raxIndex函数用client_id作为key插入到server.clients_index中。

raxIndex是压缩前缀树的数据结构,其插入、查询、删除的时间复杂度都为O(n) n为key的长度。这个数据结构在redis中的很多地方都有使用,比如acl,client timeout控制等。具体的算法后面有专门的章节会讲,这里就先不详细描述了。现在只需要知道这是一个类似dict的功能即可。

再来看initClientMultiState

1void initClientMultiState(client *c) {
2    c->mstate.commands = NULL;
3    c->mstate.count = 0;
4    c->mstate.cmd_flags = 0;
5    c->mstate.cmd_inv_flags = 0;
6}

这个函数只是对client的状态进行了初始化。其中:

  • commands : multiCmd 指针,指向multiCmd数组
  • count : commands 数组长度
  • cmd_flgas : 用于记录commands中所有命令的标记集合。
  • cmd_inv_flags : 用于记录commands中所有命令的标记集合。

commands 作为数组是为了记录redis事务中的所有命令的。当事务开始后,所有要执行的redis命令都会被缓存提交到commands中等待执行,直到调用exec指令提交事务。

在这之后的connAccept(conn, clientAcceptHandler)调用则只是简单的对connection的状态设置为CONN_STATE_CONNECTED,表示这个连接已经建立完成。之后调用了clientAcceptHandler 做了:

  • 增加连接计数(仅作为统计)
  • 发送模块事件
    • 事件类型 REDISMODULE_EVENT_CLIENT_CHANGE
    • 事件标识 REDISMODULE_SUBEVENT_CLIENT_CHANGE_CONNECTED,

至此连接建立的整个流程就完成了。

评论