Redis Server 连接管理 连接初始化 事件处理

连接初始化

接收连接

Redis 服务支持以下几种连接方式:

  • TCP
  • TLS
  • Unix Socket

收到不同方式的连接请求之后,Redis 会调用启动时绑定的处理函数。TCP 连接使用 acceptTcpHandler 处理,TLS 连接使用 acceptTLSHandler 处理,Unix socket 连接使用 acceptUnixHandler 处理。这几个处理函数最终都会调用通用函数 acceptCommonHandler

acceptCommonHandler 步骤

  1. 检查连接数,如果超过了配置的最大连接数会拒绝请求,并向客户端发送错误信息。最大连接数可以在 redis.conf 的 maxclients 参数修改。
  2. 创建客户端对象
  3. 保存 flag 参数,主要用于之后区分使用 Unix Socket 连接的客户端
  4. 调用客户端连接请求处理函数 clientAcceptHandler,主要用于处理默认保护模式下,拒绝处理没有设置密码的外部设备(非 localhost)连接请求。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
c复制代码// networking.c
static void acceptCommonHandler(connection *conn, int flags, char *ip) {
client *c;
UNUSED(ip);

// 1. 连接数检查
if (listLength(server.clients) >= server.maxclients) {
// 错误处理
// ...
return;
}

// 2. 创建客户端对象
if ((c = createClient(conn)) == NULL) {
// 错误处理
// ...
return;
}

// 3. 保存接收连接的相关参数
c->flags |= flags;

// 4. 调用 clientAcceptHandler 处理客户端连接请求
if (connAccept(conn, clientAcceptHandler) == C_ERR) {
// 错误处理
// ...
return;
}
}

创建客户端对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
c复制代码client *createClient(connection *conn) {
client *c = zmalloc(sizeof(client));

// 连接初始化
if (conn) {
// 1. 将连接设为非阻塞模式
connNonBlock(conn);
// 2. 禁用 nagel 算法
connEnableTcpNoDelay(conn);
// 3. 设置 TCP keepalive
if (server.tcpkeepalive)
connKeepAlive(conn,server.tcpkeepalive);

// 4. 设置请求处理函数
connSetReadHandler(conn, readQueryFromClient);
// 5. 让 conn->private_data 指向 client 对象
connSetPrivateData(conn, c);
}
...

步骤:

  1. 将连接设为非阻塞模式。

https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/59e60246228f4a4db9fd67bc3cbaf2e7~tplv-k3u1fbpfcp-zoom-1.image

如果程序 A 不断调用 send 将数据将数据拷贝到内核缓冲区,而应用程序 B 不调用 recv ,则 B 的内核缓冲区被填满后 A 的内核缓冲也会被填满,此时 A 继续调用 send 函数结果与 socket 模式有关

* 阻塞模式:继续调用 `send/recv` 时会阻塞在调用处
* 非阻塞模式:立即出错并退出,得到错误码 EWOULDBLOCK 或 EAGAIN
  1. 设置 TCP_NODELAY,禁用 nagle 算法,存放到内核缓冲区中的数据会立即发出。否则如果一次放到内核缓冲区中的数据数据包太小,则会在多个小的数据包凑成一个足够大的数据包后才会将数据发出。
  2. 设置 TCP keepalive,作用如下:
1. 检测因服务停止、网络波动、宕机、应用重启等原因挂掉的连接
2. 防止因为网络不活动而断连(使用NAT代理或者防火墙的时候,经常会出现这种问题)
3. TCP层面的心跳检测
  1. 将请求处理函数设为 readQueryFromClient ,用于解析和处理客户端发来的请求命令。
  2. conn->private_data 指向 client 对象,使 client 对象与 conn 对象相互引用
1
2
3
4
5
6
7
8
9
10
c复制代码		// 初始化 client 属性
// ...
//

// 6. 保存 client 对象
if (conn) linkClient(c);
// 7. 初始化 MULTI/EXEC 相关的参数
initClientMultiState(c);
return c;
}
  1. linkClient 保存 client 对象
1. 将 client 对象存到双向链表 `server.clients` 尾部的节点
2. 将 `server.clients` 尾部节点保存到 client 对象的 `client_list_node` 字段
3. 反转 client id 字节序,将转换后的 id 作为 key,client 对象作为 value,保存到基数树 `server.clients_index`。当后续需要通过 id 获取 client 对象时会(例如 `CLIENT UNBLOCK` 命令)从基数树中查询。反转 client id 字节序时使用 `memrev64` 函数,先将 64 位的 unsigned int 转换成 char\*,然后在 char\* 内部交换字符的顺序:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
c复制代码void memrev64(void *p) {
unsigned char *x = p, t;

t = x[0];
x[0] = x[7];
x[7] = t;
t = x[1];
x[1] = x[6];
x[6] = t;
t = x[2];
x[2] = x[5];
x[5] = t;
t = x[3];
x[3] = x[4];
x[4] = t;
}

对于长整型数据的映射,利用基数树可以根据一个长整型快速查找到其对应的对象指针。避免了使用 hash 映射 hash 函数难以设计,不恰当的 hash 函数可能增大冲突,或浪费空间。

https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/04cb1ab60dbd41deb14c43e4c6ae4553~tplv-k3u1fbpfcp-zoom-1.image

基数树可视化工具:www.cs.usfca.edu/~galles/vis…

执行 createClient 命令时支持传入 NULL,是因为 redis 中所有命令的执行都依赖一个 client 上下文,但是在 Lua 解释器中执行脚本等情况下并没有活跃的连接,因此需要用到 conn 为 NULL 的 client。

conn 为 NULL 的 client 不会被添加到 server.clientsserver.clients_index

事件处理

读事件

当一个新的客户端连接到服务器时, 服务器会给为该客户端绑定读事件, 直到客户端断开连接之后, 这个读事件才会被移除。

  • 当客户端只是连接到服务器,但并没有向服务器发送命令时,读事件就处于等待状态
  • 当客户端给服务器发送命令请求,并且请求已到达时,该客户端的读事件处于就绪状态

写事件

当服务器有命令结果要传回给客户端时, 会为客户端关联写事件, 在命令结果传送完毕之后, 客户端和写事件的关联就会被移除。

  • 当服务器有命令结果需要返回给客户端,但客户端还未能执行无阻塞写,那么写事件处于等待状态
  • 当服务器有命令结果需要返回给客户端,并且客户端可以进行无阻塞写,那么写事件处于就绪状态

当出现读事件和写事件同时就绪的情况时, 优先处理读事件

client 对象与事件循环

https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/138e697f0c6b429d9b315ae6f1ff1fa3~tplv-k3u1fbpfcp-zoom-1.image

Redis server 启动时,会在全局对象 aeEventLoop 中使用 eventsfired 两个字段保存了事件相关的对象:

  • events:保存注册的事件
  • fired:保存触发的事件

另外当事件的到来,就将所有就绪的事件从内核事件表中复制到 apidata→events

events, fired, apidata→events 数组的大小相同,下标是 clientfd

当 clientfd 的读事件触发后,redis server 执行 connSocketSetReadHandler 函数,然后触发 aeCreateFileEvent 函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
c复制代码// 读事件调用 aeCreateFileEvent 时参数依次为 
// * 全局 eventLoop 对象
// * client fd
// * AE_READABLE
// * connSocketEventHandler 函数
// * connection 对象
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData)
{
// ...

// 读取 redis server 初始化时预留的空 aeFileEvent
aeFileEvent *fe = &eventLoop->events[fd];

// aeApiAddEvent 会根据 OS 分别调用 select, epoll, kqueue, evport 4 种实现
// 其中会生成一个 epoll_event 对象 ee,将 EPOLLIN 添加到 ee.events,并将 clientfd
// 添加到 ee.data.fd
if (aeApiAddEvent(eventLoop, fd, mask) == -1)
return AE_ERR;
fe->mask |= mask;
// 设置读事件的回调函数
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
fe->clientData = clientData;
if (fd > eventLoop->maxfd)
eventLoop->maxfd = fd;
return AE_OK;
}

epoll 的 aeApiAddEvent 实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
c复制代码static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
aeApiState *state = eventLoop->apidata;

// 判断是新增还是修改
int op = eventLoop->events[fd].mask == AE_NONE ?
EPOLL_CTL_ADD : EPOLL_CTL_MOD;

ee.events = 0;

// 保留原有的 mask
mask |= eventLoop->events[fd].mask;

if (mask & AE_READABLE) ee.events |= EPOLLIN;
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;

// 将 clientfd 添加到 epoll_event 的 data
ee.data.fd = fd;
if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
return 0;
}

事件循环中aeApiPoll 调用 epoll_waitepoll_wait 将触发的事件复制到 apidate->events ,然后由 aeApiPoll 中的逻辑将本次触发事件的序号作为数组下标,将 fd、事件掩码记录到 eventLoop->fired 数组对应的位置上。

aeProcessEvents 在 aeApiPoll 返回后遍历 eventLoop->fired 数组,取出有效的数组元素,得到有事件的 fd 和事件掩码 mask

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
c复制代码static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
// tvp 是后续定时任务允许等待的最大事件

aeApiState *state = eventLoop->apidata;
int retval, numevents = 0;

// epoll_wait 将所有就绪的事件从内核事件表中复制到 state->events
retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);

if (retval > 0) {
int j;

numevents = retval;
// 将触发的事件复制到 eventLoop->fired
for (j = 0; j < numevents; j++) {
int mask = 0;
struct epoll_event *e = state->events+j;

if (e->events & EPOLLIN) mask |= AE_READABLE;
if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
if (e->events & EPOLLERR) mask |= AE_WRITABLE|AE_READABLE;
if (e->events & EPOLLHUP) mask |= AE_WRITABLE|AE_READABLE;
eventLoop->fired[j].fd = e->data.fd;
eventLoop->fired[j].mask = mask;
}
}
return numevents;
}

https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/0fd48f5b01d64dc5be888360ae7dfdf9~tplv-k3u1fbpfcp-zoom-1.image

clientfd 为下标从 eventLoop->events 中取出 aeFileEvent 对象,然后通过 aeFileEvent 的 clientData 取出 connection 对象,进而通过 connection 对象的 private_data 得到 client 对象,用于后续处理。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%