Notes:
- The Redis deployment in this walkthrough runs in cluster mode.
- The network model used is epoll.
This article explains, from the perspective of Redis internals, what exactly happens between a SET command leaving the redis client and the redis server receiving and processing the request. Along the way it also covers:
- How Redis executes a command
- How Redis Cluster mode works
- Why Redis is so fast
- The epoll network model
To understand the request flow, we first need to look at Redis's network model. Redis supports several I/O multiplexing backends, selected at compile time: select, epoll, kqueue and evport. In my experience epoll is the most widely deployed in production, so this article walks through the epoll backend.
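For reference, below is a paraphrase of how Redis picks the backend at build time in ae.c (the exact macros may differ slightly between versions):

#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
    #ifdef HAVE_EPOLL
    #include "ae_epoll.c"        /* Linux: the backend discussed in this article */
    #else
        #ifdef HAVE_KQUEUE
        #include "ae_kqueue.c"   /* BSD / macOS */
        #else
        #include "ae_select.c"   /* portable fallback */
        #endif
    #endif
#endif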
The Epoll Network Model
Drawbacks of the select and poll models:
- Every call to select has to add the calling process to the wait queue of every monitored socket, and every wakeup has to remove it from each of those queues again, which means two full traversals; on top of that, the whole fd set must be copied into the kernel on every call, so each call also pays for a user-to-kernel transition.
- select/poll only return an int (the number of ready descriptors), so we still do not know which socket is ready and need another pass over all descriptors to find out. A minimal select() sketch illustrating both problems follows.
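This is not Redis code, just a bare-bones illustration (error handling omitted) of the two costs listed above: the fd set is rebuilt and copied to the kernel on every call, and after wakeup every fd has to be scanned to find the ready ones.

#include <sys/select.h>
#include <unistd.h>

void select_loop(int *fds, int nfds_count) {
    for (;;) {
        fd_set rfds;
        int maxfd = -1;

        FD_ZERO(&rfds);
        for (int i = 0; i < nfds_count; i++) {   /* rebuild the set on every iteration */
            FD_SET(fds[i], &rfds);
            if (fds[i] > maxfd) maxfd = fds[i];
        }

        /* the whole set is copied into the kernel here */
        int ready = select(maxfd + 1, &rfds, NULL, NULL, NULL);
        if (ready <= 0) continue;

        for (int i = 0; i < nfds_count; i++) {   /* second scan: which fd is actually ready? */
            if (FD_ISSET(fds[i], &rfds)) {
                char buf[512];
                read(fds[i], buf, sizeof(buf));  /* consume the readable fd */
            }
        }
    }
}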
To get rid of the repeated scans and of having to pass the full descriptor set on every call, epoll restructures the whole mechanism. The overall architecture is as follows:
The epoll setup flow is (a minimal skeleton follows this list):
- epoll_create() allocates a new storage area inside the kernel (a red-black tree) that holds the monitored file descriptors
- epoll_ctl() adds, modifies and removes file descriptors on that instance
- epoll_wait() blocks the calling thread, placing it on the instance's wait queue until events arrive
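A bare-bones epoll skeleton (illustrative only, error handling omitted) that follows the three steps above:

#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

#define MAX_EVENTS 64

void epoll_loop(int listen_fd) {
    int epfd = epoll_create1(0);                          /* step 1: create the epoll instance */

    struct epoll_event ev = {0};
    ev.events = EPOLLIN;
    ev.data.fd = listen_fd;
    epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd, &ev);       /* step 2: register the listening fd */

    struct epoll_event events[MAX_EVENTS];
    for (;;) {
        int n = epoll_wait(epfd, events, MAX_EVENTS, -1); /* step 3: block until something is ready */
        for (int i = 0; i < n; i++) {
            if (events[i].data.fd == listen_fd) {
                int cfd = accept(listen_fd, NULL, NULL);  /* new connection: register it too */
                struct epoll_event cev = {0};
                cev.events = EPOLLIN;
                cev.data.fd = cfd;
                epoll_ctl(epfd, EPOLL_CTL_ADD, cfd, &cev);
            } else {
                char buf[512];
                ssize_t r = read(events[i].data.fd, buf, sizeof(buf)); /* readable client fd */
                if (r <= 0) {
                    epoll_ctl(epfd, EPOLL_CTL_DEL, events[i].data.fd, NULL);
                    close(events[i].data.fd);
                }
            }
        }
    }
}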
How epoll handles an incoming event:
Unlike select/poll, where the caller has to rescan everything, epoll relies on the kernel's scheduler: when an awaited event arrives, the kernel moves the waiting thread from the sleeping state back to the runnable state.
While a thread is blocked in epoll_wait, the kernel keeps watching all registered file descriptors; as soon as a registered event fires on one of them, the kernel notifies the epoll instance. The flow is:
- The thread calling epoll_wait waits on the epoll instance for events; it is suspended and goes to sleep.
- When a registered event occurs on a file descriptor, the kernel records that event on the epoll instance.
- The kernel then wakes the waiting thread through the scheduler, moving it back to the runnable state.
- Once the thread resumes, epoll_wait returns and copies the event information into the user-supplied array, so the user program can handle the events that occurred.
Pseudocode for this process:
// User-space side
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout) {
    // Wait inside the kernel for events to occur
    wait_for_events(epfd, events, maxevents, timeout);
    // Return how many events were filled in
    return num_events;
}

// Kernel-space side
void wait_for_events(int epfd, struct epoll_event *events, int maxevents, int timeout) {
    // No events yet: put the current thread on the epoll instance's wait queue
    add_thread_to_wait_queue(current_thread, epfd->wait_queue);
    // Enter the scheduler and let other threads run
    schedule();
    // When we get back here the thread has been woken up: copy out the events
    process_events(epfd, events, maxevents);
}

// Kernel-space side: called when an event fires on a registered file descriptor
void handle_events(int epfd, struct epoll_event *events, int num_events) {
    // Walk the wait queue and wake the sleeping threads
    wake_up_threads(epfd->wait_queue);
}
Redis Server Startup
Now that we have covered the epoll model, the next question is how Redis actually uses it for communication. Let's look at the core of Redis's startup code:
int main(int argc, char **argv) {
    //...
    initServer();
    //...
    aeMain(server.el);
}
At startup Redis runs two key functions, initServer and aeMain. Inside initServer the epoll-related steps are:
- aeCreateEventLoop creates the event loop and, underneath it, the epoll instance plus the arrays that track registered and fired file descriptors
- listenToPort listens on the configured port(s)
- createSocketAcceptHandler registers the handler for incoming connection events
- aeSetBeforeSleepProc installs the before-sleep hook
aeMain then loops, calling aeApiPoll (a thin wrapper around epoll_wait) to wait for ready file descriptors. The overall flow is as follows; a paraphrased look at how aeCreateEventLoop reaches epoll_create is shown below.
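The sketch below is paraphrased from ae_epoll.c (details may differ by Redis version): aeCreateEventLoop ends up calling aeApiCreate, which is where the epoll instance is actually created.

static int aeApiCreate(aeEventLoop *eventLoop) {
    aeApiState *state = zmalloc(sizeof(aeApiState));

    /* room for the events epoll_wait will report */
    state->events = zmalloc(sizeof(struct epoll_event) * eventLoop->setsize);
    state->epfd = epoll_create(1024);  /* 1024 is just a hint for the kernel */
    if (state->epfd == -1) {
        zfree(state->events);
        zfree(state);
        return -1;
    }
    eventLoop->apidata = state;
    return 0;
}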
Command Sending and Execution
Redis Cluster Mode
Redis Cluster is a commonly used deployment architecture; its topology looks like this:
Within the cluster, master nodes communicate with each other over the Gossip protocol, which keeps cluster state (slot ownership, node liveness, configuration) propagated to every node. Each key is assigned to one of 16384 hash slots, and each master serves a subset of those slots; a simplified sketch of the slot calculation follows.
Masters and their replicas stay in sync through replication, with the master continuously propagating its writes to the replicas.
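A simplified sketch of the slot mapping (the real keyHashSlot in cluster.c additionally honors {hash tags}; crc16 here is assumed to be the CRC16 routine shipped with Redis):

#include <stdint.h>

uint16_t crc16(const char *buf, int len);      /* assumed: Redis's crc16.c */

unsigned int simpleKeyHashSlot(const char *key, int keylen) {
    return crc16(key, keylen) & 16383;         /* 16384 slots: 0..16383 */
}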
With that background in place, let's follow a command through the system.
Client Connection
By the time Redis is up, it has already registered the connection accept handler (acceptTcpHandler). Its job is to accept the new connection and bind the resulting fd to the query handler (readQueryFromClient), so that when data becomes ready on that fd the right handler gets invoked.
void initServer(void) {
    //...
    createSocketAcceptHandler(&server.ipfd, acceptTcpHandler);
    //...
}

void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    //...
    while(max--) {
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                serverLog(LL_WARNING,
                    "Accepting client connection: %s", server.neterr);
            return;
        }
        anetCloexec(cfd);
        serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
        acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
    }
}

static void acceptCommonHandler(connection *conn, int flags, char *ip) {
    //...
    /* Create connection and client */
    if ((c = createClient(conn)) == NULL) {
        serverLog(LL_WARNING,
            "Error registering fd event for the new client: %s (conn: %s)",
            connGetLastError(conn),
            connGetInfo(conn, conninfo, sizeof(conninfo)));
        connClose(conn); /* May be already closed, just ignore errors */
        return;
    }
    //...
}

client *createClient(connection *conn) {
    client *c = zmalloc(sizeof(client));

    /* passing NULL as conn it is possible to create a non connected client.
     * This is useful since all the commands needs to be executed
     * in the context of a client. When commands are executed in other
     * contexts (for instance a Lua script) we need a non connected client. */
    if (conn) {
        /* Register readQueryFromClient as this connection's read handler */
        connSetReadHandler(conn, readQueryFromClient);
        connSetPrivateData(conn, c);
    }
    //...
}
Once this registration is done, aeMain drives the loop that eventually calls epoll_wait(). The code path is:
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|
                                   AE_CALL_BEFORE_SLEEP|
                                   AE_CALL_AFTER_SLEEP);
    }
}

int aeProcessEvents(aeEventLoop *eventLoop, int flags){
    //...
    if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
        eventLoop->beforesleep(eventLoop);

    /* Call the multiplexing API, will return only on timeout or when
     * some event fires. */
    numevents = aeApiPoll(eventLoop, tvp);

    /* After sleep callback. */
    if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
        eventLoop->aftersleep(eventLoop);
    //...
}

// ae_epoll.c
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;

    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
            tvp ? (tvp->tv_sec*1000 + (tvp->tv_usec + 999)/1000) : -1);
    if (retval > 0) {
        int j;

        numevents = retval;
        for (j = 0; j < numevents; j++) {
            int mask = 0;
            struct epoll_event *e = state->events+j;

            if (e->events & EPOLLIN) mask |= AE_READABLE;
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            if (e->events & EPOLLERR) mask |= AE_WRITABLE|AE_READABLE;
            if (e->events & EPOLLHUP) mask |= AE_WRITABLE|AE_READABLE;
            eventLoop->fired[j].fd = e->data.fd;
            eventLoop->fired[j].mask = mask;
        }
    }
    return numevents;
}
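For completeness, here is a simplified paraphrase of what aeProcessEvents does with the fired[] array that aeApiPoll filled in (the real code in ae.c also handles write barriers and event ordering): it looks up the registered file event and invokes its read/write handler, which for a client socket ends up in readQueryFromClient via the connection layer.

/* inside aeProcessEvents, after aeApiPoll returns (paraphrased) */
for (int j = 0; j < numevents; j++) {
    int fd = eventLoop->fired[j].fd;
    int mask = eventLoop->fired[j].mask;
    aeFileEvent *fe = &eventLoop->events[fd];

    if (fe->mask & mask & AE_READABLE)
        fe->rfileProc(eventLoop, fd, fe->clientData, mask);  /* read path */
    if (fe->mask & mask & AE_WRITABLE)
        fe->wfileProc(eventLoop, fd, fe->clientData, mask);  /* write path */
}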
Command Execution
When you type set xxx aaa into a redis client, the following happens:
- The client parses the input and serializes it into the RESP protocol, e.g. *3\r\n$3\r\nSET\r\n$3\r\nxxx\r\n$3\r\naaa\r\n
- The client sends that payload over the TCP connection established earlier to one of the cluster nodes.
- The receiving node checks whether the key's hash slot belongs to itself; if not, it replies with a MOVED redirection and the client re-issues the command against the correct node.
- On the node that owns the slot, once the socket becomes readable, epoll_wait returns the number of ready fds:
retval = epoll_wait(state->epfd,state->events,eventLoop->setsize, tvp ? (tvp->tv_sec*1000 + (tvp->tv_usec + 999)/1000) : -1);
- The server loops over the fired fds and checks the event type. Here it is an EPOLLIN event, meaning the input buffer is readable, so the registered handler (readQueryFromClient) is invoked. The code is as follows:
void readQueryFromClient(connection *conn) {
    //...
    /* There is more data in the client input buffer, continue parsing it
     * in case to check if there is a full command to execute. */
    processInputBuffer(c);
}

void processInputBuffer(client *c) {
    /* Keep processing while there is something in the input buffer */
    while(c->qb_pos < sdslen(c->querybuf)) {
        //...
        /* Multibulk processing could see a <= 0 length. */
        if (c->argc == 0) {
            resetClient(c);
        } else {
            /* If we are in the context of an I/O thread, we can't really
             * execute the command here. All we can do is to flag the client
             * as one that needs to process the command. */
            if (c->flags & CLIENT_PENDING_READ) {
                c->flags |= CLIENT_PENDING_COMMAND;
                break;
            }

            /* We are finally ready to execute the command. */
            if (processCommandAndResetClient(c) == C_ERR) {
                /* If the client is no longer valid, we avoid exiting this
                 * loop and trimming the client buffer later. So we return
                 * ASAP in that case. */
                return;
            }
        }
    }
    //...
}

int processCommandAndResetClient(client *c) {
    int deadclient = 0;
    client *old_client = server.current_client;
    server.current_client = c;
    if (processCommand(c) == C_OK) {
        commandProcessed(c);
    }
    //...
}

int processCommand(client *c) {
    //...
    /* Now lookup the command and check ASAP about trivial error conditions
     * such as wrong arity, bad command name and so forth.
     * (lookupCommand resolves the command name to its redisCommand entry.) */
    c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
    //...
    /* Exec the command */
    if (c->flags & CLIENT_MULTI &&
        c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
        c->cmd->proc != multiCommand && c->cmd->proc != watchCommand &&
        c->cmd->proc != resetCommand)
    {
        queueMultiCommand(c);
        addReply(c,shared.queued);
    } else {
        /* Outside of a MULTI block this is where the command's proc
         * (setCommand for SET) is actually invoked. */
        call(c,CMD_CALL_FULL);
        //...
    }
}

struct redisCommand *lookupCommand(sds name) {
    return dictFetchValue(server.commands, name);
}
- The server reads the fd's contents, parses out the set command name, and looks up the corresponding implementation:
void *dictFetchValue(dict *d, const void *key) {
    dictEntry *he;

    he = dictFind(d,key);
    return he ? dictGetVal(he) : NULL;
}

dictEntry *dictFind(dict *d, const void *key)
{
    dictEntry *he;
    uint64_t h, idx, table;

    if (dictSize(d) == 0) return NULL; /* dict is empty */
    if (dictIsRehashing(d)) _dictRehashStep(d);
    h = dictHashKey(d, key);
    for (table = 0; table <= 1; table++) {
        idx = h & d->ht[table].sizemask;
        he = d->ht[table].table[idx];
        while(he) {
            if (key==he->key || dictCompareKeys(d, key, he->key))
                return he;
            he = he->next;
        }
        if (!dictIsRehashing(d)) return NULL;
    }
    return NULL;
}

void populateCommandTable(void) {
    int j;
    int numcommands = sizeof(redisCommandTable)/sizeof(struct redisCommand);

    for (j = 0; j < numcommands; j++) {
        struct redisCommand *c = redisCommandTable+j;
        int retval1, retval2;

        /* Translate the command string flags description into an actual
         * set of flags. */
        if (populateCommandTableParseFlags(c,c->sflags) == C_ERR)
            serverPanic("Unsupported command flag");

        c->id = ACLGetCommandID(c->name); /* Assign the ID used for ACL. */
        retval1 = dictAdd(server.commands, sdsnew(c->name), c);
        /* Populate an additional dictionary that will be unaffected
         * by rename-command statements in redis.conf. */
        retval2 = dictAdd(server.orig_commands, sdsnew(c->name), c);
        serverAssert(retval1 == DICT_OK && retval2 == DICT_OK);
    }
}

struct redisCommand redisCommandTable[] = {
    ...
    /* Note that we can't flag set as fast, since it may perform an
     * implicit DEL of a large key. */
    {"set",setCommand,-3,
     "write use-memory @string",
     0,NULL,1,1,1,0,0,0},

    {"setnx",setnxCommand,3,
     "write use-memory fast @string",
     0,NULL,1,1,1,0,0,0},

    {"setex",setexCommand,4,
     "write use-memory @string",
     0,NULL,1,1,1,0,0,0},
    ...
};
- Finally the matching command handler (setCommand) is selected and executed:
void setCommand(client *c) {
    robj *expire = NULL;
    int unit = UNIT_SECONDS;
    int flags = OBJ_NO_FLAGS;

    if (parseExtendedStringArgumentsOrReply(c,&flags,&unit,&expire,COMMAND_SET) != C_OK) {
        return;
    }

    c->argv[2] = tryObjectEncoding(c->argv[2]);
    setGenericCommand(c,flags,c->argv[1],c->argv[2],expire,unit,NULL,NULL);
}
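What setGenericCommand does next is, abridged and paraphrased (the NX/XX and expire handling is omitted and the exact signature varies by Redis version): it writes the key into the keyspace and queues the reply for the client.

void setGenericCommand(client *c, int flags, robj *key, robj *val,
                       robj *expire, int unit, robj *ok_reply, robj *abort_reply) {
    /* ... NX/XX checks and expire parsing omitted ... */
    setKey(c, c->db, key, val);                    /* store the value in the db dict */
    server.dirty++;
    notifyKeyspaceEvent(NOTIFY_STRING, "set", key, c->db->id);
    addReply(c, ok_reply ? ok_reply : shared.ok);  /* queue "+OK\r\n" for the client */
}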
Generating the Response
After the command has run, its implementation builds a reply object and appends it to the client's output buffer; this is handled by the addReply family of functions. For the SET command the implementation produces an "OK" reply and appends it to the output buffer.
void addReply(client *c, robj *obj) {
    if (prepareClientToWrite(c) != C_OK) return;

    if (sdsEncodedObject(obj)) {
        if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
            _addReplyProtoToList(c,obj->ptr,sdslen(obj->ptr));
    } else if (obj->encoding == OBJ_ENCODING_INT) {
        /* For integer encoded strings we just convert it into a string
         * using our optimized function, and attach the resulting string
         * to the output buffer. */
        char buf[32];
        size_t len = ll2string(buf,sizeof(buf),(long)obj->ptr);
        if (_addReplyToBuffer(c,buf,len) != C_OK)
            _addReplyProtoToList(c,buf,len);
    } else {
        serverPanic("Wrong obj->encoding in addReply()");
    }
}
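Note that addReply does not write to the socket directly. prepareClientToWrite only marks the client as having pending output and queues it on server.clients_pending_write, roughly like this (paraphrased from networking.c; the real helper also checks replication state):

void clientInstallWriteHandler(client *c) {
    if (!(c->flags & CLIENT_PENDING_WRITE)) {
        c->flags |= CLIENT_PENDING_WRITE;
        /* picked up later by beforeSleep() instead of being written immediately */
        listAddNodeHead(server.clients_pending_write, c);
    }
}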
Sending the Response
When the event loop finds clients with pending data in their output buffers, it calls writeToClient to push the response back to the client.
Through the steps above, Redis resolves the command sent by the client to its implementation function, executes it, and ships the result back. The logic spans several source files (server.c, networking.c, t_string.c, ae.c and friends); the write-back path starts in beforeSleep:
void beforeSleep(struct aeEventLoop *eventLoop) {
    //...
    /* Handle writes with pending output buffers. */
    handleClientsWithPendingWritesUsingThreads();
    //...
}

int handleClientsWithPendingWritesUsingThreads(void) {
    int processed = listLength(server.clients_pending_write);
    if (processed == 0) return 0; /* Return ASAP if there are no clients. */

    /* If I/O threads are disabled or we have few clients to serve, don't
     * use I/O threads, but the boring synchronous code. */
    if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
        return handleClientsWithPendingWrites();
    }

    /* Start threads if needed. */
    if (!server.io_threads_active) startThreadedIO();

    /* Distribute the clients across N different lists. */
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_write,&li);
    int item_id = 0;
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_WRITE;

        /* Remove clients from the list of pending writes since
         * they are going to be closed ASAP. */
        if (c->flags & CLIENT_CLOSE_ASAP) {
            listDelNode(server.clients_pending_write, ln);
            continue;
        }

        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }

    /* Give the start condition to the waiting threads, by setting the
     * start condition atomic var. */
    io_threads_op = IO_THREADS_OP_WRITE;
    for (int j = 1; j < server.io_threads_num; j++) {
        int count = listLength(io_threads_list[j]);
        setIOPendingCount(j, count);
    }

    /* Also use the main thread to process a slice of clients. */
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        writeToClient(c,0);
    }
    listEmpty(io_threads_list[0]);

    /* Wait for all the other threads to end their work. */
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += getIOPendingCount(j);
        if (pending == 0) break;
    }

    /* Run the list of clients again to install the write handler where
     * needed. */
    listRewind(server.clients_pending_write,&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);

        /* Install the write handler if there are pending writes in some
         * of the clients. */
        if (clientHasPendingReplies(c) &&
            connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR)
        {
            freeClientAsync(c);
        }
    }
    listEmpty(server.clients_pending_write);

    /* Update processed count on server */
    server.stat_io_writes_processed += processed;

    return processed;
}

int handleClientsWithPendingWrites(void) {
    listIter li;
    listNode *ln;
    int processed = listLength(server.clients_pending_write);

    listRewind(server.clients_pending_write,&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_WRITE;
        listDelNode(server.clients_pending_write,ln);

        /* If a client is protected, don't do anything,
         * that may trigger write error or recreate handler. */
        if (c->flags & CLIENT_PROTECTED) continue;

        /* Don't write to clients that are going to be closed anyway. */
        if (c->flags & CLIENT_CLOSE_ASAP) continue;

        /* Try to write buffers to the client socket. */
        if (writeToClient(c,0) == C_ERR) continue;

        /* If after the synchronous writes above we still have data to
         * output to the client, we need to install the writable handler. */
        if (clientHasPendingReplies(c)) {
            int ae_barrier = 0;
            /* For the fsync=always policy, we want that a given FD is never
             * served for reading and writing in the same event loop iteration,
             * so that in the middle of receiving the query, and serving it
             * to the client, we'll call beforeSleep() that will do the
             * actual fsync of AOF to disk. the write barrier ensures that. */
            if (server.aof_state == AOF_ON &&
                server.aof_fsync == AOF_FSYNC_ALWAYS)
            {
                ae_barrier = 1;
            }
            if (connSetWriteHandlerWithBarrier(c->conn, sendReplyToClient, ae_barrier) == C_ERR) {
                freeClientAsync(c);
            }
        }
    }
    return processed;
}
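To close the loop, here is a highly simplified sketch of what writeToClient boils down to (the real function in networking.c also walks the reply list, enforces output-buffer limits and handles partial writes); writeToClientSimplified is a hypothetical name used only for this illustration:

int writeToClientSimplified(client *c) {
    while (clientHasPendingReplies(c)) {
        ssize_t nwritten = connWrite(c->conn, c->buf + c->sentlen,
                                     c->bufpos - c->sentlen);
        if (nwritten <= 0) break;                  /* would block, or error */
        c->sentlen += nwritten;
        if (c->sentlen == c->bufpos) {             /* static buffer fully flushed */
            c->bufpos = 0;
            c->sentlen = 0;
        }
    }
    if (!clientHasPendingReplies(c))
        connSetWriteHandler(c->conn, NULL);        /* nothing left: drop the write handler */
    return C_OK;
}
At this point the "+OK\r\n" produced by the SET command has reached the client socket, completing the round trip.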