目录

Redis 事件的实现 · Analyze

对 Redis 数据库的源码阅读,当前版本为 Redis 6.0 RC1,参考书籍《Redis 设计与实现》及其注释。项目地址:github.com/wingsxdu

Redis 采用事件驱动机制来处理大量的 I/O 操作,并实现了自己的事件驱动库,基于 Reactor 模式开发了自己的网络事件处理器,使用 I/O 多路复用程序监听多个 Socket,以处理大量的事件。

Redis 的事件程序处理以下两类事件:

  • 文件事件:用于处理 Redis 服务器和客户端之间的网络 I/O,实现对客户端的响应;
  • 时间事件:处理 Redis 服务中的一些定时操作,例如serverCronclientCron定时程序。

事件驱动库的实现主要在ae.c源文件中,感兴趣的读者也可以阅读一下。

事件模型

Reactor 模式

Redis 的文件事件处理器是基于 Reactor 模式 实现的,每一个网络连接都对应一个文件描述符(FD),事件处理器使用 I/O 多路复用程序,同时监听多个 FD,实现了同时对多个 FD 读写的监控,提高了网络通信模型的性能。当 acceptreadwriteclose 文件事件发生时,就会调用对应的事件处理器进行处理。

虽然文件事件可能是并发发生的,但是 I/O 多路复用程序会将所有的事件放到执行队列里,并由事件分派器分派给对应的事件处理器(执行函数)。保证了 Redis 服务以单线程的方式运行。

/posts/database/redis/event/index.assets/image-20200211160327831.png

图源《Redis 设计与实现》

I/O 多路复用

Redis 是以单线程模式运行的,所有的操作都是线性顺序执行,但是 I/O 操作往往阻塞性的,这可能会导致某个客户端的 I/O 阻塞使得 Redis 程序无法对其它客户端继续提供服务,因此 Redis 事件使用 I/O 多路复用来解决这个问题。

Redis 基于 I/O 多路复用实现了一个简单的事件驱动库,这个库实际上是对不同系统平台下的 I/O 复用函数进行了包装,Redis 会优先选择时间复杂度为 O(1) 的复用函数作为底层实现,包括 Solaries 10 中的 evport、Linux 中的 epoll 和 macOS/FreeBSD 中的 kqueue,它们都使用内部内核空间内存结构,能够服务数十万的文件描述符。

如果没有上述函数,Redis 会选择使用跨平台的 select 函数,但select只能服务 1024 个描述符,而且每次调用 select都需要把 FD 集合从用户态拷贝到内核态,同时在内核遍历传递进来的所有 FD,所以复杂度为 O(n),这个开销在 FD 很多时会很大,性能损失较大。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
    #ifdef HAVE_EPOLL
    #include "ae_epoll.c"
    #else
        #ifdef HAVE_KQUEUE
        #include "ae_kqueue.c"
        #else
        #include "ae_select.c"
        #endif
    #endif
#endif

以 Linux 中的 epoll 为例,查看事件驱动库是如何对 I/O 复用函数进行封装的。

创建一个新的epoll实例,是对 epoll.epoll_create() 进行封装:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
static int aeApiCreate(aeEventLoop *eventLoop) {
    aeApiState *state = zmalloc(sizeof(aeApiState));

    if (!state) return -1;
    // 初始化事件槽空间
    state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
    if (!state->events) {
        zfree(state);
        return -1;
    }
    // 创建 epoll 实例
    state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
    if (state->epfd == -1) {
        zfree(state->events);
        zfree(state);
        return -1;
    }
    // 赋值给 eventLoop
    eventLoop->apidata = state;
    return 0;
}

eventLoop中添加或删除需要监控的 FD 以及监听的事件,都是对 epoll.epoll_ctl() 进行封装:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee = {0}; /* avoid valgrind warning */
    int op = eventLoop->events[fd].mask == AE_NONE ?
            EPOLL_CTL_ADD : EPOLL_CTL_MOD;

    ee.events = 0;
    mask |= eventLoop->events[fd].mask; /* Merge old events */
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.fd = fd;
    if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
    return 0;
}
static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee = {0}; /* avoid valgrind warning */
    int mask = eventLoop->events[fd].mask & (~delmask);

    ee.events = 0;
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.fd = fd;
    if (mask != AE_NONE) {
        epoll_ctl(state->epfd,EPOLL_CTL_MOD,fd,&ee);
    } else {
        epoll_ctl(state->epfd,EPOLL_CTL_DEL,fd,&ee);
    }
}

获取可执行事件,是对 epoll.epoll_wait()进行封装:

1
2
3
4
5
6
7
8
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;

    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
            tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
	...
}

通俗地将,Redis 在不同操作系统平台上,充分利用了内核提供的函数,以达成性能优化,并简化了自身实现。

事件分发器

Redis 的事件分派器 aeProcessEvents 会检查是否有事件需要执行,将文件事件和时间事件分发给对应的事件处理器,分派器会根据传入的flags执行不同的处理不同的事件:

  • 如果 flags 为 0,那么函数执行任何操作,直接返回;
  • 如果 flags 包含AE_ALL_EVENTS,那么所有类型的事件都会被处理;
  • 如果 flags 包含AE_FILE_EVENTS,那么处理文件事件;
  • 如果 flags 包含AE_TIME_EVENTS,那么处理时间事件;
  • 如果 flags 包含AE_DONT_WAIT,那么函数在处理完所有不许阻塞的事件之后,立即返回;
  • 如果 flags 包含AE_CALL_AFTER_SLEEP,将会调用aftersleep函数。

如果需要同时处理两种事件,aeProcessEvents 会先计算最近的时间事件发生所需要等待的时间,然后调用 aeApiPoll 方法等待时间事件的发生,如果在这段时间中有文件事件需要处理,就会优先处理文件事件,否则就会一直等待,直到最近的时间事件触发。

如果被分派的是时间事件,将会调用子函数processTimeEvents()对时间事件进行处理。

如果被分派的是文件事件,事件分派器 aeProcessEvents 会根据被监听的文件事件的类型掩码mask将其分发给读/写事件处理器,确保读/写事件只能执行其中一个。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
            if (!invert && fe->mask & mask & AE_READABLE) {
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                fired++;
            }
            if (fe->mask & mask & AE_WRITABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

事件的执行

文件事件

文件事件是 Redis 能够响应客户端的基石,命令的执行也依赖于此,一个文件时间的数据结构定义如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
typedef struct aeFileEvent {
    // 监听事件类型掩码
    int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
    // 读事件处理器
    aeFileProc *rfileProc;
    // 写事件处理器
    aeFileProc *wfileProc;
    // 多路复用库的私有数据
    void *clientData;
} aeFileEvent;

函数aeCreateFileEvent()会创建一个文件事件,将该事件注册到 I/O 多路复用程序中。例如我们需要从主服务器同步数据时,会发起一个 socekt 连接,然后调用aeCreateFileEvent函数注册对应的事件处理器,将syncWithMaster函数添加到读事件处理器中。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{
    if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }
    aeFileEvent *fe = &eventLoop->events[fd];

    // 将该事件注册到 I/O 多路复用程序中
    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
    fe->mask |= mask;
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    fe->clientData = clientData;
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}

// 注册同步事件
aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL);

当这个同步事件满足条件时,就会由事件分派器处理交给读事件处理器处理,执行数据同步操作。

时间事件

Reids 有很多操作需要定时进行处理,时间事件就是对这类定时任务的抽象。时间事件的数据结构是一个双向链表,便于遍历:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
typedef struct aeTimeEvent {
    // 时间事件的唯一标识符
    long long id; /* time event identifier. */
    // 事件的到达时间
    long when_sec; /* seconds */
    long when_ms; /* milliseconds */
    // 事件处理函数
    aeTimeProc *timeProc;
    // 事件释放函数
    aeEventFinalizerProc *finalizerProc;
    // 多路复用库的私有数据
    void *clientData;
    // 指向上/下个时间事件结构,形成链表
    struct aeTimeEvent *prev;
    struct aeTimeEvent *next;
} aeTimeEvent;

Redis 的时间事件分为以下两类:

  • 定时事件:在指定的时间执行一次;
  • 周期性事件:每隔指定的时间间隔就执行一次。

时间的处理由processTimeEvents()函数执行,流程如下:

  1. 记录最新一次执行这个函数的时间,防止系统时间被修改产生事件处理混乱问题;

    1
    2
    3
    4
    5
    6
    7
    8
    9
    
        if (now < eventLoop->lastTime) {
            te = eventLoop->timeEventHead;
            while(te) {
                te->when_sec = 0;
                te = te->next;
            }
        }
        // 更新最后一次处理时间事件的时间
        eventLoop->lastTime = now;
    
  2. 遍历链表找出所有when_secwhen_ms小于现在时间的事件,执行事件对应的处理函数;

  3. 检查事件类型,如果是周期事件则刷新该事件下一次的执行事件,否则从列表中删除事件。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
        aeGetTime(&now_sec, &now_ms);
        // 如果当前时间等于或等于事件的执行时间,那么说明事件已到达,执行这个事件
        if (now_sec > te->when_sec ||
            (now_sec == te->when_sec && now_ms >= te->when_ms))
        {
            int retval;
            id = te->id;
            // 执行事件处理器,并获取返回值
            retval = te->timeProc(eventLoop, id, te->clientData);
            processed++;
            // 是否需要循环执行这个事件时间
            if (retval != AE_NOMORE) {
                // 需要, retval 毫秒之后继续执行这个时间事件
                aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
            } else {
                // 不需要,将这个事件删除
                te->id = AE_DELETED_EVENT_ID;
            }
        }

循环检查

当我们创建一个 Redis 实例,执行各种初始化任务之后,main函数就会调用aeMain()函数,执行事件处理器的主循环。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
int main(int argc, char **argv) {
    ...
    aeMain(server.el);
}
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
    }
}

aeMain 函数会陷入 while 循环,不断执行aeProcessEvents检查是否有触发的事件需要执行,这就使得 Redis 能够及时调用资源处理任务,而不会发生有事件需要处理而程序却无动于衷的情况,使得 Redis 即使是单线程执行的程序仍然保证了高性能。

总结

Redis 的事件实现使用了 Reactor 模式、I/O 多路复用、循环检查等策略,充分利用了系统提供的 API,简化了自身实现。虽然事件的执行仍然是串行的,但上述策略使得 Redis 在单线程模式下仍然保持了高性能,

Reference