Redis AE事件驱动

AE事件驱动是Redis命令解析处理的基础。

事件驱动

Redis实现了一套事件驱动器AE,理由也很简单,逻辑简单可控。

Memcached的事件驱动时基于libevent的。

事件结构

aeEventLoop是整个事件驱动的核心,事件的注册与触发都基于此。

1
2
3
4
5
6
7
8
9
10
11
12
/* 事件循环结构体 */
typedef struct aeEventLoop {
...
int setsize; // 支持连接的最大句柄数量
aeFileEvent *events; // 已注册的事件
aeFiredEvent *fired; // 已触发的事件
aeTimeEvent *timeEventHead; // 时间事件(定时触发)
aeBeforeSleepProc *beforesleep; // 事件循环sleep之前的的执行函数
aeBeforeSleepProc *aftersleep; // 事件循环sleep之后的的执行函数
void *apidata; // select/poll/epoll的API交互数据
...
} aeEventLoop;

aeFileEvent是具体的事件结构,其中,包含了事件类型对应的处理函数

1
2
3
4
5
6
7
8
9
10
11
12
13
#define AE_NONE 0       // 未注册
#define AE_READABLE 1 // 可读
#define AE_WRITABLE 2 // 可写
#define AE_BARRIER 4 // 强制先写后读

/* 文件(IO)事件结构体 */
typedef struct aeFileEvent {
/* 读写标记 */
int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
aeFileProc *rfileProc; // 读事件对应的执行函数
aeFileProc *wfileProc; // 写事件对应的执行函数
void *clientData; // 事件相关的数据
} aeFileEvent;
初始化

aeCreateEventLoop是用于初始化事件循环结构体。

setsize是Redis支持句柄的数量,在eventloop初始化时用于初始化事件的存储大小

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/* 初始化事件循环 */
aeEventLoop *aeCreateEventLoop(int setsize) {
...
// 初始化事件循环
if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
// 设置默认支持最大连接的数量(也就是可存储的事件数量)
eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
// 已触发的事件的最大数量与注册事件的最大数量相同
eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
...
// 创建事件循环触发器(select/epoll/...)
// 具体参考:IO多路复用的多种实现
if (aeApiCreate(eventLoop) == -1) goto err;
/* Events with mask == AE_NONE are not set. So let's initialize the
* vector with it. */
for (i = 0; i < setsize; i++)
eventLoop->events[i].mask = AE_NONE;
return eventLoop;
return eventLoop;
}

Redis支持多个地址端口的绑定,文件句柄都存储在server.ipfd

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
int ipfd[CONFIG_BINDADDR_MAX]; // 用于监听客户端请求的文件句柄

/* 服务器初始化 */
void initServer(void) {
// socket.listen绑定监听端口
if (server.port != 0 &&
listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR)
exit(1);
...
// 每一个fd创建一个可读事件,并设置socket.accept回调
for (j = 0; j < server.ipfd_count; j++) {
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR)
...
}
}
...
}

aeCreateFileEvent是用于为监听的句柄创建事件及其对应的处理器,其中,aeApiAddEvent存在多种IO多路复用的实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/* 为句柄创建事件 */
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData)
{
// 获取句柄对应的事件
aeFileEvent *fe = &eventLoop->events[fd];
// 将句柄加入到事件监听中(select/epoll/...)
// 具体参考:IO多路复用的多种实现
if (aeApiAddEvent(eventLoop, fd, mask) == -1)
return AE_ERR;
fe->mask |= mask;
// 设置事件回调(区分读写)
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
// 设置客户端信息
fe->clientData = clientData;
return AE_OK;
}

acceptTcpHandlerSocket.Accept接收到新的客户端请求的处理器,用于设置后续的可读事件,

1
2
3
4
5
6
7
8
9
10
11
/* 处理监听到的Accept请求 */
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
...
while(max--) {
// Accept句柄fd监听到的请求,返回客户端句柄cfd
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
...
/* Accept成功后设置可读事件 */
acceptCommonHandler(cfd,0,cip);
}
}

acceptCommonHandler是用于Accept后用于初始化客户端并设置可读事件的处理器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
static void acceptCommonHandler(int fd, int flags, char *ip) {
client *c;
/* 创建客户端(内部初始化可读事件) */
if ((c = createClient(fd)) == NULL) {
...
}
}

/* 初始化客户端并设置可读事件 */
client *createClient(int fd) {
// 初始化客户端
client *c = zmalloc(sizeof(client));
if (fd != -1) {
anetNonBlock(NULL,fd); // 是否阻塞
anetEnableTcpNoDelay(NULL,fd); // 是否关闭Nagle算法
if (server.tcpkeepalive)
anetKeepAlive(NULL,fd,server.tcpkeepalive); // 设置keepalive
// 为当前客户端创建可读事件,并设置读取处理函数readQueryFromClient(下面有详细介绍)
if (aeCreateFileEvent(server.el,fd,AE_READABLE,
readQueryFromClient, c) == AE_ERR)
...
}
...
}

到此,Redis服务端的Socket绑定与监听客户端初始化等初始化逻辑已经分析完毕。

主流程

Redis的处理过程是单线程的,事件驱动的核心就在aeMain这个循环体内。

基于内核提供的select/poll/epoll来轮询事件实现循环执行。

1
2
3
4
5
6
7
8
9
10
11
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
// 事件循环
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
// 触发事件循环的前置处理器
eventLoop->beforesleep(eventLoop);
// 事件处理
aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
}
}
事件处理

aeProcessEvents是具体的处理逻辑,包括了事件获取事件分发

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
/* 事件处理函数 */
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
// 处理数量
int processed = 0;

// IO多路复用API, 获取并设置等待处理的事件数量(eventloop.fired)
int numevents = aeApiPoll(eventLoop, tvp);

// 触发事件循环的后置处理器
if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
eventLoop->aftersleep(eventLoop);

/* 开始分发事件,执行对应的处理器 */

// 遍历已触发待处理的事件
for (j = 0; j < numevents; j++) {
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask; // 事件类型
int fd = eventLoop->fired[j].fd; // 事件句柄
int fired = 0; // 统计事件

// 是否开启强制先写后读
int invert = fe->mask & AE_BARRIER;

// 如果未开启先写后读且为读事件
if (!invert && fe->mask & mask & AE_READABLE) {
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
// 如果为写事件
if (fe->mask & mask & AE_WRITABLE) {
if (!fired || fe->wfileProc != fe->rfileProc) {
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
// 如果开启先写后读且为读事件
if (invert && fe->mask & mask & AE_READABLE) {
if (!fired || fe->wfileProc != fe->rfileProc) {
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
// 处理数量+1
processed++;
}

// 处理定时事件
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);

return processed; /* return the number of processed file/time events */
}

从上面的源码分析得出:事件的收集工作是系统负责的,Redis仅通过每次的循环来不断拿到最新的触发事件

AE事件驱动的原理:IO多路复用

在上面的处理过程中,使用了aeApiPoll这个函数,按不同平台的实现方式有:

  • ae_evport:Solaris 10
  • ae_kqueue:OS X / FreeBSD
  • ae_select:通用的
  • ae_epoll:Linux

What are the underlying differences among select, epoll, kqueue, and evport?

Select最多支持1024个文件句柄,由于每次都需要遍历所有的操作符的状态,因此,时间复杂度是O(n);

EvportEpollKQueue支持更多的文件句柄,基于系统的实现策略不需要遍历操作符,时间复杂度是O(1)。

这里仅分析基于epoll的实现。

事件注册
1
2
3
4
5
6
7
8
9
10
11
12
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
aeApiState *state = eventLoop->apidata;
struct epoll_event ee = {0};
...
// 识别读写事件,设置EPOLLIN/EPOLLOUT
if (mask & AE_READABLE) ee.events |= EPOLLIN;
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
ee.data.fd = fd;
// 调用epoll的epoll_ctl注册需要监听的事件
if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
return 0;
}
事件获取
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// AE事件获取
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
int retval, numevents = 0;

// 调用epoll的epoll_wait获取等待事件(类似select调用)
retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
if (retval > 0) {
int j;
// 循环遍历所有已触发的事件
numevents = retval;
for (j = 0; j < numevents; j++) {
int mask = 0;
struct epoll_event *e = state->events+j;
// 识别事件类型
if (e->events & EPOLLIN) mask |= AE_READABLE;
if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
if (e->events & EPOLLERR) mask |= AE_WRITABLE;
if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
// 设置链接的文件句柄
eventLoop->fired[j].fd = e->data.fd;
// 设置事件类型
eventLoop->fired[j].mask = mask;
}
}
return numevents;
}
事件处理

读取

readQueryFromClient是从客户端读取请求数据的处理函数,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 每次读取Buffer的默认大小:16K
#define PROTO_IOBUF_LEN (1024*16)

/* 读取客户端请求 */
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
// 客户端连接的配置信息
client *c = (client*) privdata;
// 设置读取大小
readlen = PROTO_IOBUF_LEN;
// 获取客户端已经读取过的大小
qblen = sdslen(c->querybuf);
...
// 扩展客户端请求buffer的大小
c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
// 读取数据,追加在原有querybuf内容的后面
nread = read(fd, c->querybuf+qblen, readlen);
...
// 处理客户端请求buffer中的数据
processInputBufferAndReplicate(c);
}

// 处理Buffer
void processInputBufferAndReplicate(client *c) {
if (!(c->flags & CLIENT_MASTER)) {
processInputBuffer(c);
} else {
...
}
}

执行

processInputBuffer是用于解析客户端请求的Buffer并调用processCommand执行对应的操作,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/* 处理缓存内的请求数据 */
void processInputBuffer(client *c) {
// 设置当前处理的客户端
server.current_client = c;
// 循环读取Buffer并处理
while(c->qb_pos < sdslen(c->querybuf)) {
...
// 根据不同的请求类型,尝试解析请求命令(数据完整),否则,进入下一次循环
if (c->reqtype == PROTO_REQ_INLINE) {
if (processInlineBuffer(c) != C_OK) break;
} else if (c->reqtype == PROTO_REQ_MULTIBULK) {
if (processMultibulkBuffer(c) != C_OK) break;
} else {
serverPanic("Unknown request type");
}

// 无参数,重置客户端
if (c->argc == 0) {
resetClient(c);
} else {
// 执行命令
if (processCommand(c) == C_OK) {
...
}
...
// 处理完毕清空当前处理客户端
server.current_client = NULL;
}

processInlineBufferprocessMultibulkBuffer是用于命令解析的,如果解析成功则执行命令,否则,进入下一轮循环读取剩余Buffer。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/* 执行命令 */
int processCommand(client *c) {
// 处理quit命令
if (!strcasecmp(c->argv[0]->ptr,"quit")) {
addReply(c,shared.ok);
c->flags |= CLIENT_CLOSE_AFTER_REPLY;
return C_ERR;
}
// 查询argv[0]对应的命令
c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
...
/* 此处忽略了命令执行前的条件判断,命令的执行受主从、持久化等问题的影响 */
...
// 开始执行命令
if (c->flags & CLIENT_MULTI &&
c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
{
// MULTI 开启事务,命令入队列
queueMultiCommand(c);
addReply(c,shared.queued);
} else {
// 直接执行命令
call(c,CMD_CALL_FULL);
c->woff = server.master_repl_offset;
if (listLength(server.ready_keys))
handleClientsBlockedOnKeys();
}
return C_OK;
}

/* 命令入队列 */
void queueMultiCommand(client *c) {
// 申请空间
c->mstate.commands = zrealloc(c->mstate.commands,
sizeof(multiCmd)*(c->mstate.count+1));
// 设置命令
mc = c->mstate.commands+c->mstate.count;
mc->cmd = c->cmd;
mc->argc = c->argc;
mc->argv = zmalloc(sizeof(robj*)*c->argc);
memcpy(mc->argv,c->argv,sizeof(robj*)*c->argc);
...
}

call是在没有事务的情况下,直接执行,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
/* 执行命令 */
void call(client *c, int flags) {
...
// 通知监控
if (listLength(server.monitors) &&
!server.loading &&
!(c->cmd->flags & (CMD_SKIP_MONITOR|CMD_ADMIN)))
{
replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
}
...
// 执行命令
c->cmd->proc(c);
...
// 记录慢日志
if (flags & CMD_CALL_SLOWLOG && c->cmd->proc != execCommand) {
char *latency_event = (c->cmd->flags & CMD_FAST) ?
"fast-command" : "command";
latencyAddSampleIfNeeded(latency_event,duration/1000);
slowlogPushEntryIfNeeded(c,c->argv,c->argc,duration);
}
...
// 传播命令
if (flags & CMD_CALL_PROPAGATE &&
(c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP)
{
int propagate_flags = PROPAGATE_NONE;
// 传播命令到Slave和AOF
if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL);
// 强制传播命令到Slaver和AOF
if (c->flags & CLIENT_FORCE_REPL) propagate_flags |= PROPAGATE_REPL;
if (c->flags & CLIENT_FORCE_AOF) propagate_flags |= PROPAGATE_AOF;
...
if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE))
// 传播命令
propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
}
...
}

/* 传播命令 */
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
int flags)
{
// 传播命令给AOF和Slaver
if (server.aof_state != AOF_OFF && flags & PROPAGATE_AOF)
feedAppendOnlyFile(cmd,dbid,argv,argc);
if (flags & PROPAGATE_REPL)
replicationFeedSlaves(server.slaves,dbid,argv,argc);
}

写入

addReply是用于给客户端返回执行结果,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/* 请求返回 */
void addReply(client *c, robj *obj) {
// 判断是否需要给客户端返回数据
if (prepareClientToWrite(c) != C_OK) return;

if (sdsEncodedObject(obj)) {
// 尝试写入响应Buffer,Buffer无法存储的话写入响应链表
if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
_addReplyStringToList(c,obj->ptr,sdslen(obj->ptr));
}
...
}

/* 判断是否需要给客户端返回数据*/
int prepareClientToWrite(client *c) {
// 过滤lua脚本
if (c->flags & (CLIENT_LUA|CLIENT_MODULE)) return C_OK;
// 过滤不需要返回
if (c->flags & (CLIENT_REPLY_OFF|CLIENT_REPLY_SKIP)) return C_ERR;
// 过滤主线程
if ((c->flags & CLIENT_MASTER) &&
!(c->flags & CLIENT_MASTER_FORCE_REPLY)) return C_ERR;
...
/* client加入回写队列*/
if (!clientHasPendingReplies(c)) clientInstallWriteHandler(c);
return C_OK;
}

clientInstallWriteHandler是客户端回写处理器,它仅会把写入命令缓存到写入队列中,并不会直接返回结果

1
2
3
4
5
6
7
8
/* 把客户端加入等待回写队列 */
void clientInstallWriteHandler(client *c) {
...
c->flags |= CLIENT_PENDING_WRITE;
// 加入到等待回写队列
listAddNodeHead(server.clients_pending_write,c);
...
}

此时,Redis内部对客户端的请求处理完毕,还没有发现哪里完成了最后结果的写入。

再回头看一下beforeSleep到底做了什么?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/* 每次循环一次执行一次 */
void beforeSleep(struct aeEventLoop *eventLoop) {
...
// flush到AOF
flushAppendOnlyFile(0);
// 处理等待回写的客户端
handleClientsWithPendingWrites();
...
}

/* 用于处理等待回复的客户端 */
int handleClientsWithPendingWrites(void) {
...
// 循环遍历回写队列
listRewind(server.clients_pending_write,&li);
while((ln = listNext(&li))) {
// 获取并移除客户端
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_WRITE;
listDelNode(server.clients_pending_write,ln);
...
// 尝试向客户端回写数据
if (writeToClient(c->fd,c,0) == C_ERR) continue;
// 如果未完成回写数据,则注册异步回写事件
if (clientHasPendingReplies(c)) {
...
// 创建写入事件
if (aeCreateFileEvent(server.el, c->fd, ae_flags,
/* 回写客户端处理器 */
sendReplyToClient, c) == AE_ERR)
...
}

/* 回写客户端处理器 */
void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
...
/* 尝试向客户端回写数据 */
writeToClient(fd,privdata,1);
}

writeToClient是用于回写数据的处理函数,回写数据位于client->bufposclient->reply

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/* 回写数据 */
int writeToClient(int fd, client *c, int handler_installed) {
...
// 循环读取并回写数据
while(clientHasPendingReplies(c)) {
// 1. 检查缓冲区并回写
if (c->bufpos > 0) {
// 调用socket.write来写入数据
nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
...
// 2. 检查reply并回写
} else {
o = listNodeValue(listFirst(c->reply));
...
// 同上,写入数据
nwritten = write(fd, o->buf + c->sentlen, objlen - c->sentlen);

// 当前节点写入完成后删除
if (c->sentlen == objlen) {
c->reply_bytes -= o->size;
listDelNode(c->reply,listFirst(c->reply));
...
}
}
// 当回写字节数超过限制,则暂停回写退出(注册回写事件异步触发)
if (totwritten > NET_MAX_WRITES_PER_EVENT &&
(server.maxmemory == 0 ||
zmalloc_used_memory() < server.maxmemory) &&
!(c->flags & CLIENT_SLAVE)) break;
}
...
return C_OK;
}

在看下afterloop到底做了什么,

1
2
3
4
5
6
7
8
9
10
void afterSleep(struct aeEventLoop *eventLoop) {
UNUSED(eventLoop);
// 如果存在模块,则加全局锁
if (moduleCount()) moduleAcquireGIL();
}


void moduleAcquireGIL(void) {
pthread_mutex_lock(&moduleGIL);
}

加全局锁的目的是为了避免模块与Redis框架并发读写数据的问题

call命令会自动调用addReply进行数据的回写,

set命令为例,对应的函数为setGenericCommand

1
2
3
4
5
6
7
8
9
10
void setGenericCommand(redisClient *c, int flags, robj *key,
robj *val, robj *expire, int unit, robj *ok_reply,
robj *abort_reply) {
...
//将键值关联到数据库
setKey(c->db,key,val);
...
// 回写结果
addReply(c, ok_reply ? ok_reply : shared.ok);
}