Will enabling AOF on an already-running Redis cause data loss after Redis restarts?

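The previous post dug fairly deep into the RDB source code, but covered AOF persistence only through theory and concepts. I had planned to be lazy and just skim the AOF implementation linked at the end of that post. While skimming, though, I found I could not follow much of it, and that is when I realized how important and how tricky AOF is.
I then spent a good deal of time reading references alongside the source, and I now have a better picture of roughly how AOF runs. This post shares that understanding. Some of it is surely wrong, and corrections from more experienced readers are welcome.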
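AOF-related configuration options
First, the AOF options in redis.conf:
appendonly (yes, no): whether AOF persistence is enabled.
appendfilename (log/appendonly.aof): the AOF log file.
appendfsync (always, everysec, no): how often the AOF file is synced. always means fsync on every write, everysec means once per second, and no means Redis never calls fsync itself and leaves it to the OS.
no-appendfsync-on-rewrite (yes, no): whether fsync is skipped while a rewrite is in progress (yes means no fsync is issued during the rewrite).
auto-aof-rewrite-percentage (100): when the AOF file has grown by this percentage (here, doubled in size), a background rewrite starts automatically.
auto-aof-rewrite-min-size (64mb): the minimum AOF file size required before a background rewrite is considered.
The last two options together decide whether it is time to run a background rewrite.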
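From the options above we can see that Redis has three AOF code paths:
The AOF write performed on every update operation (this is where the sync frequency matters);
Rewrite: when auto-aof-rewrite-percentage and auto-aof-rewrite-min-size are both satisfied, a rewrite starts automatically in the background;
Rewrite: when a client sends the bgrewriteaof command, a background rewrite starts right away.
Note: the AOF is also written when a key expires. This is really a case of the first path, because the expiry is recorded as a DEL operation.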
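Newer versions of Redis (I am not sure starting from which version) added two kinds of background job, each served by its own thread:
REDIS_BIO_CLOSE_FILE, responsible for all close-file operations
REDIS_BIO_AOF_FSYNC, responsible for fsync operations
Both operations can block, and doing them in the main thread would delay the server's response to events, so they are handed to dedicated background threads. Each thread has its own bio_jobs list holding the jobs it still has to process. The code lives in bio.c (the thread function is bioProcessBackgroundJobs), and the two threads are created by bioInit(), which is called from initServer.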
void initServer() {
    /* ... */
    // Initialize the BIO system
    bioInit();
    /* ... */
}
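AOF processing flows
  1. The AOF write performed on every update operation (sync frequency)
The main options involved are appendfsync (how often the AOF file is synced) and no-appendfsync-on-rewrite (whether fsync is skipped during a rewrite). The entry point for this path is call() in redis.c.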
void call(redisClient *c, int flags) {
    /* ... */
    // Save the old value of the dirty counter
    dirty = server.dirty;
    // Record when the command starts executing
    start = ustime();
    // Run the command implementation
    c->cmd->proc(c);
    // Time spent executing the command
    duration = ustime()-start;
    // Number of changes the command made to the dataset
    dirty = server.dirty-dirty;
    /* ... */
    /* Propagate the command into the AOF and replication link */
    if (flags & REDIS_CALL_PROPAGATE) {
        int flags = REDIS_PROPAGATE_NONE;
        // Forced propagation to the replication link
        if (c->flags & REDIS_FORCE_REPL) flags |= REDIS_PROPAGATE_REPL;
        // Forced propagation to the AOF
        if (c->flags & REDIS_FORCE_AOF) flags |= REDIS_PROPAGATE_AOF;
        // If the dataset was modified, propagate to both
        if (dirty)
            flags |= (REDIS_PROPAGATE_REPL | REDIS_PROPAGATE_AOF);
        if (flags != REDIS_PROPAGATE_NONE)
            propagate(c->cmd,c->db->id,c->argv,c->argc,flags);
    }
    /* ... */
}
Next, the implementation of propagate:
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
               int flags)
{
    // Propagate to the AOF
    if (server.aof_state != REDIS_AOF_OFF && flags & REDIS_PROPAGATE_AOF)
        feedAppendOnlyFile(cmd,dbid,argv,argc);
    // Propagate to the slaves
    if (flags & REDIS_PROPAGATE_REPL)
        replicationFeedSlaves(server.slaves,dbid,argv,argc);
}
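The flag logic in call() and propagate() can be tried out in isolation. The following is a minimal sketch, not Redis code: the PROP_* values and both function names are made up for illustration, and the three parameters stand in for the client flags and the dirty delta.

```c
/* Illustrative flag values; the real REDIS_PROPAGATE_* constants are
 * defined in the Redis headers and may differ. */
enum { PROP_NONE = 0, PROP_AOF = 1, PROP_REPL = 2 };

/* force_repl / force_aof stand in for the client flags, dirty for the
 * number of changes the command made to the dataset. */
int propagation_targets(int force_repl, int force_aof, long long dirty) {
    int flags = PROP_NONE;
    if (force_repl) flags |= PROP_REPL;
    if (force_aof) flags |= PROP_AOF;
    if (dirty) flags |= (PROP_REPL | PROP_AOF);
    return flags;
}

/* Returns 1 when the command would be appended to the AOF buffer:
 * AOF must be enabled and the AOF flag must be set. */
int goes_to_aof(int aof_enabled, int flags) {
    return aof_enabled && (flags & PROP_AOF) != 0;
}
```

A read-only command leaves dirty at 0 and sets no force flag, so nothing is propagated; any command that changes the dataset goes to both targets.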
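Next, the implementation of feedAppendOnlyFile:
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
    sds buf = sdsempty();
    /* The DB this command targets differs from the one last written, so a new
     * SELECT command has to be emitted first. Starting a rewrite also resets
     * the selected DB to -1. */
    if (dictid != server.aof_selected_db) {
        char seldb[64];
        snprintf(seldb,sizeof(seldb),"%d",dictid);
        buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
            (unsigned long)strlen(seldb),seldb);
        server.aof_selected_db = dictid;
    }
    /* ... */
    // Convert the command to the standard protocol format
    buf = catAppendOnlyGenericCommand(buf,argc,argv);
    /* Append the command to aofbuf. The buffer is written to the file later,
     * and fsynced when the appendfsync policy says so. */
    server.aofbuf = sdscatlen(server.aofbuf,buf,sdslen(buf));
    /* If a bgrewrite child exists, the command must also be saved in
     * bgrewritebuf so that the new changes can be appended to the rewritten
     * file once the child finishes. */
    if (server.bgrewritechildpid != -1)
        server.bgrewritebuf = sdscatlen(server.bgrewritebuf,buf,sdslen(buf));
    sdsfree(buf);
}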
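To make the "standard command format" concrete, here is a minimal sketch of the multi-bulk encoding that the SELECT string above follows. encode_command is a hypothetical helper written for this post, not the Redis function catAppendOnlyGenericCommand, and unlike the real code it assumes NUL-terminated arguments, so it is not binary safe.

```c
#include <stdio.h>
#include <string.h>

/* Hypothetical helper: encode argc/argv as "*<argc>\r\n" followed by
 * "$<len>\r\n<arg>\r\n" for every argument. Returns the number of bytes
 * written (excluding the terminating NUL), or -1 if out is too small. */
int encode_command(char *out, size_t cap, int argc, const char **argv) {
    size_t used = 0;
    int n = snprintf(out, cap, "*%d\r\n", argc);
    if (n < 0 || (size_t)n >= cap) return -1;
    used = (size_t)n;
    for (int i = 0; i < argc; i++) {
        n = snprintf(out + used, cap - used, "$%zu\r\n%s\r\n",
                     strlen(argv[i]), argv[i]);
        if (n < 0 || (size_t)n >= cap - used) return -1;
        used += (size_t)n;
    }
    return (int)used;
}
```

Encoding SET k v this way yields the 27-byte string "*3\r\n$3\r\nSET\r\n$1\r\nk\r\n$1\r\nv\r\n", which is the shape of the entries you see when opening appendonly.aof in a text editor.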
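As you can see, the AOF operation above only writes into a buffer; nothing has reached the file yet. Let us now look at how the data gets written to the file. Reading the code shows that flushAppendOnlyFile() is the function that actually writes to the file, and that it is called from beforeSleep and serverCron. beforeSleep is invoked by the aeMain loop once before every round of event processing: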
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, AE_ALL_EVENTS);
    }
}
/* This function gets called every time Redis is entering the
 * main loop of the event driven library, that is, before to sleep
 * for ready file descriptors. */
void beforeSleep(struct aeEventLoop *eventLoop) {
    /* ... */
    /* Write the AOF buffer on disk */
    flushAppendOnlyFile(0);
}
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    /* ... */
    /* AOF postponed flush: Try at every cron cycle if the slow fsync
     * completed. */
    if (server.aof_flush_postponed_start) flushAppendOnlyFile(0);
    /* ... */
}
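The split between "append to a buffer while handling a command" and "write the whole buffer once before sleeping" can be sketched in a few lines. This is an illustration only: feed_buffer, flush_buffer and the fixed-size array are assumptions made for the example, whereas Redis uses an sds string that grows as needed.

```c
/* Needed for the POSIX declarations (write, ssize_t) under strict -std modes. */
#define _POSIX_C_SOURCE 200809L
#include <string.h>
#include <unistd.h>

#define BUF_CAP 4096
static char aof_buf[BUF_CAP];
static size_t aof_len = 0;

/* Called while a command is being processed: only memory is touched. */
int feed_buffer(const char *cmd) {
    size_t n = strlen(cmd);
    if (aof_len + n > BUF_CAP) return -1;
    memcpy(aof_buf + aof_len, cmd, n);
    aof_len += n;
    return 0;
}

/* Called once before the loop goes back to waiting for events: everything
 * accumulated so far is handed to a single write(2) call. */
ssize_t flush_buffer(int fd) {
    if (aof_len == 0) return 0;
    ssize_t nwritten = write(fd, aof_buf, aof_len);
    /* The buffer is only emptied after a complete write. */
    if (nwritten == (ssize_t)aof_len) aof_len = 0;
    return nwritten;
}
```

Calling feed_buffer twice and flush_buffer once produces a single write containing both commands, which is why several commands handled in the same loop iteration cost only one write.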
Now let us look at the implementation of flushAppendOnlyFile:
/* Write the append only file buffer on disk.
 *
 * Since we are required to write the AOF before replying to the client,
 * and the only way the client socket can get a write is entering when the
 * the event loop, we accumulate all the AOF writes in a memory
 * buffer and write it on disk using this function just before entering
 * the event loop again.
 *
 * About the 'force' argument:
 *
 * When the fsync policy is set to 'everysec' we may delay the flush if there
 * is still an fsync() going on in the background thread, since for instance
 * on Linux write(2) will be blocked by the background fsync anyway.
 * When this happens we remember that there is some aof buffer to be
 * flushed ASAP, and will try to do that in the serverCron() function.
 *
 * However if force is set to 1 we'll write regardless of the background
 * fsync. */
#define AOF_WRITE_LOG_ERROR_RATE 30 /* Seconds between errors logging. */
void flushAppendOnlyFile(int force) {
    ssize_t nwritten;
    int sync_in_progress = 0;
    // Nothing in the buffer, return right away
    if (sdslen(server.aof_buf) == 0) return;
    // Policy is fsync every second: is an fsync still running in the background?
    if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
        sync_in_progress = bioPendingJobsOfType(REDIS_BIO_AOF_FSYNC) != 0;
    // fsync every second, and the write is not forced
    if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {
        /* With this append fsync policy we do background fsyncing.
         * If the fsync is still in progress we can try to delay
         * the write for a couple of seconds.
         * (If the write were forced, the main thread would block on write.) */
        if (sync_in_progress) {
            if (server.aof_flush_postponed_start == 0) {
                /* No previous write postponinig, remember that we are
                 * postponing the flush and return. */
                server.aof_flush_postponed_start = server.unixtime;
                return;
            } else if (server.unixtime - server.aof_flush_postponed_start < 2) {
                /* We were already waiting for fsync to finish, but for less
                 * than two seconds this is still ok. Postpone again. */
                return;
            }
            /* Otherwise fall trough, and go write since we can't wait
             * over two seconds. */
            server.aof_delayed_fsync++;
            redisLog(REDIS_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");
        }
    }
    /* If you are following this code path, then we are going to write so
     * set reset the postponed flush sentinel to zero. */
    server.aof_flush_postponed_start = 0;
    /* We want to perform a single write. This should be guaranteed atomic
     * at least if the filesystem we are writing is a real physical one.
     * While this will save us against the server being killed I don't think
     * there is much to do about the whole server stopping for power problems
     * or alike.
     * (After something like a power failure the AOF file may be damaged and
     * has to be repaired with redis-check-aof.) */
    nwritten = write(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));
    if (nwritten != (signed)sdslen(server.aof_buf)) { // the write failed or was short
        static time_t last_write_error_log = 0;
        int can_log = 0;
        /* Limit logging rate to 1 line per AOF_WRITE_LOG_ERROR_RATE seconds. */
        if ((server.unixtime - last_write_error_log) > AOF_WRITE_LOG_ERROR_RATE) {
            can_log = 1;
            last_write_error_log = server.unixtime;
        }
        /* Log the AOF write error and record the error code. */
        if (nwritten == -1) {
            if (can_log) {
                redisLog(REDIS_WARNING,"Error writing to the AOF file: %s",
                    strerror(errno));
                server.aof_last_write_errno = errno;
            }
        } else {
            if (can_log) {
                redisLog(REDIS_WARNING,"Short write while writing to "
                                       "the AOF file: (nwritten=%lld, "
                                       "expected=%lld)",
                                       (long long)nwritten,
                                       (long long)sdslen(server.aof_buf));
            }
            // Try to remove the incomplete data that was just appended
            if (ftruncate(server.aof_fd, server.aof_current_size) == -1) {
                if (can_log) {
                    redisLog(REDIS_WARNING, "Could not remove short write "
                             "from the append-only file.  Redis may refuse "
                             "to load the AOF the next time it starts.  "
                             "ftruncate: %s", strerror(errno));
                }
            } else {
                /* If the ftruncate() succeeded we can set nwritten to
                 * -1 since there is no longer partial data into the AOF. */
                nwritten = -1;
            }
            server.aof_last_write_errno = ENOSPC;
        }
        /* Handle the AOF write error. */
        if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
            /* We can't recover when the fsync policy is ALWAYS since the
             * reply for the client is already in the output buffers, and we
             * have the contract with the user that on acknowledged write data
             * is synched on disk. */
            redisLog(REDIS_WARNING,"Can't recover from AOF write error when the AOF fsync policy is 'always'. Exiting...");
            exit(1);
        } else {
            /* Recover from failed write leaving data into the buffer. However
             * set an error to stop accepting writes as long as the error
             * condition is not cleared. */
            server.aof_last_write_status = REDIS_ERR;
            /* Trim the sds buffer if there was a partial write, and there
             * was no way to undo it with ftruncate(2). */
            if (nwritten > 0) {
                server.aof_current_size += nwritten;
                sdsrange(server.aof_buf,nwritten,-1);
            }
            return; /* We'll try again on the next call... */
        }
    } else { // the write succeeded
        /* Successful write(2). If AOF was in error state, restore the
         * OK state and log the event. */
        if (server.aof_last_write_status == REDIS_ERR) {
            redisLog(REDIS_WARNING,
                "AOF write error looks solved, Redis can write again.");
            server.aof_last_write_status = REDIS_OK;
        }
    }
    // Update the AOF file size after the write
    server.aof_current_size += nwritten;
    /* Re-use AOF buffer when it is small enough. The maximum comes from the
     * arena size of 4k minus some overhead (but is otherwise arbitrary).
     * sdsavail(server.aof_buf) is the free space left in aof_buf,
     * sdslen(server.aof_buf) is the length of the string it holds. */
    if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) {
        // Empty the buffer and keep it for reuse
        sdsclear(server.aof_buf);
    } else {
        // Release the buffer and start from an empty one
        sdsfree(server.aof_buf);
        server.aof_buf = sdsempty();
    }
    /* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are
     * children doing I/O in the background (BGSAVE or BGREWRITEAOF). */
    if (server.aof_no_fsync_on_rewrite &&
        (server.aof_child_pid != -1 || server.rdb_child_pid != -1))
            return;
    /* Perform the fsync if needed. */
    if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
        /* aof_fsync is defined as fdatasync() for Linux in order to avoid
         * flushing metadata. */
        aof_fsync(server.aof_fd); /* Let's try to get this data on the disk */
        // Record the time of the last fsync
        server.aof_last_fsync = server.unixtime;
    // Policy is fsync every second, and the clock has moved past the last fsync
    } else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
                server.unixtime > server.aof_last_fsync)) {
        // Hand the fsync to the background thread
        if (!sync_in_progress) aof_background_fsync(server.aof_fd);
        // Record the time of the last fsync
        server.aof_last_fsync = server.unixtime;
    }
    // Both branches above end by updating the fsync time, so that
    // assignment could be written once inside a shared block instead.
}
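From the walkthrough above we can see that even with appendfsync set to always, Redis does not write (write+fsync) the AOF file immediately after each update command. The write+fsync is deferred until the current round of event processing ends and beforeSleep runs. (One question: the command is first written to server.aofbuf and only later to the file, so can a crash in between lose acknowledged data? The answer is no. beforeSleep performs the flush after one round of event processing finishes and before the next one starts, and replies are only sent to clients in that next round, so with always the client is told the outcome only after the data has been synced to the file.)
If an fsync job is still waiting for the fsync thread when beforeSleep runs (there is only one AOF fd, which is why a second job is not simply queued), that is, if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) and if (sync_in_progress) both hold, the flush is marked as postponed through server.aof_flush_postponed_start. serverCron then calls flushAppendOnlyFile again to check whether the write can go ahead and the job be handed to the fsync thread. Once the wait has reached 2 seconds, the write is performed anyway and a notice is logged. [So everysec, likewise, does not really mean an fsync exactly once every second.]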
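The postpone logic is easier to follow when pulled out of the surrounding I/O. The sketch below mirrors only the branch structure described above; flush_decision and its parameters are hypothetical names, with postponed_start and delayed standing in for server.aof_flush_postponed_start and server.aof_delayed_fsync.

```c
typedef enum { FLUSH_WRITE, FLUSH_POSTPONE } flush_action;

/* Decide whether a flush attempt at time `now` (in seconds) should write
 * or wait. everysec, force and sync_in_progress are booleans. */
flush_action flush_decision(int everysec, int force, int sync_in_progress,
                            long now, long *postponed_start, long *delayed) {
    if (everysec && !force && sync_in_progress) {
        if (*postponed_start == 0) {
            /* First postponement: remember when the waiting started. */
            *postponed_start = now;
            return FLUSH_POSTPONE;
        } else if (now - *postponed_start < 2) {
            /* Still within the two-second grace period. */
            return FLUSH_POSTPONE;
        }
        /* Waited two seconds or more: write anyway and count the event. */
        (*delayed)++;
    }
    *postponed_start = 0;
    return FLUSH_WRITE;
}
```

With a background fsync that stays busy, calls at t=100 and t=101 postpone, and the call at t=102 writes and increments the delayed counter, which matches the two-second limit in the listing.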
  2. Automatic background rewrite
The options involved are auto-aof-rewrite-percentage and auto-aof-rewrite-min-size. serverCron checks whether the conditions for starting a bgrewrite have been met:
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData){
    /* ... */
    /* Start a scheduled AOF rewrite if this was requested by the user while
     * a BGSAVE was in progress. */
    if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
        server.aof_rewrite_scheduled)
    {
        rewriteAppendOnlyFileBackground();
    }
    /* Check if a background saving or AOF rewrite in progress terminated. */
    if (server.rdb_child_pid != -1 || server.aof_child_pid != -1) {
        int statloc;
        pid_t pid;
        // Collect the exit status of a finished child, without blocking
        if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
            int exitcode = WEXITSTATUS(statloc);
            int bysignal = 0;
            if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);
            if (pid == server.rdb_child_pid) {
                // BGSAVE finished
                backgroundSaveDoneHandler(exitcode,bysignal);
            } else if (pid == server.aof_child_pid) {
                // BGREWRITEAOF finished
                backgroundRewriteDoneHandler(exitcode,bysignal);
            } else {
                redisLog(REDIS_WARNING,
                    "Warning, detected child with unmatched pid: %ld",
                    (long)pid);
            }
            updateDictResizePolicy();
        }
    } else {
        /* If there is not a background saving/rewrite in progress check if
         * we have to save/rewrite now */
        // Walk all save conditions to see whether a BGSAVE is due
        for (j = 0; j < server.saveparamslen; j++) {
            struct saveparam *sp = server.saveparams+j;
            /* Save if we reached the given amount of changes,
             * the given amount of seconds, and if the latest bgsave was
             * successful or if, in case of an error, at least
             * REDIS_BGSAVE_RETRY_DELAY seconds already elapsed. */
            if (server.dirty >= sp->changes &&
                server.unixtime-server.lastsave > sp->seconds &&
                (server.unixtime-server.lastbgsave_try >
                 REDIS_BGSAVE_RETRY_DELAY ||
                 server.lastbgsave_status == REDIS_OK))
            {
                redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
                    sp->changes, (int)sp->seconds);
                // Run BGSAVE
                rdbSaveBackground(server.rdb_filename);
                break;
            }
        }
        /* Trigger an AOF rewrite if needed */
        if (server.rdb_child_pid == -1 &&
            server.aof_child_pid == -1 &&
            server.aof_rewrite_perc &&
            // The AOF file is larger than the minimum size required for a rewrite
            server.aof_current_size > server.aof_rewrite_min_size)
        {
            // Size of the AOF file right after the previous rewrite
            long long base = server.aof_rewrite_base_size ?
                             server.aof_rewrite_base_size : 1;
            // Growth of the current AOF size relative to base, in percent
            long long growth = (server.aof_current_size*100/base) - 100;
            // If the growth reaches aof_rewrite_perc, run BGREWRITEAOF
            if (growth >= server.aof_rewrite_perc) {
                redisLog(REDIS_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
                rewriteAppendOnlyFileBackground();
            }
        }
    }
    /* ... */
}
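The trigger condition is plain integer arithmetic, so it can be checked with concrete numbers. This is a minimal sketch: should_auto_rewrite is a made-up name, and its parameters stand in for server.aof_current_size, server.aof_rewrite_base_size, server.aof_rewrite_perc and server.aof_rewrite_min_size. The check that no child process is running is left out.

```c
/* Returns 1 when an automatic rewrite would be started, 0 otherwise. */
int should_auto_rewrite(long long current_size, long long base_size,
                        long long rewrite_perc, long long min_size) {
    if (rewrite_perc == 0) return 0;          /* feature disabled */
    if (current_size <= min_size) return 0;   /* file still too small */
    long long base = base_size ? base_size : 1;
    long long growth = (current_size * 100 / base) - 100;
    return growth >= rewrite_perc;
}
```

With the settings quoted earlier (100 and 64mb), a file that was 100 MB after the last rewrite triggers a new rewrite once it reaches 200 MB, since the growth is then 100%. At 150 MB the growth is only 50% and nothing happens. A 60 MB file never triggers, however much it has grown, because it is below the minimum size.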
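  3. The client sends the bgrewriteaof command
  Looking it up in readonlyCommandTable, we see that when a client sends bgrewriteaof, the server handles it in bgrewriteaofCommand. That function first checks for existing children. If a bgrewrite child (bgrewritechildpid) already exists, the command is refused because a rewrite is already in progress. If a bgsave child (bgsavechildpid) exists, it sets server.aofrewrite_scheduled = 1: a bgrewrite is needed, not right now but later, when serverCron runs. Otherwise it calls rewriteAppendOnlyFileBackground directly to create the bgrewrite process and perform the rewrite.
rewriteAppendOnlyFileBackground is implemented as follows: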
/* This is how rewriting of the append only file in background works:
 *
 * 1) The user calls BGREWRITEAOF
 * 2) Redis calls this function, that forks():
 *    2a) the child rewrite the append only file in a temp file.
 *    2b) the parent accumulates differences in server.aof_rewrite_buf.
 * 3) When the child finished '2a' exists.
 * 4) The parent will trap the exit code, if it's OK, will append the
 *    data accumulated into server.aof_rewrite_buf into the temp file, and
 *    finally will rename(2) the temp file in the actual file name.
 *    The the new file is reopened as the new append only file. Profit!
 */
int rewriteAppendOnlyFileBackground(void) {
    pid_t childpid;
    long long start;
    // A child is already rewriting the AOF
    if (server.aof_child_pid != -1) return REDIS_ERR;
    // Record the time before fork, used to measure how long fork takes
    start = ustime();
    if ((childpid = fork()) == 0) {
        char tmpfile[256];
        /* Child */
        // Close the listening sockets. (As I understand it, the child gets a
        // full copy of the parent's resources, listening sockets included,
        // so it has to close its own copies.)
        closeListeningSockets(0);
        // Give the process a recognizable name
        redisSetProcTitle("redis-aof-rewrite");
        // Create the temp file and rewrite the AOF into it
        snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
        if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) {
            // Private dirty memory, that is, memory the child used through copy-on-write
            size_t private_dirty = zmalloc_get_private_dirty();
            if (private_dirty) {
                redisLog(REDIS_NOTICE,
                    "AOF rewrite: %zu MB of memory used by copy-on-write",
                    private_dirty/(1024*1024));
            }
            // Report success to the parent
            exitFromChild(0);
        } else {
            // Report failure to the parent
            exitFromChild(1);
        }
    } else {
        /* Parent */
        // Record how long fork took
        server.stat_fork_time = ustime()-start;
        if (childpid == -1) {
            redisLog(REDIS_WARNING,
                "Can't rewrite append only file in background: fork: %s",
                strerror(errno));
            return REDIS_ERR;
        }
        redisLog(REDIS_NOTICE,
            "Background append only file rewriting started by pid %d",childpid);
        // Record the state of the rewrite
        server.aof_rewrite_scheduled = 0;
        server.aof_rewrite_time_start = time(NULL);
        server.aof_child_pid = childpid;
        // Update the rehash policy (here, to turn rehashing off); see that function's own comments
        updateDictResizePolicy();
        /* We set append_sel_db to -1 in order to force the next call to the
         * feedAppendOnlyFile() to issue a SELECT command, so the differences
         * accumulated by the parent into server.aof_rewrite_buf will start
         * with a SELECT statement and it will be safe to merge. */
        server.aof_selected_db = -1;
        // Flush the script cache
        replicationScriptCacheFlush();
        return REDIS_OK;
    }
    return REDIS_OK; /* unreached */
}
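The fork-and-report pattern used here can be reproduced outside Redis. The following is a rough sketch for POSIX systems under stated assumptions: background_write is a hypothetical function, the child writes a fixed string instead of a dataset, and the parent polls in a loop where the real server would go on serving clients between polls.

```c
/* Needed for fork/waitpid/nanosleep declarations under strict -std modes. */
#define _POSIX_C_SOURCE 200809L
#include <stdio.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <time.h>
#include <unistd.h>

/* The child writes `data` to `tmpfile` and reports the outcome through its
 * exit code. Returns the child's exit code, or -1 if fork or waitpid failed
 * or the child was killed by a signal. */
int background_write(const char *tmpfile, const char *data) {
    pid_t child = fork();
    if (child == -1) return -1;
    if (child == 0) {
        /* Child */
        FILE *fp = fopen(tmpfile, "w");
        if (!fp) _exit(1);
        int ok = fputs(data, fp) != EOF;
        if (fclose(fp) == EOF) ok = 0;
        _exit(ok ? 0 : 1);
    }
    /* Parent: poll without blocking, as serverCron does with WNOHANG. */
    int status;
    pid_t pid;
    while ((pid = waitpid(child, &status, WNOHANG)) == 0) {
        struct timespec ts = {0, 1000000}; /* 1 ms */
        nanosleep(&ts, NULL);
    }
    if (pid == -1 || WIFSIGNALED(status)) return -1;
    return WEXITSTATUS(status);
}
```

The child leaves through _exit so that it does not flush stdio buffers inherited from the parent; the exit code is the only channel it uses to tell the parent whether the temp file is usable.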
Next, let us see how the child process does its work:
/* Write a sequence of commands able to fully rebuild the dataset into
 * "filename". Used both by REWRITEAOF and BGREWRITEAOF.
 * (REWRITEAOF appears to be a deprecated command.)
 *
 * In order to minimize the number of commands needed in the rewritten
 * log Redis uses variadic commands when possible, such as RPUSH, SADD
 * and ZADD. However at max REDIS_AOF_REWRITE_ITEMS_PER_CMD items per time
 * are inserted using a single command. */
int rewriteAppendOnlyFile(char *filename) {
    dictIterator *di = NULL;
    dictEntry *de;
    rio aof;
    FILE *fp;
    char tmpfile[256];
    int j;
    long long now = mstime();
    /* Note that we have to use a different temp name here compared to the
     * one used by rewriteAppendOnlyFileBackground() function.
     * One is temp-rewriteaof-bg-%d.aof, the other temp-rewriteaof-%d.aof. */
    snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
    fp = fopen(tmpfile,"w");
    if (!fp) {
        redisLog(REDIS_WARNING, "Opening the temp file for AOF rewrite in rewriteAppendOnlyFile(): %s", strerror(errno));
        return REDIS_ERR;
    }
    // Initialize file I/O
    rioInitWithFile(&aof,fp);
    // fsync once every REDIS_AOF_AUTOSYNC_BYTES bytes written, so that not
    // too much data piles up in the cache and causes one long I/O stall.
    // (fsync pushes all modified data of the file referred to by the given
    // descriptor to the storage device; it returns 0 on success and -1 on failure.)
    if (server.aof_rewrite_incremental_fsync)
        rioSetAutoSync(&aof,REDIS_AOF_AUTOSYNC_BYTES);
    // Walk all databases
    for (j = 0; j < server.dbnum; j++) {
        char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
        redisDb *db = server.db+j;
        // The keyspace of this database
        dict *d = db->dict;
        if (dictSize(d) == 0) continue;
        // Create an iterator over the keyspace
        di = dictGetSafeIterator(d);
        if (!di) {
            fclose(fp);
            return REDIS_ERR;
        }
        /* SELECT the new DB, so the data that follows ends up in the right
         * database (open appendonly.aof to see these SELECT entries). */
        if (rioWrite(&aof,selectcmd,sizeof(selectcmd)-1) == 0) goto werr;
        if (rioWriteBulkLongLong(&aof,j) == 0) goto werr;
        /* Iterate this DB writing every entry */
        while((de = dictNext(di)) != NULL) {
            sds keystr;
            robj key, *o;
            long long expiretime;
            keystr = dictGetKey(de);
            o = dictGetVal(de);
            initStaticStringObject(key,keystr);
            // Fetch the expire time
            expiretime = getExpire(db,&key);
            /* If this key is already expired skip it */
            if (expiretime != -1 && expiretime < now) continue;
            /* Save the key and associated value, using a command that
             * matches the type of the value */
            if (o->type == REDIS_STRING) {
                /* Emit a SET command */
                char cmd[]="*3\r\n$3\r\nSET\r\n";
                if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
                /* Key and value */
                if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
                if (rioWriteBulkObject(&aof,o) == 0) goto werr;
            } else if (o->type == REDIS_LIST) {
                if (rewriteListObject(&aof,&key,o) == 0) goto werr;
            } else if (o->type == REDIS_SET) {
                if (rewriteSetObject(&aof,&key,o) == 0) goto werr;
            } else if (o->type == REDIS_ZSET) {
                if (rewriteSortedSetObject(&aof,&key,o) == 0) goto werr;
            } else if (o->type == REDIS_HASH) {
                if (rewriteHashObject(&aof,&key,o) == 0) goto werr;
            } else {
                redisPanic("Unknown object type");
            }
            /* Save the expire time */
            if (expiretime != -1) {
                char cmd[]="*3\r\n$9\r\nPEXPIREAT\r\n";
                // Write a PEXPIREAT key expiretime command
                if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
                if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
                if (rioWriteBulkLongLong(&aof,expiretime) == 0) goto werr;
            }
        }
        // Release the iterator
        dictReleaseIterator(di);
    }
    /* Make sure data will not remain on the OS's output buffers */
    // Flush, sync and close the new AOF file
    if (fflush(fp) == EOF) goto werr;
    if (aof_fsync(fileno(fp)) == -1) goto werr;
    if (fclose(fp) == EOF) goto werr;
    /* Use RENAME to make sure the DB file is changed atomically only
     * if the generate DB file is ok. */
    if (rename(tmpfile,filename) == -1) {
        redisLog(REDIS_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
        unlink(tmpfile);
        return REDIS_ERR;
    }
    redisLog(REDIS_NOTICE,"SYNC append only file rewrite performed");
    return REDIS_OK;
werr:
    fclose(fp);
    unlink(tmpfile);
    redisLog(REDIS_WARNING,"Write error writing append only file on disk: %s", strerror(errno));
    if (di) dictReleaseIterator(di);
    return REDIS_ERR;
}
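The tail of this function (flush, fsync, close, then rename) is a general recipe for replacing a file so that readers see either the old or the new content, never a half-written file. Here is a small sketch of that recipe on its own. atomic_replace is a hypothetical name, and it calls plain fsync where Redis goes through its aof_fsync wrapper.

```c
/* Needed for fileno/fsync declarations under strict -std modes. */
#define _POSIX_C_SOURCE 200809L
#include <stdio.h>
#include <unistd.h>

/* Write `data` to `tmpfile`, force it to disk, then rename it over
 * `filename`. Returns 0 on success, -1 on any error (the temp file is
 * removed in that case and `filename` is left untouched). */
int atomic_replace(const char *tmpfile, const char *filename, const char *data) {
    FILE *fp = fopen(tmpfile, "w");
    if (!fp) return -1;
    if (fputs(data, fp) == EOF) goto werr;
    if (fflush(fp) == EOF) goto werr;          /* stdio buffer -> kernel */
    if (fsync(fileno(fp)) == -1) goto werr;    /* kernel -> storage device */
    if (fclose(fp) == EOF) { unlink(tmpfile); return -1; }
    if (rename(tmpfile, filename) == -1) { unlink(tmpfile); return -1; }
    return 0;
werr:
    fclose(fp);
    unlink(tmpfile);
    return -1;
}
```

The order matters: if the rename happened before the fsync, a crash could leave the final name pointing at a file whose contents never reached the disk.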
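At this point the child has finished the rewrite. So when does the parent, that is, the main thread, pick up the child's exit status, and what does it do then?
We already saw the answer in serverCron above: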
    // Collect the exit status of a finished child, without blocking
    if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
        int exitcode = WEXITSTATUS(statloc);
        int bysignal = 0;
        if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);
        if (pid == server.rdb_child_pid) {
            // BGSAVE finished
            backgroundSaveDoneHandler(exitcode,bysignal);
        } else if (pid == server.aof_child_pid) {
            // BGREWRITEAOF finished
            backgroundRewriteDoneHandler(exitcode,bysignal);
        } else {
            redisLog(REDIS_WARNING,
                "Warning, detected child with unmatched pid: %ld",
                (long)pid);
        }
        updateDictResizePolicy();
    }
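That is, in serverCron the parent uses server.bgrewritechildpid (server.aof_child_pid in the listing above) to decide whether it needs to check for the child's exit status.
Next, let us look at what backgroundRewriteDoneHandler does. (Note: this is the hard part of AOF and relies on some clever tricks. It took me quite a while to get even a rough understanding.)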
/* A background append only file rewriting (BGREWRITEAOF) terminated its work.
 * Handle this.
 * (The parent calls this function when the child process has finished the
 * AOF rewrite.) */
void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
    if (!bysignal && exitcode == 0) {
        int newfd, oldfd;
        char tmpfile[256];
        long long now = ustime();
        redisLog(REDIS_NOTICE,
            "Background AOF rewrite terminated with success");
        /* Flush the differences accumulated by the parent to the
         * rewritten AOF. */
        // Open the temp file that holds the new AOF content
        snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof",
            (int)server.aof_child_pid);
        newfd = open(tmpfile,O_WRONLY|O_APPEND);
        if (newfd == -1) {
            redisLog(REDIS_WARNING,
                "Unable to open the temporary AOF produced by the child: %s", strerror(errno));
            goto cleanup;
        }
        // Append the accumulated rewrite buffer to the temp file.
        // The write calls made here block the main process.
        if (aofRewriteBufferWrite(newfd) == -1) {
            redisLog(REDIS_WARNING,
                "Error trying to flush the parent diff to the rewritten AOF: %s", strerror(errno));
            close(newfd);
            goto cleanup;
        }
        redisLog(REDIS_NOTICE,
            "Parent diff successfully flushed to the rewritten AOF (%lu bytes)", aofRewriteBufferSize());
        /* The only remaining thing to do is to rename the temporary file to
         * the configured file and switch the file descriptor used to do AOF
         * writes. We don't want close(2) or rename(2) calls to block the
         * server on old file deletion.
         *
         * There are two possible scenarios:
         *
         * 1) AOF is DISABLED and this was a one time rewrite. The temporary
         * file will be renamed to the configured file. When this file already
         * exists, it will be unlinked, which may block the server.
         *
         * 2) AOF is ENABLED and the rewritten AOF will immediately start
         * receiving writes. After the temporary file is renamed to the
         * configured file, the original AOF file descriptor will be closed.
         * Since this will be the last reference to that file, closing it
         * causes the underlying file to be unlinked, which may block the
         * server.
         *
         * To mitigate the blocking effect of the unlink operation (either
         * caused by rename(2) in scenario 1, or by close(2) in scenario 2), we
         * use a background thread to take care of this. First, we
         * make scenario 1 identical to scenario 2 by opening the target file
         * when it exists. The unlink operation after the rename(2) will then
         * be executed upon calling close(2) for its descriptor. Everything to
         * guarantee atomicity for this switch has already happened by then, so
         * we don't care what the outcome or duration of that close operation
         * is, as long as the file descriptor is released again. */
        if (server.aof_fd == -1) {
            /* AOF disabled */
            /* Don't care if this fails: oldfd will be -1 and we handle that.
             * One notable case of -1 return is if the old file does
             * not exist. */
            oldfd = open(server.aof_filename,O_RDONLY|O_NONBLOCK);
        } else {
            /* AOF enabled */
            oldfd = -1; /* We'll set this to the current AOF filedes later. */
        }
        /* Rename the temporary file. This will not unlink the target file if
         * it exists, because we reference it with "oldfd". */
        if (rename(tmpfile,server.aof_filename) == -1) {
            redisLog(REDIS_WARNING,
                "Error trying to rename the temporary AOF file: %s", strerror(errno));
            close(newfd);
            if (oldfd != -1) close(oldfd);
            goto cleanup;
        }
        if (server.aof_fd == -1) {
            /* AOF disabled, we don't need to set the AOF file descriptor
             * to this new file, so we can close it.
             * (Turning AOF off blocks anyway, so it does not matter if this
             * close blocks as well.) */
            close(newfd);
        } else {
            /* AOF enabled, replace the old fd with the new one. */
            oldfd = server.aof_fd;
            server.aof_fd = newfd;
            // The rewrite buffer was just appended, so fsync right away
            if (server.aof_fsync == AOF_FSYNC_ALWAYS)
                aof_fsync(newfd);
            else if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
                aof_background_fsync(newfd);
            server.aof_selected_db = -1; /* Make sure SELECT is re-issued */
            // Update the recorded AOF file size
            aofUpdateCurrentSize();
            // Remember the size right after this rewrite
            server.aof_rewrite_base_size = server.aof_current_size;
            /* Clear regular AOF buffer since its contents was just written to
             * the new AOF from the background rewrite buffer. */
            sdsfree(server.aof_buf);
            server.aof_buf = sdsempty();
        }
        server.aof_lastbgrewrite_status = REDIS_OK;
        redisLog(REDIS_NOTICE, "Background AOF rewrite finished successfully");
        /* Change state from WAIT_REWRITE to ON if needed
         * (this is the case when the AOF file is created for the first time). */
        if (server.aof_state == REDIS_AOF_WAIT_REWRITE)
            server.aof_state = REDIS_AOF_ON;
        /* Asynchronously close the overwritten AOF: closing the old AOF file
         * is handed to the background thread. */
        if (oldfd != -1) bioCreateBackgroundJob(REDIS_BIO_CLOSE_FILE,(void*)(long)oldfd,NULL,NULL);
        redisLog(REDIS_VERBOSE,
            "Background AOF rewrite signal handler took %lldus", ustime()-now);
    // The BGREWRITEAOF child reported an error
    } else if (!bysignal && exitcode != 0) {
        server.aof_lastbgrewrite_status = REDIS_ERR;
        redisLog(REDIS_WARNING,
            "Background AOF rewrite terminated with error");
    // The child was terminated by a signal
    } else {
        server.aof_lastbgrewrite_status = REDIS_ERR;
        redisLog(REDIS_WARNING,
            "Background AOF rewrite terminated by signal %d", bysignal);
    }
cleanup:
    // Empty the AOF rewrite buffer
    aofRewriteBufferReset();
    // Remove the temp file
    aofRemoveTempFile(server.aof_child_pid);
    // Reset the rewrite state to its defaults
    server.aof_child_pid = -1;
    server.aof_rewrite_time_last = time(NULL)-server.aof_rewrite_time_start;
    server.aof_rewrite_time_start = -1;
    /* Schedule a new rewrite if we are waiting for it to switch the AOF ON. */
    if (server.aof_state == REDIS_AOF_WAIT_REWRITE)
        server.aof_rewrite_scheduled = 1;
}
For the reasoning behind the way backgroundRewriteDoneHandler is written, see the article:
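The trick the handler relies on is a POSIX file-system property: renaming a new file over an old one does not free the old file's data while some descriptor still refers to it. The sketch below demonstrates just that property. write_file and swap_keeping_old_open are hypothetical helpers, and the final close runs inline here rather than in a background thread.

```c
/* Needed for the POSIX declarations under strict -std modes. */
#define _POSIX_C_SOURCE 200809L
#include <fcntl.h>
#include <stdio.h>
#include <unistd.h>

/* Create or overwrite a small file. Returns 0 on success, -1 on error. */
static int write_file(const char *path, const char *data, size_t len) {
    int fd = open(path, O_WRONLY | O_CREAT | O_TRUNC, 0644);
    if (fd == -1) return -1;
    ssize_t n = write(fd, data, len);
    close(fd);
    return n == (ssize_t)len ? 0 : -1;
}

/* Rename newpath over oldpath while holding a descriptor on the old file.
 * Returns the held descriptor, or -1 on error. The old file's storage is
 * released only when the caller closes that descriptor. */
int swap_keeping_old_open(const char *oldpath, const char *newpath) {
    int oldfd = open(oldpath, O_RDONLY | O_NONBLOCK);
    if (oldfd == -1) return -1;
    if (rename(newpath, oldpath) == -1) {
        close(oldfd);
        return -1;
    }
    return oldfd;
}
```

After the swap, the path already serves the new content, while a read through the returned descriptor still sees the old bytes. The potentially slow deletion is therefore tied to the close call, which is the step Redis hands to its background thread.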