Redis网络连接层
Redis取自Remote Dictionary Server,顾名思义,Redis是运行在网络环境之上的。Redis目前支持3种网络连接类型:
- TCP:默认监听TCP 6379端口,接收网络请求,提供服务。
- Unix Socket:可以用作测试,以及使用Unix Socket做配置变更等。
- TLS:使用TLS加密的网络连接,可以防止网络链路上被监听、劫持,更加安全。不过代价就是性能有损失。
Redis通过这3种网络连接类型的支持,满足了绝大多数的用户需求,成为了目前最流行的KV存储数据库。
过去
截至2022-Q3,Redis最新的版本是7.0.5。在当前版本中,使用了“传统”的网络连接管理方式。在redis/src/server.c中监听端口:
/* Open the TCP listening socket for the user commands. */ | |
if (server.port != 0 && | |
listenToPort(server.port,&server.ipfd) == C_ERR) { | |
/* Note: the following log text is matched by the test suite. */ | |
serverLog(LL_WARNING, "Failed listening on port %u (TCP), aborting.", server.port); | |
exit(1); | |
} | |
if (server.tls_port != 0 && | |
listenToPort(server.tls_port,&server.tlsfd) == C_ERR) { | |
/* Note: the following log text is matched by the test suite. */ | |
serverLog(LL_WARNING, "Failed listening on port %u (TLS), aborting.", server.tls_port); | |
exit(1); | |
} | |
/* Open the listening Unix domain socket. */ | |
if (server.unixsocket != NULL) { | |
unlink(server.unixsocket); /* don't care if this fails */ | |
server.sofd = anetUnixServer(server.neterr,server.unixsocket, | |
(mode_t)server.unixsocketperm, server.tcp_backlog); | |
if (server.sofd == ANET_ERR) { | |
serverLog(LL_WARNING, "Failed opening Unix socket: %s", server.neterr); | |
exit(1); | |
} | |
anetNonBlock(NULL,server.sofd); | |
anetCloexec(server.sofd); | |
} |
以及设置监听文件描述符的处理函数,开始提供网络服务:
/* Create an event handler for accepting new connections in TCP and Unix | |
* domain sockets. */ | |
if (createSocketAcceptHandler(&server.ipfd, acceptTcpHandler) != C_OK) { | |
serverPanic("Unrecoverable error creating TCP socket accept handler."); | |
} | |
if (createSocketAcceptHandler(&server.tlsfd, acceptTLSHandler) != C_OK) { | |
serverPanic("Unrecoverable error creating TLS socket accept handler."); | |
} | |
if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE, | |
acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event."); |
从代码中,我们可以清晰地看到这几种连接类型的初始化过程和配置参数等。但是它的代价是:
- 不能扩展:如果想要支持一个新的连接类型,那么势必要修改代码。事实上,需要修改的代码还包含另外的几处。
- 宏的引用:因为TLS不是操作系统默认支持,需要依赖编译选项控制,那么存在TLS是否支持两种情况,就需要使用宏控制。在Redis的代码中存在多处#ifdef USE_OPENSSL的使用。
- 代码的可维护性降低:在server.c中,不得不引用、调用TCP、TLS和Unix Socket相关的代码逻辑。
现状
连接层框架
截至2022-Q3,在Redis最新的开发分支上,支持了连接层框架(connection layer framework),它长成这样:
uplayer | |
| | |
connection layer | |
/ | \ | |
TCP Unix TLS | |
connection layer负责抽象连接类型,它要求每种连接类型具有如下的方法:
typedef struct ConnectionType { | |
/* connection type */ | |
const char *(*get_type)(struct connection *conn); | |
/* connection type initialize & finalize & configure */ | |
void (*init)(void); /* auto-call during register */ | |
void (*cleanup)(void); | |
int (*configure)(void *priv, int reconfigure); | |
/* ae & accept & listen & error & address handler */ | |
void (*ae_handler)(struct aeEventLoop *el, int fd, void *clientData, int mask); | |
aeFileProc *accept_handler; | |
int (*addr)(connection *conn, char *ip, size_t ip_len, int *port, int remote); | |
int (*listen)(connListener *listener); | |
/* create/close connection */ | |
connection* (*conn_create)(void); | |
connection* (*conn_create_accepted)(int fd, void *priv); | |
void (*close)(struct connection *conn); | |
/* connect & accept */ | |
int (*connect)(struct connection *conn, const char *addr, int port, const char *source_addr, ConnectionCallbackFunc connect_handler); | |
int (*blocking_connect)(struct connection *conn, const char *addr, int port, long long timeout); | |
int (*accept)(struct connection *conn, ConnectionCallbackFunc accept_handler); | |
/* IO */ | |
int (*write)(struct connection *conn, const void *data, size_t data_len); | |
int (*writev)(struct connection *conn, const struct iovec *iov, int iovcnt); | |
int (*read)(struct connection *conn, void *buf, size_t buf_len); | |
int (*set_write_handler)(struct connection *conn, ConnectionCallbackFunc handler, int barrier); | |
int (*set_read_handler)(struct connection *conn, ConnectionCallbackFunc handler); | |
const char *(*get_last_error)(struct connection *conn); | |
ssize_t (*sync_write)(struct connection *conn, char *ptr, ssize_t size, long long timeout); | |
ssize_t (*sync_read)(struct connection *conn, char *ptr, ssize_t size, long long timeout); | |
ssize_t (*sync_readline)(struct connection *conn, char *ptr, ssize_t size, long long timeout); | |
/* pending data */ | |
int (*has_pending_data)(void); | |
int (*process_pending_data)(void); | |
/* TLS specified methods */ | |
sds (*get_peer_cert)(struct connection *conn); | |
} ConnectionType; |
上层(uplayer)通过connection layer访问Redis的各个连接类型,则可以忽略连接类型的具体实现,仅仅需要调用各个方法即可。
同时,connection layer还负责管理各个连接类型,例如一个新连接类型在使用之前,需要向Redis进行注册,参考redis/src/connection.c:
int connTypeRegister(ConnectionType *ct) { | |
const char *typename = ct->get_type(NULL); | |
ConnectionType *tmpct; | |
int type; | |
/* find an empty slot to store the new connection type */ | |
for (type = 0; type < CONN_TYPE_MAX; type++) { | |
tmpct = connTypes[type]; | |
if (!tmpct) | |
break; | |
/* ignore case, we really don't care "tls"/"TLS" */ | |
if (!strcasecmp(typename, tmpct->get_type(NULL))) { | |
serverLog(LL_WARNING, "Connection types %s already registered", typename); | |
return C_ERR; | |
} | |
} | |
serverLog(LL_VERBOSE, "Connection type %s registered", typename); | |
connTypes[type] = ct; | |
if (ct->init) { | |
ct->init(); | |
} | |
return C_OK; | |
} |
基于此,在redis/src/server.c监听各个连接类型则变成:
/* create all the configured listener, and add handler to start to accept */ | |
int listen_fds = 0; | |
for (int j = 0; j < CONN_TYPE_MAX; j++) { | |
listener = &server.listeners[j]; | |
if (listener->ct == NULL) | |
continue; | |
if (connListen(listener) == C_ERR) { | |
serverLog(LL_WARNING, "Failed listening on port %u (%s), aborting.", listener->port, listener->ct->get_type(NULL)); | |
exit(1); | |
} | |
if (createSocketAcceptHandler(listener, connAcceptHandler(listener->ct)) != C_OK) | |
serverPanic("Unrecoverable error creating %s listener accept handler.", listener->ct->get_type(NULL)); | |
listen_fds += listener->count; | |
} |
动态加载连接类型
在过去的版本中,需要在Redis编译时决定是否支持TLS。得益于新的连接层框架,Redis支持:
- 不支持TLS。
- 静态支持TLS:make BUILD_TLS=yes,代价是redis-server始终需要链接libssl和libcrypto,尽管可能不运行。
- 动态支持TLS:make BUILD_TLS=module即可把TLS支持编译成为redis-tls.so。如果希望使用TLS,通过redis-server --loadmodule src/redis-tls.so即可动态加载TLS,达到了“运行时加载”的效果。
同时,在代码结构上,也带来了一定的收益:几乎移除掉#ifdef USE_OPENSSL,仅在redis/src/tls.c中使用,同时重载ConnectionType:
static ConnectionType CT_TLS = { | |
/* connection type */ | |
.get_type = connTLSGetType, | |
/* connection type initialize & finalize & configure */ | |
.init = tlsInit, | |
.cleanup = tlsCleanup, | |
.configure = tlsConfigure, | |
/* ae & accept & listen & error & address handler */ | |
.ae_handler = tlsEventHandler, | |
.accept_handler = tlsAcceptHandler, | |
.addr = connTLSAddr, | |
.listen = connTLSListen, | |
/* create/close connection */ | |
.conn_create = connCreateTLS, | |
.conn_create_accepted = connCreateAcceptedTLS, | |
.close = connTLSClose, | |
/* connect & accept */ | |
.connect = connTLSConnect, | |
.blocking_connect = connTLSBlockingConnect, | |
.accept = connTLSAccept, | |
/* IO */ | |
.read = connTLSRead, | |
.write = connTLSWrite, | |
.writev = connTLSWritev, | |
.set_write_handler = connTLSSetWriteHandler, | |
.set_read_handler = connTLSSetReadHandler, | |
.get_last_error = connTLSGetLastError, | |
.sync_write = connTLSSyncWrite, | |
.sync_read = connTLSSyncRead, | |
.sync_readline = connTLSSyncReadLine, | |
/* pending data */ | |
.has_pending_data = tlsHasPendingData, | |
.process_pending_data = tlsProcessPendingData, | |
/* TLS specified methods */ | |
.get_peer_cert = connTLSGetPeerCert, | |
}; |
以及在Redis Module的入口函数中,执行connTypeRegister向Redis注册新的连接类型:
int RedisModule_OnLoad(void *ctx, RedisModuleString **argv, int argc) { | |
UNUSED(argv); | |
UNUSED(argc); | |
/* Connection modules must be part of the same build as redis. */ | |
if (strcmp(REDIS_BUILD_ID_RAW, redisBuildIdRaw())) { | |
serverLog(LL_NOTICE, "Connection type %s was not built together with the redis-server used.", CONN_TYPE_TLS); | |
return REDISMODULE_ERR; | |
} | |
if (RedisModule_Init(ctx,"tls",1,REDISMODULE_APIVER_1) == REDISMODULE_ERR) | |
return REDISMODULE_ERR; | |
/* Connection modules is available only bootup. */ | |
if ((RedisModule_GetContextFlags(ctx) & REDISMODULE_CTX_FLAGS_SERVER_STARTUP) == 0) { | |
serverLog(LL_NOTICE, "Connection type %s can be loaded only during bootup", CONN_TYPE_TLS); | |
return REDISMODULE_ERR; | |
} | |
RedisModule_SetModuleOptions(ctx, REDISMODULE_OPTIONS_HANDLE_REPL_ASYNC_LOAD); | |
if(connTypeRegister(&CT_TLS) != C_OK) | |
return REDISMODULE_ERR; | |
return REDISMODULE_OK; | |
} |
通过Redis Module机制,以及连接层的抽象和框架扩展能力,让Redis的连接类型支持更加易用、可扩展。
重写的Unix Socket连接类型
尽管Unix Socket和TCP是完全不同的连接类型,但是二者具有很大的相似性:基于一个FD(文件描述符)即可操作;支持read、write、writev等IO操作。于是Redis在代码中谨慎地判断TCP/Unix Socket,最大程度上复用了TCP的函数。
基于新的连接类型框框架,把Unix Socket支持从TCP中剥离出来,让代码拥有更好的维护性,参考redis/src/unix.c:
/* ========================================================================== | |
* unix.c - unix socket connection implementation | |
* -------------------------------------------------------------------------- | |
* Copyright (C) 2022 zhenwei pi | |
* | |
* Permission is hereby granted, free of charge, to any person obtaining a | |
* copy of this software and associated documentation files (the | |
* "Software"), to deal in the Software without restriction, including | |
* without limitation the rights to use, copy, modify, merge, publish, | |
* distribute, sublicense, and/or sell copies of the Software, and to permit | |
* persons to whom the Software is furnished to do so, subject to the | |
* following conditions: | |
* | |
* The above copyright notice and this permission notice shall be included | |
* in all copies or substantial portions of the Software. | |
* | |
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS | |
* OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF | |
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN | |
* NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, | |
* DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR | |
* OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE | |
* USE OR OTHER DEALINGS IN THE SOFTWARE. | |
* ========================================================================== | |
*/ | |
static ConnectionType CT_Unix; | |
static const char *connUnixGetType(connection *conn) { | |
UNUSED(conn); | |
return CONN_TYPE_UNIX; | |
} | |
static void connUnixEventHandler(struct aeEventLoop *el, int fd, void *clientData, int mask) { | |
connectionTypeTcp()->ae_handler(el, fd, clientData, mask); | |
} | |
static int connUnixAddr(connection *conn, char *ip, size_t ip_len, int *port, int remote) { | |
return connectionTypeTcp()->addr(conn, ip, ip_len, port, remote); | |
} | |
static int connUnixListen(connListener *listener) { | |
int fd; | |
mode_t *perm = (mode_t *)listener->priv; | |
if (listener->bindaddr_count == 0) | |
return C_OK; | |
/* currently listener->bindaddr_count is always 1, we still use a loop here in case Redis supports multi Unix socket in the future */ | |
for (int j = 0; j < listener->bindaddr_count; j++) { | |
char *addr = listener->bindaddr[j]; | |
unlink(addr); /* don't care if this fails */ | |
fd = anetUnixServer(server.neterr, addr, *perm, server.tcp_backlog); | |
if (fd == ANET_ERR) { | |
serverLog(LL_WARNING, "Failed opening Unix socket: %s", server.neterr); | |
exit(1); | |
} | |
anetNonBlock(NULL, fd); | |
anetCloexec(fd); | |
listener->fd[listener->count++] = fd; | |
} | |
return C_OK; | |
} | |
static connection *connCreateUnix(void) { | |
connection *conn = zcalloc(sizeof(connection)); | |
conn->type = &CT_Unix; | |
conn->fd = -1; | |
return conn; | |
} | |
static connection *connCreateAcceptedUnix(int fd, void *priv) { | |
UNUSED(priv); | |
connection *conn = connCreateUnix(); | |
conn->fd = fd; | |
conn->state = CONN_STATE_ACCEPTING; | |
return conn; | |
} | |
static void connUnixAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) { | |
int cfd, max = MAX_ACCEPTS_PER_CALL; | |
UNUSED(el); | |
UNUSED(mask); | |
UNUSED(privdata); | |
while(max--) { | |
cfd = anetUnixAccept(server.neterr, fd); | |
if (cfd == ANET_ERR) { | |
if (errno != EWOULDBLOCK) | |
serverLog(LL_WARNING, | |
"Accepting client connection: %s", server.neterr); | |
return; | |
} | |
serverLog(LL_VERBOSE,"Accepted connection to %s", server.unixsocket); | |
acceptCommonHandler(connCreateAcceptedUnix(cfd, NULL),CLIENT_UNIX_SOCKET,NULL); | |
} | |
} | |
static void connUnixClose(connection *conn) { | |
connectionTypeTcp()->close(conn); | |
} | |
static int connUnixAccept(connection *conn, ConnectionCallbackFunc accept_handler) { | |
return connectionTypeTcp()->accept(conn, accept_handler); | |
} | |
static int connUnixWrite(connection *conn, const void *data, size_t data_len) { | |
return connectionTypeTcp()->write(conn, data, data_len); | |
} | |
static int connUnixWritev(connection *conn, const struct iovec *iov, int iovcnt) { | |
return connectionTypeTcp()->writev(conn, iov, iovcnt); | |
} | |
static int connUnixRead(connection *conn, void *buf, size_t buf_len) { | |
return connectionTypeTcp()->read(conn, buf, buf_len); | |
} | |
static int connUnixSetWriteHandler(connection *conn, ConnectionCallbackFunc func, int barrier) { | |
return connectionTypeTcp()->set_write_handler(conn, func, barrier); | |
} | |
static int connUnixSetReadHandler(connection *conn, ConnectionCallbackFunc func) { | |
return connectionTypeTcp()->set_read_handler(conn, func); | |
} | |
static const char *connUnixGetLastError(connection *conn) { | |
return strerror(conn->last_errno); | |
} | |
static ssize_t connUnixSyncWrite(connection *conn, char *ptr, ssize_t size, long long timeout) { | |
return syncWrite(conn->fd, ptr, size, timeout); | |
} | |
static ssize_t connUnixSyncRead(connection *conn, char *ptr, ssize_t size, long long timeout) { | |
return syncRead(conn->fd, ptr, size, timeout); | |
} | |
static ssize_t connUnixSyncReadLine(connection *conn, char *ptr, ssize_t size, long long timeout) { | |
return syncReadLine(conn->fd, ptr, size, timeout); | |
} | |
static ConnectionType CT_Unix = { | |
/* connection type */ | |
.get_type = connUnixGetType, | |
/* connection type initialize & finalize & configure */ | |
.init = NULL, | |
.cleanup = NULL, | |
.configure = NULL, | |
/* ae & accept & listen & error & address handler */ | |
.ae_handler = connUnixEventHandler, | |
.accept_handler = connUnixAcceptHandler, | |
.addr = connUnixAddr, | |
.listen = connUnixListen, | |
/* create/close connection */ | |
.conn_create = connCreateUnix, | |
.conn_create_accepted = connCreateAcceptedUnix, | |
.close = connUnixClose, | |
/* connect & accept */ | |
.connect = NULL, | |
.blocking_connect = NULL, | |
.accept = connUnixAccept, | |
/* IO */ | |
.write = connUnixWrite, | |
.writev = connUnixWritev, | |
.read = connUnixRead, | |
.set_write_handler = connUnixSetWriteHandler, | |
.set_read_handler = connUnixSetReadHandler, | |
.get_last_error = connUnixGetLastError, | |
.sync_write = connUnixSyncWrite, | |
.sync_read = connUnixSyncRead, | |
.sync_readline = connUnixSyncReadLine, | |
/* pending data */ | |
.has_pending_data = NULL, | |
.process_pending_data = NULL, | |
}; | |
int RedisRegisterConnectionTypeUnix() | |
{ | |
return connTypeRegister(&CT_Unix); | |
} |
由于Unix Socket实现较为简单,且复用了大量的TCP连接代码,unix.c中仅使用了少量的代码实现,从中依然可以窥探一个连接类型具有的基本属性:
- 重载ConnectionType连接类型。
- 连接类型变量和重载函数为static类型,对外不做任何暴露。
- 向连接层注册。
- 事实上,也可以把Unix Socket支持编译成为一个动态链接库,以loadmodule的方式动态加载。Redis的Maintainer Oran认为Unix Socket是一个基础的连接类型,不需要额外的链接和宏控制,因此始终使用静态编译支持。
展望
得益于Redis连接层框架和Module机制,向Redis中增加一个新的连接类型变得更加容易。RDMA是一种高性能的网络技术,iWARP和RoCE v2也让数据中心的以太网络支持了RDMA,近年来也变得更加流行。因此,是不是可以让Redis跑在RDMA上呢?在测试中,在1KB的KV情况下,Redis Over RDMA技术让Redis单核性能达到~450K QPS,大约是相同环境下的TCP性能的2.5倍(~180K QPS)。目前Redis Over RDMA的Pull Request正在社区接受Review,也欢迎提建议、捉BUG。