Entry point: main() in server.c
Initializing the configuration
This step initializes the Redis server's basic data structures and its many parameters. In the source, the server is represented by a struct named redisServer, which defines everything the server needs to run: the listening port and file descriptors, the currently connected clients, the command table configuration, the various persistence-related parameters, the event loop structure, and more.
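For orientation, here is a heavily trimmed, illustrative sketch of that struct (the real definition in server.h runs to hundreds of fields; the grouping comments are ours). The initServerConfig excerpt that follows fills this structure with defaults:

struct redisServer {
    /* Networking */
    int port;                       /* TCP listening port */
    int ipfd[CONFIG_BINDADDR_MAX];  /* TCP listening socket fds */
    int ipfd_count;                 /* Used slots in ipfd[] */
    list *clients;                  /* List of currently connected clients */
    /* Command table */
    dict *commands;                 /* Command table (affected by rename-command) */
    /* Persistence */
    int aof_state;                  /* AOF on/off/wait-rewrite */
    char *rdb_filename;             /* Name of the RDB file */
    /* Event loop */
    aeEventLoop *el;
    /* ... hundreds of additional fields ... */
};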
void initServerConfig(void) {
    int j;
    ...
    /* Command table -- we initialize it here as it is part of the
     * initial configuration, since command names may be changed via
     * redis.conf using the rename-command directive. */
    server.commands = dictCreate(&commandTableDictType,NULL);
    server.orig_commands = dictCreate(&commandTableDictType,NULL);
    populateCommandTable();
    server.delCommand = lookupCommandByCString("del");
    server.multiCommand = lookupCommandByCString("multi");
    server.lpushCommand = lookupCommandByCString("lpush");
    server.lpopCommand = lookupCommandByCString("lpop");
    server.rpopCommand = lookupCommandByCString("rpop");
    server.zpopminCommand = lookupCommandByCString("zpopmin");
    server.zpopmaxCommand = lookupCommandByCString("zpopmax");
    server.sremCommand = lookupCommandByCString("srem");
    server.execCommand = lookupCommandByCString("exec");
    server.expireCommand = lookupCommandByCString("expire");
    server.pexpireCommand = lookupCommandByCString("pexpire");
    server.xclaimCommand = lookupCommandByCString("xclaim");
    server.xgroupCommand = lookupCommandByCString("xgroup");
    server.rpoplpushCommand = lookupCommandByCString("rpoplpush");

    /* Debugging */
    server.assert_failed = "<no assertion failed>";
    server.assert_file = "<no file>";
    server.assert_line = 0;
    server.bug_report_start = 0;
    server.watchdog_period = 0;

    /* By default we want scripts to be always replicated by effects
     * (single commands executed by the script), and not by sending the
     * script to the slave / AOF. This is the new way starting from
     * Redis 5. However it is possible to revert it via redis.conf. */
    server.lua_always_replicate_commands = 1;

    initConfigValues();
}
At runtime the entire Redis server is represented by a single global variable of this redisServer type (named, simply, server), and this step is mostly about initializing that global.
One function in this initialization deserves particular attention: populateCommandTable. It builds the Redis command table, through which the configuration of any command can be looked up by its name (how many arguments the command accepts, its handler function entry point, and so on).
void populateCommandTable(void) {
    int j;
    int numcommands = sizeof(redisCommandTable)/sizeof(struct redisCommand);

    for (j = 0; j < numcommands; j++) {
        struct redisCommand *c = redisCommandTable+j;
        int retval1, retval2;

        /* Translate the command string flags description into an actual
         * set of flags. */
        if (populateCommandTableParseFlags(c,c->sflags) == C_ERR)
            serverPanic("Unsupported command flag");

        c->id = ACLGetCommandID(c->name); /* Assign the ID used for ACL. */
        retval1 = dictAdd(server.commands, sdsnew(c->name), c);
        /* Populate an additional dictionary that will be unaffected
         * by rename-command statements in redis.conf. */
        retval2 = dictAdd(server.orig_commands, sdsnew(c->name), c);
        serverAssert(retval1 == DICT_OK && retval2 == DICT_OK);
    }
}
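The lookupCommandByCString calls in initServerConfig above are then simply dictionary lookups against the table populated here; the function is essentially the following (paraphrased from server.c):

struct redisCommand *lookupCommandByCString(char *s) {
    struct redisCommand *cmd;
    sds name = sdsnew(s);

    /* server.commands maps sds command names to redisCommand entries. */
    cmd = dictFetchValue(server.commands, name);
    sdsfree(name);
    return cmd;
}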
Redis commands are hard-coded. Stepping into redisCommandTable, we see the following:
struct redisCommand redisCommandTable[] = {
    {"module",moduleCommand,-2,
     "admin no-script",
     0,NULL,0,0,0,0,0,0},

    {"get",getCommand,2,
     "read-only fast @string",
     0,NULL,1,1,1,0,0,0},

    /* Note that we can't flag set as fast, since it may perform an
     * implicit DEL of a large key. */
    {"set",setCommand,-3,
     "write use-memory @string",
     0,NULL,1,1,1,0,0,0},
    ...
};
The table pairs each command with its handler function; each entry has the following structure:
struct redisCommand {
    char *name;
    redisCommandProc *proc;
    int arity;
    char *sflags;   /* Flags as string representation, one char per flag. */
    uint64_t flags; /* The actual flags, obtained from the 'sflags' field. */
    /* Use a function to determine keys arguments in a command line.
     * Used for Redis Cluster redirect. */
    redisGetKeysProc *getkeys_proc;
    /* What keys should be loaded in background when calling this command? */
    int firstkey; /* The first argument that's a key (0 = no keys) */
    int lastkey;  /* The last argument that's a key */
    int keystep;  /* The step between first and last key */
    long long microseconds, calls;
    int id;     /* Command ID. This is a progressive ID starting from 0 that
                   is assigned at runtime, and is used in order to check
                   ACLs. A connection is able to execute a given command if
                   the user associated to the connection has this command
                   bit set in the bitmap of allowed commands. */
};
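To make the firstkey/lastkey/keystep trio concrete: in the table above, get has (1, 1, 1), meaning argv[1] is its only key; a command like mset key1 val1 key2 val2 is declared as (1, -1, 2), meaning every second argument starting at argv[1], up to the last argument, is a key. A minimal illustrative sketch of the enumeration logic (the real implementation lives in getKeysUsingCommandTable; printCommandKeys is our own name):

#include <stdio.h>

void printCommandKeys(struct redisCommand *cmd, int argc, char **argv) {
    if (cmd->firstkey == 0) return;       /* Command takes no keys. */
    int last = cmd->lastkey;
    if (last < 0) last = argc + last;     /* Negative counts from the end. */
    for (int i = cmd->firstkey; i <= last; i += cmd->keystep)
        printf("key: %s\n", argv[i]);
}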
Loading the configuration file
Back in the server-initialization code, we can see that after the global redisServer structure is initialized, the configuration file (redis.conf) still has to be loaded, which may override some of the parameters just initialized. In other words, a first round of initialization guarantees that every internal data structure and parameter has a sensible default, and the custom settings from the configuration file are then applied on top. The initConfigValues call at the end of initServerConfig is what installs those defaults:
void initConfigValues() {
    for (standardConfig *config = configs; config->name != NULL; config++) {
        config->interface.init(config->data);
    }
}
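Each element of the configs array describes one parameter: its name in redis.conf, whether CONFIG SET may change it at runtime, and a per-type interface whose init hook installs the default. A simplified view of the entry type (condensed from config.c; the real typeInterface also carries set/get/rewrite hooks):

typedef struct standardConfig {
    const char *name;        /* Name of the config in redis.conf */
    const char *alias;       /* Deprecated alias, may be NULL */
    const int modifiable;    /* Changeable at runtime via CONFIG SET? */
    typeInterface interface; /* Type-specific init/set/get/rewrite functions */
    typeData data;           /* Pointer to the target field plus its default */
} standardConfig;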
Creating the event loop
In Redis the event loop is represented by a struct called aeEventLoop. This step mainly creates one aeEventLoop structure and stores it in the server global (the redisServer structure mentioned above). The event loop also depends on the operating system's underlying I/O multiplexing mechanism, such as epoll on Linux, so this step includes initializing that mechanism through the system API.
aeEventLoop *aeCreateEventLoop(int setsize) {
    aeEventLoop *eventLoop;
    int i;

    if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
    eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
    eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
    if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
    eventLoop->setsize = setsize;
    eventLoop->lastTime = time(NULL);
    eventLoop->timeEventHead = NULL;
    eventLoop->timeEventNextId = 0;
    eventLoop->stop = 0;
    eventLoop->maxfd = -1;
    eventLoop->beforesleep = NULL;
    eventLoop->aftersleep = NULL;
    eventLoop->flags = 0;
    if (aeApiCreate(eventLoop) == -1) goto err;
    /* Events with mask == AE_NONE are not set. So let's initialize the
     * vector with it. */
    for (i = 0; i < setsize; i++)
        eventLoop->events[i].mask = AE_NONE;
    return eventLoop;

err:
    if (eventLoop) {
        zfree(eventLoop->events);
        zfree(eventLoop->fired);
        zfree(eventLoop);
    }
    return NULL;
}
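The aeApiCreate call near the end is where the platform-specific multiplexer gets initialized. On Linux this is the epoll backend; condensed from ae_epoll.c, it looks essentially like this:

typedef struct aeApiState {
    int epfd;                    /* epoll instance fd */
    struct epoll_event *events;  /* Buffer for epoll_wait() results */
} aeApiState;

static int aeApiCreate(aeEventLoop *eventLoop) {
    aeApiState *state = zmalloc(sizeof(aeApiState));

    if (!state) return -1;
    state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
    if (!state->events) {
        zfree(state);
        return -1;
    }
    state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
    if (state->epfd == -1) {
        zfree(state->events);
        zfree(state);
        return -1;
    }
    eventLoop->apidata = state;
    return 0;
}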
Opening the listening sockets
A server must listen before it can receive requests. Depending on the configuration, this step may open two kinds of listeners: one for TCP connections and one for a Unix domain socket. A Unix domain socket is an efficient inter-process communication (IPC) mechanism, explicitly defined in POSIX, for communication between two processes on the same host; it outperforms TCP because it skips the protocol-stack overhead. A Redis client connecting to a server on the same machine can therefore choose to connect over a Unix domain socket instead.

Either way, the program ends up with file descriptors, which it stores in the server global. For TCP, since multiple listening addresses and ports can be bound, there may be several listening file descriptors. These are the descriptors the program will later use to register I/O event callbacks.
int listenToPort(int port, int *fds, int *count) {
    int j;

    /* Force binding of 0.0.0.0 if no bind address is specified, always
     * entering the loop if j == 0. */
    if (server.bindaddr_count == 0) server.bindaddr[0] = NULL;
    for (j = 0; j < server.bindaddr_count || j == 0; j++) {
        if (server.bindaddr[j] == NULL) {
            int unsupported = 0;
            /* Bind * for both IPv6 and IPv4, we enter here only if
             * server.bindaddr_count == 0. */
            fds[*count] = anetTcp6Server(server.neterr,port,NULL,
                server.tcp_backlog);
            if (fds[*count] != ANET_ERR) {
                anetNonBlock(NULL,fds[*count]);
                (*count)++;
            } else if (errno == EAFNOSUPPORT) {
                unsupported++;
                serverLog(LL_WARNING,"Not listening to IPv6: unsupported");
            }

            if (*count == 1 || unsupported) {
                /* Bind the IPv4 address as well. */
                fds[*count] = anetTcpServer(server.neterr,port,NULL,
                    server.tcp_backlog);
                if (fds[*count] != ANET_ERR) {
                    anetNonBlock(NULL,fds[*count]);
                    (*count)++;
                } else if (errno == EAFNOSUPPORT) {
                    unsupported++;
                    serverLog(LL_WARNING,"Not listening to IPv4: unsupported");
                }
            }
            /* Exit the loop if we were able to bind * on IPv4 and IPv6,
             * otherwise fds[*count] will be ANET_ERR and we'll print an
             * error and return to the caller with an error. */
            if (*count + unsupported == 2) break;
        } else if (strchr(server.bindaddr[j],':')) {
            /* Bind IPv6 address. */
            fds[*count] = anetTcp6Server(server.neterr,port,server.bindaddr[j],
                server.tcp_backlog);
        } else {
            /* Bind IPv4 address. */
            fds[*count] = anetTcpServer(server.neterr,port,server.bindaddr[j],
                server.tcp_backlog);
        }
        if (fds[*count] == ANET_ERR) {
            serverLog(LL_WARNING,
                "Could not create server TCP listening socket %s:%d: %s",
                server.bindaddr[j] ? server.bindaddr[j] : "*",
                port, server.neterr);
            if (errno == ENOPROTOOPT     || errno == EPROTONOSUPPORT ||
                errno == ESOCKTNOSUPPORT || errno == EPFNOSUPPORT ||
                errno == EAFNOSUPPORT    || errno == EADDRNOTAVAIL)
                continue;
            return C_ERR;
        }
        anetNonBlock(NULL,fds[*count]);
        (*count)++;
    }
    return C_OK;
}
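As an aside on the Unix domain socket path: a client on the same host talks to it with an AF_UNIX socket instead of TCP. A minimal standalone sketch, assuming the server was started with unixsocket /tmp/redis.sock in redis.conf (the path is our assumption), speaking the inline form of the protocol:

#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>

int main(void) {
    int fd = socket(AF_UNIX, SOCK_STREAM, 0);
    if (fd == -1) { perror("socket"); return 1; }

    struct sockaddr_un addr;
    memset(&addr, 0, sizeof(addr));
    addr.sun_family = AF_UNIX;
    strncpy(addr.sun_path, "/tmp/redis.sock", sizeof(addr.sun_path) - 1);
    if (connect(fd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
        perror("connect");
        return 1;
    }

    /* Inline command: the reply to PING should be +PONG\r\n */
    write(fd, "PING\r\n", 6);
    char buf[64];
    ssize_t n = read(fd, buf, sizeof(buf) - 1);
    if (n > 0) { buf[n] = '\0'; printf("%s", buf); }
    close(fd);
    return 0;
}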
Registering the timer event callback
As a single-threaded program, Redis has no way to schedule asynchronous work, such as periodically reclaiming expired keys, other than through the event loop. This step registers a timer event with the freshly created event loop, configured to periodically run one callback: serverCron. Since Redis has only one main thread, this function also runs on that thread, driven by the event loop (i.e., invoked at the appropriate moments); it does not block the other logic on the thread, but rather shares the thread by time-slicing.

So what does serverCron actually do? Besides periodically reclaiming expired keys, it handles many other duties: reconnecting a replica to its master, reconnecting Cluster nodes, triggering BGSAVE and AOF rewrites, and so on.
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
        aeTimeProc *proc, void *clientData,
        aeEventFinalizerProc *finalizerProc)
{
    long long id = eventLoop->timeEventNextId++;
    aeTimeEvent *te;

    te = zmalloc(sizeof(*te));
    if (te == NULL) return AE_ERR;
    te->id = id;
    aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms);
    te->timeProc = proc;
    te->finalizerProc = finalizerProc;
    te->clientData = clientData;
    te->prev = NULL;
    te->next = eventLoop->timeEventHead;
    if (te->next)
        te->next->prev = te;
    eventLoop->timeEventHead = te;
    return id;
}
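For concreteness, this is how the timer is hooked up in initServer (server.c), with an initial delay of just 1 ms; serverCron then keeps itself alive by returning the next interval in milliseconds (a time callback that returns AE_NOMORE is instead deleted after firing):

/* In initServer(): */
if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
    serverPanic("Can't create event loop timers.");
    exit(1);
}

/* ...and at the end of serverCron(), so the hz config
 * controls how often it runs: */
return 1000/server.hz;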
Registering the I/O event callbacks
The Redis server's core job is to watch for I/O events, parse client command requests out of them, execute the commands, and return responses; watching for I/O events naturally relies on the event loop too. As noted above, Redis can open two kinds of listeners, TCP and Unix domain socket, so this step registers callbacks for both kinds of I/O events: acceptTcpHandler and acceptUnixHandler (plus acceptTLSHandler for TLS listeners, as the code below shows). Handling of client requests starts in these functions. Redis also registers one more I/O event here, used for two-way communication with modules over a pipe.
/* Create an event handler for accepting new connections in TCP and Unix
 * domain sockets. */
for (j = 0; j < server.ipfd_count; j++) {
    if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
        acceptTcpHandler,NULL) == AE_ERR)
    {
        serverPanic(
            "Unrecoverable error creating server.ipfd file event.");
    }
}
for (j = 0; j < server.tlsfd_count; j++) {
    if (aeCreateFileEvent(server.el, server.tlsfd[j], AE_READABLE,
        acceptTLSHandler,NULL) == AE_ERR)
    {
        serverPanic(
            "Unrecoverable error creating server.tlsfd file event.");
    }
}
if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
    acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event.");

/* Register a readable event for the pipe used to awake the event loop
 * when a blocked client in a module needs attention. */
if (aeCreateFileEvent(server.el, server.module_blocked_pipe[0], AE_READABLE,
    moduleBlockedClientPipeReadable,NULL) == AE_ERR) {
        serverPanic(
            "Error registering the readable event for the module "
            "blocked clients subsystem.");
}
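When one of the TCP listening descriptors becomes readable, acceptTcpHandler accepts the pending connection(s) and hands each off to client creation. A condensed sketch of its shape (trimmed from networking.c):

void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
    char cip[NET_IP_STR_LEN];

    while (max--) {
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                serverLog(LL_WARNING,
                    "Accepting client connection: %s", server.neterr);
            return;
        }
        serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
        /* Wrap the fd in a connection object and create the client. */
        acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
    }
}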
Next comes InitServerLast, which finishes initialization by starting the background threads and the threaded I/O subsystem described below:
void InitServerLast() {
    bioInit();
    initThreadedIO();
    set_jemalloc_bg_thread(server.jemalloc_bg_thread);
    server.initial_memory_usage = zmalloc_used_memory();
}
Initializing the background threads
Redis creates a few extra threads that run in the background, dedicated to tasks that are time-consuming but can be deferred (generally cleanup work). In Redis these are called bio threads (Background I/O service). Their tasks include deferrable file close operations (e.g., for the unlink command), AOF persistence writes (the fsync call — note that only fsync operations that can be deferred run on a background thread), and the reclamation of large keys (e.g., for flushdb async). Evidently the name bio is something of a misnomer: what these threads do is not necessarily I/O-related.

One question may come to mind here: the initialization above already registered a timer callback, serverCron, which also runs background work, so couldn't these jobs simply go there? In fact they cannot. As noted earlier, serverCron is driven by the event loop and still executes on the Redis main thread, time-sliced against everything else running there (chiefly command execution). serverCron therefore must not run anything too expensive, or it would hurt command response times. Time-consuming but deferrable tasks have to run on separate threads.
void bioInit(void) {
    pthread_attr_t attr;
    pthread_t thread;
    size_t stacksize;
    int j;

    /* Initialization of state vars and objects */
    for (j = 0; j < BIO_NUM_OPS; j++) {
        pthread_mutex_init(&bio_mutex[j],NULL);
        pthread_cond_init(&bio_newjob_cond[j],NULL);
        pthread_cond_init(&bio_step_cond[j],NULL);
        bio_jobs[j] = listCreate();
        bio_pending[j] = 0;
    }

    /* Set the stack size as by default it may be small in some system */
    pthread_attr_init(&attr);
    pthread_attr_getstacksize(&attr,&stacksize);
    if (!stacksize) stacksize = 1; /* The world is full of Solaris Fixes */
    while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2;
    pthread_attr_setstacksize(&attr, stacksize);

    /* Ready to spawn our threads. We use the single argument the thread
     * function accepts in order to pass the job ID the thread is
     * responsible of. */
    for (j = 0; j < BIO_NUM_OPS; j++) {
        void *arg = (void*)(unsigned long) j;
        if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) {
            serverLog(LL_WARNING,"Fatal: Can't initialize Background Jobs.");
            exit(1);
        }
        bio_threads[j] = thread;
    }
}
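Other parts of the server hand work to these threads via bioCreateBackgroundJob, which appends a job to the per-type queue under the corresponding mutex and signals that thread. For example, a deferrable AOF fsync is queued roughly like this (as aof.c does through its aof_background_fsync helper):

/* Ask the BIO_AOF_FSYNC background thread to fsync the AOF fd. */
bioCreateBackgroundJob(BIO_AOF_FSYNC, (void*)(long)server.aof_fd, NULL, NULL);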
Initializing threaded I/O
Starting with Redis 6, reading requests from and writing replies to client sockets can optionally be spread across a pool of I/O threads (the io-threads configuration directive); command execution itself still happens only on the main thread. initThreadedIO sets this subsystem up: with io_threads_num equal to 1 (the default), no extra threads are spawned and all I/O stays on the main thread; otherwise it creates the additional threads (thread 0 being the main thread itself), each initially parked on a locked mutex until there is work to hand out.
void initThreadedIO(void) {
    io_threads_active = 0; /* We start with threads not active. */

    /* Don't spawn any thread if the user selected a single thread:
     * we'll handle I/O directly from the main thread. */
    if (server.io_threads_num == 1) return;

    if (server.io_threads_num > IO_THREADS_MAX_NUM) {
        serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
                             "The maximum number is %d.", IO_THREADS_MAX_NUM);
        exit(1);
    }

    /* Spawn and initialize the I/O threads. */
    for (int i = 0; i < server.io_threads_num; i++) {
        /* Things we do for all the threads including the main thread. */
        io_threads_list[i] = listCreate();
        if (i == 0) continue; /* Thread 0 is the main thread. */

        /* Things we do only for the additional threads. */
        pthread_t tid;
        pthread_mutex_init(&io_threads_mutex[i],NULL);
        io_threads_pending[i] = 0;
        pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */
        if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
            serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
            exit(1);
        }
        io_threads[i] = tid;
    }
}
Restoring the database
With server-state initialization complete, the server is ready to start; but because Redis is persistent, it first has to load the appropriate file to restore the previous contents of the database. Redis checks which persistence mode is enabled: if AOF is on, the data is restored from the append-only file; otherwise the RDB file is loaded and the data restored from that.
void loadDataFromDisk(void) {
    long long start = ustime();
    if (server.aof_state == AOF_ON) {
        if (loadAppendOnlyFile(server.aof_filename) == C_OK)
            serverLog(LL_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000);
    } else {
        rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
        if (rdbLoad(server.rdb_filename,&rsi,RDBFLAGS_NONE) == C_OK) {
            serverLog(LL_NOTICE,"DB loaded from disk: %.3f seconds",
                (float)(ustime()-start)/1000000);

            /* Restore the replication ID / offset from the RDB file. */
            if ((server.masterhost ||
                (server.cluster_enabled &&
                nodeIsSlave(server.cluster->myself))) &&
                rsi.repl_id_is_set &&
                rsi.repl_offset != -1 &&
                /* Note that older implementations may save a repl_stream_db
                 * of -1 inside the RDB file in a wrong way, see more
                 * information in function rdbPopulateSaveInfo. */
                rsi.repl_stream_db != -1)
            {
                memcpy(server.replid,rsi.repl_id,sizeof(server.replid));
                server.master_repl_offset = rsi.repl_offset;
                /* If we are a slave, create a cached master from this
                 * information, in order to allow partial resynchronizations
                 * with masters. */
                replicationCacheMasterUsingMyself();
                selectDb(server.cached_master,rsi.repl_stream_db);
            }
        } else if (errno != ENOENT) {
            serverLog(LL_WARNING,"Fatal error loading the DB: %s. Exiting.",strerror(errno));
            exit(1);
        }
    }
}
Starting the event loop
Finally, main sets the beforeSleep and afterSleep callbacks, then calls aeMain to start the event loop and begin listening for events. aeMain is an endless loop that keeps waiting for new requests; when it eventually returns at shutdown, aeDeleteEventLoop tears the loop down.
aeSetBeforeSleepProc(server.el,beforeSleep);
aeSetAfterSleepProc(server.el,afterSleep);
aeMain(server.el);
aeDeleteEventLoop(server.el);
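For reference, aeMain itself is tiny (ae.c): it loops calling aeProcessEvents, which also invokes the beforeSleep/afterSleep hooks around each poll, until something sets the stop flag:

void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|
                                   AE_CALL_BEFORE_SLEEP|
                                   AE_CALL_AFTER_SLEEP);
    }
}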