Handling Connection Requests
Remember where we set up the accept event handlers for the listening sockets inside initServer, back when we discussed server startup?
```c
/* Create an event handler for accepting new connections in TCP and Unix
 * domain sockets. */
if (createSocketAcceptHandler(&server.ipfd, acceptTcpHandler) != C_OK) {
    serverPanic("Unrecoverable error creating TCP socket accept handler.");
}
if (createSocketAcceptHandler(&server.tlsfd, acceptTLSHandler) != C_OK) {
    serverPanic("Unrecoverable error creating TLS socket accept handler.");
}
if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
    acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event.");
```
When Redis detects a new socket connection, it calls acceptTcpHandler to run the connection-creation flow; TLS-encrypted connections go to acceptTLSHandler instead. Let's first take a quick look at the createSocketAcceptHandler call. It will be covered in detail in the next chapter, so a brief overview is enough here.
createSocketAcceptHandler
```c
/* Create an event handler for accepting new connections in TCP or TLS domain sockets.
 * This works atomically for all socket fds */
int createSocketAcceptHandler(socketFds *sfd, aeFileProc *accept_handler) {
    int j;

    for (j = 0; j < sfd->count; j++) {
        if (aeCreateFileEvent(server.el, sfd->fd[j], AE_READABLE, accept_handler,NULL) == AE_ERR) {
            /* Rollback */
            for (j = j-1; j >= 0; j--) aeDeleteFileEvent(server.el, sfd->fd[j], AE_READABLE);
            return C_ERR;
        }
    }
    return C_OK;
}

int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{
    if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }
    aeFileEvent *fe = &eventLoop->events[fd];

    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
    fe->mask |= mask;
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    fe->clientData = clientData;
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}
```
First, we can see that createSocketAcceptHandler iterates over the fd array inside the server.ipfd structure, with the iteration length given by the count field. From our earlier analysis of listenToPort, we know this array holds all of the listening sockets.
```c
aeCreateFileEvent(server.el, sfd->fd[j], AE_READABLE, accept_handler,NULL)
```
In this call, server.el is Redis's event loop object; how it actually runs is covered in detail in the next chapter. The key arguments here are the third and fourth ones, AE_READABLE and accept_handler: they mean that when a read event occurs on the socket referred to by sfd->fd[j], the event fires and is handed to accept_handler for processing.
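For reference, the handler passed in must match the aeFileProc signature declared in ae.h:

```c
typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd,
        void *clientData, int mask);
```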
From aeCreateFileEvent we can see that Redis uses fd as an index into eventLoop->events to fetch an aeFileEvent object, merges the given mask into the stored one (the bitwise OR combines the incoming mask with fe->mask), and sets the rfileProc and wfileProc function pointers according to which bits of the mask are set.
aeApiAddEvent
So what does aeApiAddEvent do?
```c
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee = {0}; /* avoid valgrind warning */
    /* If the fd was already monitored for some event, we need a MOD
     * operation. Otherwise we need an ADD operation. */
    int op = eventLoop->events[fd].mask == AE_NONE ?
            EPOLL_CTL_ADD : EPOLL_CTL_MOD;

    ee.events = 0;
    mask |= eventLoop->events[fd].mask; /* Merge old events */
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.fd = fd;
    if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
    return 0;
}
```
The ultimate purpose of this function is to call epoll_ctl to add fd to the epoll instance. When an event occurs on fd (the listening socket receives a TCP SYN), epoll wakes up and reports fd and ee for the subsequent logic to handle.
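To see the ADD-versus-MOD logic in action, consider this hypothetical sequence (the handler names are made up for illustration):

```c
/* First registration: events[fd].mask is AE_NONE, so aeApiAddEvent issues
 * EPOLL_CTL_ADD with EPOLLIN. */
aeCreateFileEvent(server.el, fd, AE_READABLE, myReadHandler, NULL);
/* Second registration on the same fd: the old mask is merged, so this
 * becomes EPOLL_CTL_MOD with EPOLLIN|EPOLLOUT. */
aeCreateFileEvent(server.el, fd, AE_WRITABLE, myWriteHandler, NULL);
```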
In other words, a newly arrived connection is ultimately handled by acceptTcpHandler.
acceptTcpHandler
```c
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
    char cip[NET_IP_STR_LEN];
    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);

    while(max--) {
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                serverLog(LL_WARNING,
                    "Accepting client connection: %s", server.neterr);
            return;
        }
        anetCloexec(cfd);
        serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
        acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
    }
}
```
When a connection arrives, acceptTcpHandler is invoked with the listening socket. It loops up to MAX_ACCEPTS_PER_CALL times so that a single wakeup can drain several pending connections from the kernel's accept queue, stopping when anetTcpAccept returns ANET_ERR with EWOULDBLOCK (the queue is empty). Note that Redis's epoll backend is level-triggered (aeApiAddEvent never sets EPOLLET), so this draining is an efficiency measure rather than a correctness requirement, and the cap keeps one busy listener from starving the rest of the event loop.
After a connection is accepted, Redis calls anetCloexec to set the FD_CLOEXEC flag on it, so the descriptor is automatically closed when a forked child goes on to exec an external program. It then calls acceptCommonHandler, which creates the client object and binds the client and the connection to each other.
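What anetCloexec does is essentially set the FD_CLOEXEC flag via fcntl; a minimal sketch of the idea (the real function also retries on EINTR and skips the write when the flag is already set):

```c
#include <fcntl.h>

/* Minimal sketch: mark fd close-on-exec so it does not leak into programs
 * spawned via fork()+exec(). */
static int setCloexec(int fd) {
    int flags = fcntl(fd, F_GETFD);
    if (flags == -1) return -1;
    return fcntl(fd, F_SETFD, flags | FD_CLOEXEC);
}
```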
anetTcpAccept
```c
static int anetGenericAccept(char *err, int s, struct sockaddr *sa, socklen_t *len) {
    int fd;
    while(1) {
        fd = accept(s,sa,len);
        if (fd == -1) {
            if (errno == EINTR)
                continue;
            else {
                anetSetError(err, "accept: %s", strerror(errno));
                return ANET_ERR;
            }
        }
        break;
    }
    return fd;
}

int anetTcpAccept(char *err, int s, char *ip, size_t ip_len, int *port) {
    int fd;
    struct sockaddr_storage sa;
    socklen_t salen = sizeof(sa);
    if ((fd = anetGenericAccept(err,s,(struct sockaddr*)&sa,&salen)) == -1)
        return ANET_ERR;

    if (sa.ss_family == AF_INET) {
        struct sockaddr_in *s = (struct sockaddr_in *)&sa;
        if (ip) inet_ntop(AF_INET,(void*)&(s->sin_addr),ip,ip_len);
        if (port) *port = ntohs(s->sin_port);
    } else {
        struct sockaddr_in6 *s = (struct sockaddr_in6 *)&sa;
        if (ip) inet_ntop(AF_INET6,(void*)&(s->sin6_addr),ip,ip_len);
        if (port) *port = ntohs(s->sin6_port);
    }
    return fd;
}
```
anetTcpAccept is easy to follow: it uses accept to take a pending connection request off the listening socket and returns the connected descriptor, also parsing out the peer's IP address and port.
acceptCommonHandler
```c
#define MAX_ACCEPTS_PER_CALL 1000
static void acceptCommonHandler(connection *conn, int flags, char *ip) {
    client *c;
    char conninfo[100];
    UNUSED(ip);

    if (connGetState(conn) != CONN_STATE_ACCEPTING) {
        serverLog(LL_VERBOSE,
            "Accepted client connection in error state: %s (conn: %s)",
            connGetLastError(conn),
            connGetInfo(conn, conninfo, sizeof(conninfo)));
        connClose(conn);
        return;
    }

    /* Limit the number of connections we take at the same time.
     *
     * Admission control will happen before a client is created and connAccept()
     * called, because we don't want to even start transport-level negotiation
     * if rejected. */
    if (listLength(server.clients) + getClusterConnectionsCount()
        >= server.maxclients)
    {
        char *err;
        if (server.cluster_enabled)
            err = "-ERR max number of clients + cluster "
                  "connections reached\r\n";
        else
            err = "-ERR max number of clients reached\r\n";

        /* That's a best effort error message, don't check write errors.
         * Note that for TLS connections, no handshake was done yet so nothing
         * is written and the connection will just drop. */
        if (connWrite(conn,err,strlen(err)) == -1) {
            /* Nothing to do, Just to avoid the warning... */
        }
        server.stat_rejected_conn++;
        connClose(conn);
        return;
    }

    /* Create connection and client */
    if ((c = createClient(conn)) == NULL) {
        serverLog(LL_WARNING,
            "Error registering fd event for the new client: %s (conn: %s)",
            connGetLastError(conn),
            connGetInfo(conn, conninfo, sizeof(conninfo)));
        connClose(conn); /* May be already closed, just ignore errors */
        return;
    }

    /* Last chance to keep flags */
    c->flags |= flags;

    /* Initiate accept.
     *
     * Note that connAccept() is free to do two things here:
     * 1. Call clientAcceptHandler() immediately;
     * 2. Schedule a future call to clientAcceptHandler().
     *
     * Because of that, we must do nothing else afterwards.
     */
    if (connAccept(conn, clientAcceptHandler) == C_ERR) {
        char conninfo[100];
        if (connGetState(conn) == CONN_STATE_ERROR)
            serverLog(LL_WARNING,
                "Error accepting a client connection: %s (conn: %s)",
                connGetLastError(conn), connGetInfo(conn, conninfo, sizeof(conninfo)));
        freeClient(connGetPrivateData(conn));
        return;
    }
}
```
This function is a bit long, but don't let that put you off; here is a rough flowchart of what it does.
```mermaid
graph TD
    conn{{"connGetState(conn) != CONN_STATE_ACCEPTING"}} -->|Y| Return
    conn -->|N| IfClientCount
    IfClientCount{{"max client count reached"}} -->|Y| connWrite["write rejection message"]
    connWrite --> IncRejected["increment rejected-connection counter"]
    IncRejected --> connClose1["close connection"]
    connClose1 --> Return
    IfClientCount -->|N| createClient
    createClient{{"create client object (createClient)"}} -->|Fail| connClose2[connClose]
    connClose2 --> Return
    createClient -->|Success| connAccept
    connAccept{{"initialize connection (connAccept)"}} -->|Fail| freeClient["free client and connection"]
    freeClient --> Return
    connAccept -->|Success| Return
```
Much clearer now, isn't it? Let's continue with the functions it calls.
connCreateAcceptedSocket
After acceptTcpHandler calls connCreateAcceptedSocket to create a connection object, it passes the new connection into acceptCommonHandler for the final initialization. Let's first look at the definition of connection.
```c
struct connection {
    /* Connection type: a collection of callbacks covering this connection's
     * synchronous and asynchronous reads and writes, and so on. You can think
     * of this structure as describing the connection's behavior. */
    ConnectionType *type;
    /* Connection state; the possible values are listed in the
     * ConnectionState enum. */
    ConnectionState state;
    short int flags;
    /* Reference count: the number of in-flight operations still waiting on
     * epoll events. */
    short int refs;
    /* errno of the last error, returned by connSocketGetLastError. */
    int last_errno;
    /* User-defined data; here it holds the pointer to the client object. */
    void *private_data;
    /* Connection event handler; NULL right after initialization. */
    ConnectionCallbackFunc conn_handler;
    /* Write event handler; NULL right after initialization. */
    ConnectionCallbackFunc write_handler;
    /* Read event handler; NULL right after initialization. */
    ConnectionCallbackFunc read_handler;
    /* Socket file descriptor. */
    int fd;
};
```
```mermaid
classDiagram
    class connection {
        +type : ConnectionType *
        +state : ConnectionState
        +flags : short int
        +refs : short int
        +last_errno : int
        +private_data : void *
        +conn_handler : ConnectionCallbackFunc
        +write_handler : ConnectionCallbackFunc
        +read_handler : ConnectionCallbackFunc
        +fd : int
    }
    class ConnectionType {
        +ae_handler(struct aeEventLoop *el, int fd, void *clientData, int mask) void
        +connect(struct connection *conn, const char *addr, int port, const char *source_addr, ConnectionCallbackFunc connect_handler) int
        +write(struct connection *conn, const void *data, size_t data_len) int
        +read(struct connection *conn, void *buf, size_t buf_len) int
        +close(struct connection *conn) void
        +accept(struct connection *conn, ConnectionCallbackFunc accept_handler) int
        +set_write_handler(struct connection *conn, ConnectionCallbackFunc handler, int barrier) int
        +set_read_handler(struct connection *conn, ConnectionCallbackFunc handler) int
        +get_last_error(struct connection *conn) const char *
        +blocking_connect(struct connection *conn, const char *addr, int port, long long timeout) int
        +sync_write(struct connection *conn, char *ptr, ssize_t size, long long timeout) ssize_t
        +sync_read(struct connection *conn, char *ptr, ssize_t size, long long timeout) ssize_t
        +sync_readline(struct connection *conn, char *ptr, ssize_t size, long long timeout) ssize_t
        +get_type(struct connection *conn) int
    }
    class ConnectionState {
        CONN_STATE_NONE = 0
        CONN_STATE_CONNECTING
        CONN_STATE_ACCEPTING
        CONN_STATE_CONNECTED
        CONN_STATE_CLOSED
        CONN_STATE_ERROR
    }
    connection o-- ConnectionType : *type
    connection o-- ConnectionState : state
```
Creating a socket connection is very simple: Redis allocates the connection structure with all of its memory zeroed, then sets the type to CT_Socket, a global ConnectionType variable whose members are all initialized to the socket-specific implementations.
```c
ConnectionType CT_Socket = {
    .ae_handler = connSocketEventHandler,
    .close = connSocketClose,
    .write = connSocketWrite,
    .read = connSocketRead,
    .accept = connSocketAccept,
    .connect = connSocketConnect,
    .set_write_handler = connSocketSetWriteHandler,
    .set_read_handler = connSocketSetReadHandler,
    .get_last_error = connSocketGetLastError,
    .blocking_connect = connSocketBlockingConnect,
    .sync_write = connSocketSyncWrite,
    .sync_read = connSocketSyncRead,
    .sync_readline = connSocketSyncReadLine,
    .get_type = connSocketGetType
};

connection *connCreateSocket() {
    connection *conn = zcalloc(sizeof(connection));
    conn->type = &CT_Socket;
    conn->fd = -1;

    return conn;
}
```
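connCreateAcceptedSocket itself is just a thin wrapper over connCreateSocket; quoting it here for completeness (from connection.c, as of Redis 6.x):

```c
connection *connCreateAcceptedSocket(int fd) {
    connection *conn = connCreateSocket();
    conn->fd = fd;                       /* adopt the accepted descriptor */
    conn->state = CONN_STATE_ACCEPTING;  /* acceptCommonHandler checks this */
    return conn;
}
```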
For now it is enough to know what each of these functions does; the details of how they run are covered later.
| Function | Prototype | Purpose |
|---|---|---|
| connSocketEventHandler | void connSocketEventHandler(struct aeEventLoop *el, int fd, void *clientData, int mask) | Entry point for socket read/write events; dispatches them to the registered handlers |
| connSocketClose | void connSocketClose(connection *conn) | Close the connection |
| connSocketWrite | int connSocketWrite(connection *conn, const void *data, size_t data_len) | Write data to the connection |
| connSocketRead | int connSocketRead(connection *conn, void *buf, size_t buf_len) | Read data from the connection |
| connSocketAccept | int connSocketAccept(connection *conn, ConnectionCallbackFunc accept_handler) | Accept a client connection request |
| connSocketConnect | int connSocketConnect(connection *conn, const char *addr, int port, const char *src_addr, ConnectionCallbackFunc connect_handler) | Connect to the given address and port |
| connSocketSetWriteHandler | int connSocketSetWriteHandler(connection *conn, ConnectionCallbackFunc func, int barrier) | Set the write event callback on the socket |
| connSocketSetReadHandler | int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc func) | Set the read event callback on the socket |
| connSocketGetLastError | const char *connSocketGetLastError(connection *conn) | Get the error description for the connection; the error code is kept in connection::last_errno |
| connSocketBlockingConnect | int connSocketBlockingConnect(connection *conn, const char *addr, int port, long long timeout) | Connect to the given address and port in blocking mode |
| connSocketSyncWrite | ssize_t connSocketSyncWrite(connection *conn, char *ptr, ssize_t size, long long timeout) | Synchronously write data to the connection |
| connSocketSyncRead | ssize_t connSocketSyncRead(connection *conn, char *ptr, ssize_t size, long long timeout) | Synchronously read data from the connection |
| connSocketSyncReadLine | ssize_t connSocketSyncReadLine(connection *conn, char *ptr, ssize_t size, long long timeout) | Synchronously read one line from the connection |
| connSocketGetType | int connSocketGetType(connection *conn) | Get the socket type; currently always returns CONN_TYPE_SOCKET |
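Note that the generic conn* helpers used throughout the accept path (connWrite, connClose, connAccept, and friends) simply dispatch through this ConnectionType vtable. For example, connWrite in connection.h is essentially:

```c
/* Dispatch a write through the connection's type: for plain sockets this
 * lands in connSocketWrite, for TLS in the TLS implementation. */
static inline int connWrite(connection *conn, const void *data, size_t data_len) {
    return conn->type->write(conn, data, data_len);
}
```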
connGetState
```c
int connGetState(connection *conn) {
    return conn->state;
}
```
This function simply returns the value stored in conn::state.
createClient
```c
client *createClient(connection *conn) {
    client *c = zmalloc(sizeof(client));

    /* passing NULL as conn it is possible to create a non connected client.
     * This is useful since all the commands needs to be executed
     * in the context of a client. When commands are executed in other
     * contexts (for instance a Lua script) we need a non connected client. */
    if (conn) {
        connNonBlock(conn);
        connEnableTcpNoDelay(conn);
        if (server.tcpkeepalive)
            connKeepAlive(conn,server.tcpkeepalive);
        connSetReadHandler(conn, readQueryFromClient);
        connSetPrivateData(conn, c);
    }

    selectDb(c,0);
    uint64_t client_id;
    atomicGetIncr(server.next_client_id, client_id, 1);
    c->id = client_id;
    c->resp = 2;
    c->conn = conn;
    c->name = NULL;
    c->bufpos = 0;
    c->qb_pos = 0;
    c->querybuf = sdsempty();
    c->pending_querybuf = sdsempty();
    c->querybuf_peak = 0;
    c->reqtype = 0;
    c->argc = 0;
    c->argv = NULL;
    c->argv_len_sum = 0;
    c->original_argc = 0;
    c->original_argv = NULL;
    c->cmd = c->lastcmd = NULL;
    c->multibulklen = 0;
    c->bulklen = -1;
    c->sentlen = 0;
    c->flags = 0;
    c->ctime = c->lastinteraction = server.unixtime;
    clientSetDefaultAuth(c);
    c->replstate = REPL_STATE_NONE;
    c->repl_put_online_on_ack = 0;
    c->reploff = 0;
    c->read_reploff = 0;
    c->repl_ack_off = 0;
    c->repl_ack_time = 0;
    c->repl_last_partial_write = 0;
    c->slave_listening_port = 0;
    c->slave_addr = NULL;
    c->slave_capa = SLAVE_CAPA_NONE;
    c->reply = listCreate();
    c->reply_bytes = 0;
    c->obuf_soft_limit_reached_time = 0;
    listSetFreeMethod(c->reply,freeClientReplyValue);
    listSetDupMethod(c->reply,dupClientReplyValue);
    c->btype = BLOCKED_NONE;
    c->bpop.timeout = 0;
    c->bpop.keys = dictCreate(&objectKeyHeapPointerValueDictType,NULL);
    c->bpop.target = NULL;
    c->bpop.xread_group = NULL;
    c->bpop.xread_consumer = NULL;
    c->bpop.xread_group_noack = 0;
    c->bpop.numreplicas = 0;
    c->bpop.reploffset = 0;
    c->woff = 0;
    c->watched_keys = listCreate();
    c->pubsub_channels = dictCreate(&objectKeyPointerValueDictType,NULL);
    c->pubsub_patterns = listCreate();
    c->peerid = NULL;
    c->sockname = NULL;
    c->client_list_node = NULL;
    c->paused_list_node = NULL;
    c->client_tracking_redirection = 0;
    c->client_tracking_prefixes = NULL;
    c->client_cron_last_memory_usage = 0;
    c->client_cron_last_memory_type = CLIENT_TYPE_NORMAL;
    c->auth_callback = NULL;
    c->auth_callback_privdata = NULL;
    c->auth_module = NULL;
    listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid);
    listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
    if (conn) linkClient(c);
    initClientMultiState(c);
    return c;
}
```
On entry, this function first checks conn; when conn is valid it does the following:
- Sets the connection to non-blocking. (This appears to duplicate the non-blocking setup done at accept time; it is not clear what doing it again buys.)
- Disables Nagle's algorithm (TCP_NODELAY) to reduce latency. Nagle's algorithm holds small writes back until previously sent data has been acknowledged or a full segment has accumulated, and its interaction with delayed ACKs can stall sends by around 40ms; Redis clearly does not want that delay.
- Checks the server.tcpkeepalive option; if it is greater than 0, enables keep-alive on the socket:
  - sets TCP_KEEPIDLE to tcpkeepalive
  - sets TCP_KEEPINTVL to tcpkeepalive/3
  - sets TCP_KEEPCNT to 3

  Keep-alive is the socket liveness mechanism: by default Linux sends a probe after 7200 seconds without data; if there is no reply, it probes again every TCP_KEEPINTVL seconds, and after TCP_KEEPCNT unanswered probes the connection is declared dead. Whether the fine-grained options above are applied is gated by Linux-specific macros; see the sketch after this list.
- Sets readQueryFromClient as the handler for incoming data, which in effect sets conn::read_handler.
- Stores the newly created client object pointer in conn::private_data.
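As a side note to the keep-alive item above, here is a minimal sketch of what that setup boils down to on Linux, simplified from Redis's anetKeepAlive (error reporting and non-Linux fallbacks omitted):

```c
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/socket.h>

/* Simplified sketch of anetKeepAlive(): enable keep-alive probes and tune
 * them so a dead peer is detected after roughly `interval` idle seconds
 * plus three unanswered probes. */
static int keepAliveSketch(int fd, int interval) {
    int yes = 1;
    if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &yes, sizeof(yes)) == -1)
        return -1;
    int idle = interval;            /* idle seconds before the first probe */
    setsockopt(fd, IPPROTO_TCP, TCP_KEEPIDLE, &idle, sizeof(idle));
    int intvl = interval / 3;       /* seconds between unanswered probes */
    if (intvl == 0) intvl = 1;
    setsockopt(fd, IPPROTO_TCP, TCP_KEEPINTVL, &intvl, sizeof(intvl));
    int cnt = 3;                    /* unanswered probes before giving up */
    setsockopt(fd, IPPROTO_TCP, TCP_KEEPCNT, &cnt, sizeof(cnt));
    return 0;
}
```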
As the code shows, createClient allows creating a client object with no connection at all, for example the client used to execute Redis commands from a module or a Lua script.
After that, selectDb(c,0) selects the default Redis DB for the client. The actual implementation is:
```c
int selectDb(client *c, int id) {
    if (id < 0 || id >= server.dbnum)
        return C_ERR;
    c->db = &server.db[id];
    return C_OK;
}
```
This simply points client::db at server.db[id]. The number of DBs on the server is set by the databases config option and defaults to 16.
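A hypothetical usage, mirroring what the SELECT command boils down to (the error message here is illustrative):

```c
/* Hypothetical example: switch the client to DB 5, as SELECT 5 would.
 * selectDb() rejects any id outside 0..server.dbnum-1. */
if (selectDb(c, 5) == C_ERR)
    addReplyError(c, "DB index is out of range");
```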
```c
atomicGetIncr(server.next_client_id, client_id, 1);
```
This assigns the new client its client_id, a 64-bit integer that linkClient will use later. The rest of the function is plain field-by-field initialization of the client structure, up to the final two lines.
```c
if (conn) linkClient(c);
initClientMultiState(c);
```
Let's look at linkClient(c) first.
```c
void linkClient(client *c) {
    listAddNodeTail(server.clients,c);
    /* Note that we remember the linked list node where the client is stored,
     * this way removing the client in unlinkClient() will not require
     * a linear scan, but just a constant time operation. */
    c->client_list_node = listLast(server.clients);
    uint64_t id = htonu64(c->id);
    raxInsert(server.clients_index,(unsigned char*)&id,sizeof(id),c,NULL);
}
```
The function does two things:
- Appends the client to the tail of the server.clients list and points client::client_list_node at its own list node, so that when the client is freed it can remove itself from server.clients in constant time instead of a linear scan.
- Calls raxInsert to insert the client into server.clients_index, using the client_id as the key.
rax is a radix tree (compressed prefix tree); insertion, lookup, and deletion are all O(n), where n is the length of the key. This data structure is used in many places in Redis, such as ACLs and client timeout tracking. The algorithm gets its own chapter later, so we won't go into detail here; for now, just treat it as something with dict-like functionality.
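As a quick orientation, here is a minimal sketch of the rax API used here (raxNew/raxInsert/raxFind/raxRemove/raxFree are the real entry points from Redis's rax.h; the surrounding function is illustrative):

```c
#include "rax.h"          /* Redis's radix tree */
#include "endianconv.h"   /* htonu64() */

/* Illustrative only: index a client by its 64-bit id, the same pattern
 * linkClient() uses with server.clients_index. */
void raxDemo(client *c) {
    rax *idx = raxNew();
    uint64_t id = htonu64(c->id);   /* big-endian key keeps ids byte-ordered */
    raxInsert(idx, (unsigned char*)&id, sizeof(id), c, NULL); /* id -> client */
    client *found = raxFind(idx, (unsigned char*)&id, sizeof(id));
    if (found != raxNotFound) {
        /* found == c */
    }
    raxRemove(idx, (unsigned char*)&id, sizeof(id), NULL);
    raxFree(idx);
}
```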
Now let's look at initClientMultiState.
```c
void initClientMultiState(client *c) {
    c->mstate.commands = NULL;
    c->mstate.count = 0;
    c->mstate.cmd_flags = 0;
    c->mstate.cmd_inv_flags = 0;
}
```
This function just initializes the client's MULTI (transaction) state, where:
- commands: a multiCmd pointer, pointing at an array of multiCmd
- count: the length of the commands array
- cmd_flags: the OR-ed flags of all commands queued in commands
- cmd_inv_flags: the OR-ed complements of those command flags
commands is an array because it records all the commands of a Redis transaction: once a transaction starts, every command to be executed is queued into commands and held there until the EXEC command commits the transaction, as the sketch below shows.
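For reference, this queuing is done by queueMultiCommand in multi.c; an abridged sketch (based on Redis 6.x, shortened):

```c
/* Abridged from multi.c: append the client's current command to the MULTI
 * queue and fold its flags into the accumulated flag sets. */
void queueMultiCommand(client *c) {
    multiCmd *mc;
    int j;

    c->mstate.commands = zrealloc(c->mstate.commands,
            sizeof(multiCmd)*(c->mstate.count+1));
    mc = c->mstate.commands+c->mstate.count;
    mc->cmd = c->cmd;
    mc->argc = c->argc;
    mc->argv = zmalloc(sizeof(robj*)*c->argc);
    memcpy(mc->argv,c->argv,sizeof(robj*)*c->argc);
    for (j = 0; j < c->argc; j++)
        incrRefCount(mc->argv[j]);
    c->mstate.count++;
    c->mstate.cmd_flags |= c->cmd->flags;
    c->mstate.cmd_inv_flags |= ~c->cmd->flags;
}
```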
The connAccept(conn, clientAcceptHandler) call that follows simply sets the connection state to CONN_STATE_CONNECTED, marking the connection as fully established, and then invokes clientAcceptHandler, which:
- increments the connection counter (for statistics only)
- fires a module event:
  - event type REDISMODULE_EVENT_CLIENT_CHANGE
  - sub-event REDISMODULE_SUBEVENT_CLIENT_CHANGE_CONNECTED
With that, the whole connection-establishment flow is complete.