从0开始学习Redis#
使用的redis版本为7.2.2
gui客户端为 https://goanother.com/
首先看尽量多的文档,了解各种数据类型/存储选择/架构选择,再看代码。
把重要的代码流程记录下来。
1 数据类型#
https://redis.com/redis-enterprise/data-structures/
https://redis.io/docs/data-types/
1.1 string#
最基本的字符/bytes/文本。
redis的key是string类型。
一个string最大512mb。
1.2 list#
list是string的链表。
常用来实现stack和queue。
LPUSH key element [element ...]
1.3 set#
无序的set,存string,元素唯一。
最大容量2^32-1(4,294,967,295)。
1.4 hash#
一个kv的集合。
res1 = r.hset(
"bike:1",
mapping={
"model": "Deimos",
"brand": "Ergonom",
"type": "Enduro bikes",
"price": 4972,
},
)
1.5 sorted set#
有序的set,依某种score排序,如果score相同,按key的字符串顺序再排。
典型应用为排行榜/计数器。
底层实现为hash table + skip list
。
zadd复杂度为O(log(N))。
1.6 stream#
可看作一个append-only log。记录顺序的数据。
典型应用
记录用户操作
记录设备事件
在不同stream中分别记录每个用户的提醒
每个stream有一个唯一id。
有一些trimming策略,防止数据过多。
底层数据结构为radix trees。
1.7 Geospatial indexe#
存地理坐标。可快速查询某个坐标指定半径或box内的数据。
1.8 bitmap#
不是一个真正的数据结构。是一个string/bit vector。
可设置bit
setbit key 10 1
setbit key 10 1
获取单个bit
getbit key 10
1
统计总数
bitcount key
2
1.9 bitfield#
可给任意数字a对应一个数字b,对b进行各种操作。
例如设置玩家的状态,金钱对应的a值定为0,杀怪数a值定为1。
初始金钱
BITFIELD player:1:stats SET u32 #0 1000
杀了一个怪,金钱+50,杀怪数+1。
BITFIELD player:1:stats INCRBY u32 #0 50 INCRBY u32 #1 1
2 replication#
https://redis.io/docs/management/replication/
主要机制
当主从正常连接时,当master发生改变时,给replica发送命令流,在replica上复制master上的操作。保证数据一致。
发生断连,从发起重连,获取缺失的命令(
partial resynchronization
)。无法进行
partial resynchronization
。发起full resynchronization
。整个重做数据。
数据的同步默认是异步的,例如客户端修改一个数据,在master上成功后立即返回,然后再同步到slave,可能造成客户端成功后,从slave读到老数据,数据出现不一致。
用WAIT
(https://redis.io/commands/wait/) 可解决,可等待slave完成同步再返回。
类似mysql/mongodb的write concern之类概念。
重点
复制是异步的
可有多个replica
replica不是只能连master,可以连其他replica。可以做成层级。
复制在master侧是非阻塞的,就是第一点?复制不会导致master不可用。
复制在replica侧大体是非阻塞的?这里讲的不是很清楚。replica做初始同步时,可以用旧数据。有相应的设置。
replica可做scalability,做读写分离/数据安全。
可让master不做持久化,从而提速。让副本去做持久化。
关闭master持久化的危险#
如果master完全关闭持久化,并且打开自动重启。会有危险。
master关闭持久化,副本正常运行。
master挂掉并重启,master数据清空。
副本复制master,数据也被清空。
3 sentinel#
用于非cluster的高可用
https://redis.io/docs/management/sentinel/
https://www.baeldung.com/redis-sentinel-vs-clustering
监控
监控master和replica通知
把监控产生的事件通知给外部自动failover
当master处理问题,启动failover流程,选出一个新的master并通知客户端。作为配置提供者
客户端连上sentinel可获取服务信息,例如上述failover,可获取新的master地址。
sentinel作为分布式系统#
sentinel是一个分布式系统。
多个sentinel节点会监控master,如果他们一致认为master有问题,会启动对应的流程。
即使少数sentinel节点挂掉也可以保证整个系统正常运行。
重点
原则上需要三个sentinel节点
原则上三个节点需要独立,比如在三地。
即使sentinel也无法保证异常情况下
写操作的确认
。但可以降低出错概率。第三方客户端需要支持sentinel。
需要注意网络网络配置
4 cluster#
https://redis.io/docs/management/scaling/
https://redis.io/docs/reference/cluster-spec/
集群用于水平扩展。
待研究
5 持久化#
持久化选项
RDB(Redis Database)
point-in-time快照AOF(Append Only File)
对每个写操作做log。可在重启时用log做replay,还原数据。RDB+AOF
可关闭持久化
RDB优势
rdb是整个数据的一份point-in-time的快照。很适用于备份。
比如每24备份整个数据适用于灾后重建数据
效率高,父进程起一个子进程处理快照就行,自己不用处理磁盘io。
对于大量数据,重启比AOF快。
对于replica,rdb支持
partial resynchronization
。
RDB劣势
对于server崩溃的情况,不善于保住数据。
例如一般可以设置一个快照间隔,比如5分钟做一次快照。那么最多可以能丢5分钟的数据。RDB需要fork一个子进程来走快照流程,如果数据量很大,可造成卡顿。
AOF优势
aof更
耐用
,可以采用不同的fsync(写磁盘)策略,不写
/每秒
/每个query
。
即使是默认的每秒做fsync,仍有很好的表现。默认最多丢失1秒的数据。append-only log,不会有seek操作。断电等情况不会导致数据错乱。
即使出现问题导致某个命令执行了一半,也能妥善处理。有
rewrite
机制。可在后台重写aof,使之包含最小的能重建整个数据的数据。类似定期压缩一下log。log内容清晰易懂。
如果不小心FLUSHALL清除了所有数据,可以在rewrite之前关闭服务,把最后的那个FLUSHALL命令从log中删除,重启后能恢复到之前的状态。
AOF劣势
文件比RDB大
一般情况比RDB慢,因为需要频繁fsync,除非关掉fsync。
如何选择?如果数据尽可能不要丢,用aof。如果数据很多,要快,可以承受丢几分钟数据,用rdb。
6 看代码#
暂时只看最简的流程。不看sentinel和cluster等。
挑一些能反映数据结构重要性质的命令来看。
添加defined
__linux__
6.1 dict基本结构#
本质就是一个hash。
typedef struct dictType {
// 定义一系列函数和变量。形成不同的dict子类型
}
// 具体的一条数据。包含一个key和一个v。next指向hash值相等的下一条数据。
struct dictEntry {
void *key;
union {
void *val;
uint64_t u64;
int64_t s64;
double d;
} v;
struct dictEntry *next; /* Next entry in the same hash bucket. */
void *metadata[]; /* An arbitrary number of bytes (starting at a
* pointer-aligned address) of size as returned
* by dictType's dictEntryMetadataBytes(). */
};
struct dict {
dictType *type; // 上述dictType
dictEntry **ht_table[2];
unsigned long ht_used[2];
long rehashidx; /* rehashing not in progress if rehashidx == -1 */
/* Keep small vars at end for optimal (minimal) struct padding */
int16_t pauserehash; /* If >0 rehashing is paused (<0 indicates coding error) */
signed char ht_size_exp[2]; /* exponent of size. (size = 1<<exp) */
void *metadata[]; /* An arbitrary number of bytes (starting at a
* pointer-aligned address) of size as defined
* by dictType's dictEntryBytes. */
};
// 关于hash和取模的一些讨论
// hash出来是一个很大的值,好的hash可以认为是均匀分布。
// 大致可以认为取模影响很小,正好可以控制容量,把数据做到一个数组里。
// https://www.quora.com/Why-is-the-modulo-operator-used-in-hashing-What-characteristics-makes-it-ideal-in-calculating-location-of-values-in-a-hash-table
// 有个dictRehash操作待研究
dictAdd(dict *d, void *key, void *val)
dictEntry *entry = dictAddRaw(d,key,NULL);
void *position = dictFindPositionForInsert(d, key, existing);
uint64_t hash = dictHashKey(d, key); // 算hash值
// 常用dictSdsHash
dictSdsHash
dictGenHashFunction
siphash
// 按需用_dictExpandIfNeeded扩容。
idx = hash & DICTHT_SIZE_MASK(d->ht_size_exp[table]); // 算index
// index对应bucket
dictEntry **bucket = &d->ht_table[dictIsRehashing(d) ? 1 : 0][idx];
return bucket;
return dictInsertAtPosition(d, key, position);
// 有table 0和table 1之分。暂不深入
// 分配内存。插入到此bucket即可
dictSetVal
6.2 listpack基本结构#
listpack是最新引入的一种结构,取代ziplist。
基本就是把数据按协议打包为二进制数据,依次存到一块内存。而不是简单的链表。
// 值是long long或string
typedef struct {
/* When string is used, it is provided with the length (slen). */
unsigned char *sval;
uint32_t slen;
/* When integer is used, 'sval' is NULL, and lval holds the value. */
long long lval;
} listpackEntry;
// 新建
#define LP_HDR_SIZE 6
unsigned char *lpNew(size_t capacity) {
unsigned char *lp = lp_malloc(capacity > LP_HDR_SIZE+1 ? capacity : LP_HDR_SIZE+1);
if (lp == NULL) return NULL;
lpSetTotalBytes(lp,LP_HDR_SIZE+1);
// 总bytes数存在前4字节
#define lpSetTotalBytes(p,v) do { \
(p)[0] = (v)&0xff; \
(p)[1] = ((v)>>8)&0xff; \
(p)[2] = ((v)>>16)&0xff; \
(p)[3] = ((v)>>24)&0xff; \
} while(0)
lpSetNumElements(lp,0);
// item数量存在5/6字节
#define lpSetNumElements(p,v) do { \
(p)[4] = (v)&0xff; \
(p)[5] = ((v)>>8)&0xff; \
} while(0)
lp[LP_HDR_SIZE] = LP_EOF;
return lp;
}
/*初始数据结构
整个就是一串bytes
+------------------+---------------------+-------------+
| 4 bytes | 2 bytes | 1 byte |
| total_bytes=7 | entry_count=0 | LP_EOF |
+------------------+---------------------+-------------+
*/
lpAppendInteger
// 取前4字节总bytes数
uint64_t listpack_bytes = lpGetTotalBytes(lp);
// eof的位置就是总bytes数-1
unsigned char *eofptr = lp + listpack_bytes - 1;
// 在lp中,在eofptr之前,插入lval。
return lpInsertInteger(lp, lval, eofptr, LP_BEFORE, NULL);
// 进行编码
uint64_t enclen; /* The length of the encoded element. */
unsigned char intenc[LP_MAX_INT_ENCODING_LEN];
lpEncodeIntegerGetType(lval, intenc, &enclen);
// 按数字范围进行编码
return lpInsert(lp, NULL, intenc, enclen, p, where, newp);
// 针对插入int
// p是eof
unsigned long poff = p-lp; // eof的offset
enctype = LP_ENCODING_INT;
enclen = size;
unsigned long backlen_size = (!delete) ? lpEncodeBacklen(backlen,enclen) : 0;
// 又是按数字范围编码
// 对于数字来说很小,最大9。这里算它9。
// 9编码得1。
uint64_t old_listpack_bytes = lpGetTotalBytes(lp);
uint32_t replaced_len = 0;
// 计算插入后的大小
// 例如从初始状态开始
// new_listpack_bytes = 7 + 9 + 1 - 0 = 17
uint64_t new_listpack_bytes = old_listpack_bytes + enclen + backlen_size - replaced_len;
unsigned char *dst = lp + poff; // 指向末尾
// 变大,需要分配内存。
if (new_listpack_bytes > old_listpack_bytes &&
new_listpack_bytes > lp_malloc_size(lp)) {
if ((lp = lp_realloc(lp,new_listpack_bytes)) == NULL) return NULL;
dst = lp + poff;
}
// move大块数据
if (where == LP_BEFORE) {
// 对于添加到末尾,old_listpack_bytes-poff == 0。就是不用move。
memmove(dst+enclen+backlen_size,dst,old_listpack_bytes-poff);
}
// 赋值新数据
memcpy(dst,eleint,enclen);
dst += enclen;
memcpy(dst,backlen,backlen_size);
dst += backlen_size;
// 更新头数据
if (where != LP_REPLACE || delete) {
uint32_t num_elements = lpGetNumElements(lp);
if (num_elements != LP_HDR_NUMELE_UNKNOWN) {
if (!delete)
lpSetNumElements(lp,num_elements+1);
else
lpSetNumElements(lp,num_elements-1);
}
}
lpSetTotalBytes(lp,new_listpack_bytes);
/*添加一个数字后的结构
整个就是一串bytes
+------------------+-------------------+----------------+----------------------+-------------+
| 4 bytes | 2 bytes | enclen=9 byte | backlen_size=1 byte | 1 byte |
| total_bytes=17 | entry_count=1 | eleint | backlen | LP_EOF |
+------------------+-------------------+----------------+----------------------+-------------+
*/
6.3 redis主框架#
主体大概是
单进程
各种初始化
起tcp server收client命令
起各种定时任务,包括数据的持久化。
起epoll事件循环
由事件驱动程序运行
#define LRU_BITS 24
struct redisObject {
unsigned type:4;
unsigned encoding:4;
unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or
* LFU data (least significant 8 bits frequency
* and most significant 16 bits access time). */
int refcount;
void *ptr;
};
typedef struct redisObject robj;
// redis server总结构
struct redisServer {
pid_t pid;
int sentinel_mode;
int module_pipe[2]; /* Pipe used to awake the event loop by module threads. */
connListener listeners[CONN_TYPE_MAX]; /* TCP/Unix/TLS even more types */
aofManifest *aof_manifest; /* Used to track AOFs. */
}
struct redisServer server;
// 各种shared robj
struct sharedObjectsStruct {
}
struct sharedObjectsStruct shared;
main // 总入口
// 时间/随机数等等初始化
// 检查是否sentinel模式
server.sentinel_mode = checkForSentinelMode(argc,argv, exec_name);
// exe名字为redis-sentinel或者传了--sentinel参数。
// 本体就这一份代码,sentinel和其他模式是共存的。
initServerConfig
// server各种初始化
// 初始化命令表
server.commands = dictCreate(&commandTableDictType);
server.orig_commands = dictCreate(&commandTableDictType);
populateCommandTable();
// 装载所有命令
retval1 = dictAdd(server.commands, sdsdup(c->fullname), c);
retval2 = dictAdd(server.orig_commands, sdsdup(c->fullname), c);
// 初始化各种模块
moduleInitModulesSystem
moduleRegisterCoreAPI
anetPipe
pthread_mutex_lock(&moduleGIL);
connTypeInitialize
// 注册一些连接接口比如socket
RedisRegisterConnectionTypeSocket
connTypeRegister(&CT_Socket)
// 见CT_Socket。包含socket的各种接口。
// 检查aof和rdb。待续
if (strstr(exec_name,"redis-check-rdb") != NULL)
redis_check_rdb_main(argc,argv,NULL);
else if (strstr(exec_name,"redis-check-aof") != NULL)
redis_check_aof_main(argc,argv);
// 处理参数
if (argc >= 2) {
}
linuxMemoryWarnings
if (background) daemonize();
serverLog(LL_NOTICE, "oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo");
initServer
// 配置信号的处理
signal(SIGHUP, SIG_IGN);
signal(SIGPIPE, SIG_IGN);
setupSignalHandlers();
makeThreadKillable();
// server各种初始化
resetReplicationBuffer
createSharedObjects
// 各种createObject和createStringObject
// 主要创建各种写死的string
adjustOpenFilesLimit
server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
/* State of an event based program */
typedef struct aeEventLoop {
int maxfd; /* highest file descriptor currently registered */
int setsize; /* max number of file descriptors tracked */
long long timeEventNextId;
aeFileEvent *events; /* Registered events */
aeFiredEvent *fired; /* Fired events */
aeTimeEvent *timeEventHead;
int stop;
void *apidata; /* This is used for polling API specific data */
aeBeforeSleepProc *beforesleep;
aeBeforeSleepProc *aftersleep;
int flags;
} aeEventLoop;
aeEventLoop *eventLoop;
// 直接按最大client数预先分配好空间。后续直接用。
eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
aeApiCreate(eventLoop)
typedef struct aeApiState {
int epfd;
struct epoll_event *events;
} aeApiState;
aeApiState *state = zmalloc(sizeof(aeApiState));
state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
eventLoop->apidata = state;
typedef struct redisDb {
dict *dict; /* The keyspace for this DB */
dict *expires; /* Timeout of keys with a timeout set */
dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/
dict *blocking_keys_unblock_on_nokey; /* Keys with clients waiting for
* data, and should be unblocked if key is deleted (XREADEDGROUP).
* This is a subset of blocking_keys*/
dict *ready_keys; /* Blocked keys that received a PUSH */
dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */
int id; /* Database ID */
long long avg_ttl; /* Average TTL, just for stats */
unsigned long expires_cursor; /* Cursor of the active expire cycle. */
list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */
clusterSlotToKeyMapping *slots_to_keys; /* Array of slots to keys. Only used in cluster mode (db 0). */
} redisDb;
// 分配db内存
server.db = zmalloc(sizeof(redisDb)*server.dbnum);
for (j = 0; j < server.dbnum; j++) {
// 初始化每个db的数据
}
evictionPoolAlloc(); /* Initialize the LRU keys pool. */
// server各种初始化
resetServerStats();
// 起定时任务。执行比较频繁。默认每秒跑10次。CONFIG_DEFAULT_HZ
aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL)
serverCron
// 跑各种定时流程
// 比如是否要关闭系统
// 比如各种检测流程
// 定时对clients做一些操作。比如检查是否要踢掉。
clientsCron();
// aof流程
run_with_period(1000) {
if ((server.aof_state == AOF_ON || server.aof_state == AOF_WAIT_REWRITE) &&
server.aof_last_write_status == C_ERR)
{
flushAppendOnlyFile(0);
}
}
// 注册pipe事件到epoll
aeCreateFileEvent(server.el, server.module_pipe[0], AE_READABLE, modulePipeReadable,NULL)
modulePipeReadable
eventLoopHandleOneShotEvents
// 调用对应的func
aeSetBeforeSleepProc(server.el,beforeSleep);
aeSetAfterSleepProc(server.el,afterSleep);
scriptingInit(1);
functionsInit();
slowlogInit();
latencyMonitorInit();
ACLUpdateDefaultUserPassword(server.requirepass);
applyWatchdogPeriod();
// initServer结束
// 打印logo等
redisAsciiArt
checkTcpBacklogSettings
if (!server.sentinel_mode) {
moduleInitModulesSystemLast();
moduleLoadFromQueue();
}
initListeners();
// 对每种连接类型进行listen
connListen
// 直接调用最开始注册的接口
listener->ct->listen(listener);
InitServerLast(); // 待续
if (!server.sentinel_mode) {
// 初始化aof/rdb数据。见后续分析。
serverLog(LL_NOTICE,"Server initialized");
aofLoadManifestFromDisk();
loadDataFromDisk();
aofOpenIfNeededOnServerStart();
aofDelHistoryFiles();
}
redisSetCpuAffinity(server.server_cpulist);
setOOMScoreAdj(-1);
// 进入事件循环。之后就是死循环,由事件驱动各种流程。
aeMain(server.el);
eventLoop->stop = 0;
while (!eventLoop->stop) {
aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_BEFORE_SLEEP|AE_CALL_AFTER_SLEEP);
numevents = aeApiPoll(eventLoop, tvp);
epoll_wait // 在此wait epoll事件
}
aeDeleteEventLoop(server.el);
6.4 一个具体命令的触发#
初始化阶段起socket并listen
进入事件循环后client连上来触发accept等
收数据
解析命令
执行命令
// 这里定义了socket连接的各种接口
static ConnectionType CT_Socket = {
.listen = connSocketListen,
.accept_handler = connSocketAcceptHandler,
.connect = connSocketConnect,
.read = connSocketRead,
.set_read_handler = connSocketSetReadHandler,
}
// main中的initListeners会调connSocketListen进行listen。起一个tcp server。
initListeners
connListen
listener->ct->listen(listener);
connSocketListen
listenToPort
sfd->fd[sfd->count] = anetTcpServer
_anetTcpServer
socket
anetListen
bind
listen
createSocketAcceptHandler(listener, connAcceptHandler(listener->ct))
// 对创建的socket进行epoll监听
aeCreateFileEvent
aeApiAddEvent
epoll_ctl
// 当listen有结果
accept_handler
connSocketAcceptHandler
anetTcpAccept
anetGenericAccept
accept
acceptCommonHandler(connCreateAcceptedSocket(cfd, NULL),0,cip);
connCreateAcceptedSocket
connCreateSocket
// 拿到了对端连接
// 做一些检查。比如是否超出最大client数
createClient(conn)
// 初始化各种client数据
// 设置socket可读事件
connSetReadHandler(conn, readQueryFromClient);
connAccept // 暂时没懂
// 当client可读。比如我们在客户端输入了命令,发送tcp数据到server。
readQueryFromClient
processInputBuffer
processCommandAndResetClient
processCommand
lookupCommand // 匹配命令
lookupCommandLogic(server.commands,argv,argc,0);
// 命令表为struct COMMAND_STRUCT redisCommandTable[]
// server.commands是所有命令的dict。在initServerConfig里填充。
dictFetchValue
// 进行各种检查。各种rejectCommand
call(c,flags) // 最终执行一个命令
c->cmd->proc(c); // 执行redisCommand的proc
lpushCommand // 执行具体的命令比如lpushCommand
afterCommand
postExecutionUnitOperations
propagatePendingCommands
propagateNow
feedAppendOnlyFile
if (server.aof_state == AOF_ON ||
(server.aof_state == AOF_WAIT_REWRITE && server.child_type == CHILD_TYPE_AOF))
{
// 执行完一个命令。可能把该命令存到server.aof_buf
server.aof_buf = sdscatlen(server.aof_buf, buf, sdslen(buf));
}
commandProcessed(c);
6.5 lpush流程#
添加数据到listpack结构
// LPUSH key element [element ...]
// 底层是listpack
lpushCommand
pushGenericCommand(c,LIST_HEAD,0);
// 在db的dict里找key
robj *lobj = lookupKeyWrite(c->db, c->argv[1]);
lookupKey
dictFind
if (checkType(c,lobj,OBJ_LIST)) return; // 检查是否list类型
if (!lobj) {
// 如果key不存在。新建list。
// listpack是较新的数据结构,取代老的ziplist。
// OBJ_ENCODING_LISTPACK
lobj = createListListpackObject();
dbAdd(c->db,c->argv[1],lobj);
dbAddInternal
// db中添加key,value为新建的list。
}
// 后续的参数都是要push的值。循环push。
for (j = 2; j < c->argc; j++) {
listTypePush(lobj,c->argv[j],where);
// 插入int为例
// head
lpPrependInteger(subject->ptr, (long)value->ptr)
unsigned char *p = lpFirst(lp);
return lpInsertInteger(lp, lval, p, LP_BEFORE, NULL);
// 或tail
lpAppendInteger(subject->ptr, (long)value->ptr);
server.dirty++;
}
// 返回目前list长度
addReplyLongLong(c, listTypeLength(lobj));
6.6 sadd流程#
添加数据到hash表
// SADD key member [member ...]
// member如果是int,会转成string。
// 最简的set。和dict基本一样。是一个hash表。
saddCommand
// 在db的dict里找key
set = lookupKeyWrite(c->db,c->argv[1]);
if (checkType(c,set,OBJ_SET)) return; // 检查是否为set类型
if (set == NULL) { // 如果key不存在
set = setTypeCreate(c->argv[2]->ptr, c->argc - 2);
robj *o = createSetObject();
dict *d = dictCreate(&setDictType);
robj *o = createObject(OBJ_SET,d);
o->encoding = OBJ_ENCODING_HT; // 可见底层是一个hash table
dbAdd(c->db,c->argv[1],set);
dbAddInternal
// db中添加key,value为新建的set。
} else {
setTypeMaybeConvert(set, c->argc - 2);
}
// 添加后续member
for (j = 2; j < c->argc; j++) {
if (setTypeAdd(set,c->argv[j]->ptr)) added++;
setTypeAddAux(subject, value, sdslen(value), 0, 1);
// 和dict操作一样
void *position = dictFindPositionForInsert(ht, sdsval, NULL);
dictInsertAtPosition(ht, sdsval, position);
}
addReplyLongLong(c,added);
6.7 hset流程#
添加键值对到hash
// HSET key field value [field value ...]
hsetCommand
// 检查参数
hashTypeLookupWriteOrCreate
// 检查key存在
robj *o = lookupKeyWrite(c->db,key);
if (checkType(c,o,OBJ_HASH)) return NULL;
if (o == NULL) {
o = createHashObject();
unsigned char *zl = lpNew(0);
robj *o = createObject(OBJ_HASH, zl);
o->encoding = OBJ_ENCODING_LISTPACK; // 初始编码是listpack
dbAdd(c->db,key,o);
}
// redis的存储机制。当元素比较小或某些情况下,以listpack等结构来存,它认为memory方面更高效。
// 当数量变大,按配置来,会转成真正的hash表。
// 参数见hash_max_listpack_entries,hash_max_listpack_value等
hashTypeTryConversion(o,c->argv,2,c->argc-1);
hashTypeConvert(o, OBJ_ENCODING_HT);
hashTypeConvertListpack(o, enc);
// OBJ_ENCODING_LISTPACK转为OBJ_ENCODING_HT
// 添加后续元素
for (i = 2; i < c->argc; i += 2)
created += !hashTypeSet(o,c->argv[i]->ptr,c->argv[i+1]->ptr,HASH_SET_COPY);
// 按OBJ_ENCODING_HT来看
// 更dictAdd相似。就是走基本的算hash并插入流程。
6.8 skiplist和有序set#
https://igoro.com/archive/skip-lists-are-fascinating/
https://blog.reachsumit.com/posts/2020/07/skip-list/
https://en.wikipedia.org/wiki/Skip_list
https://www.geeksforgeeks.org/skip-list/
https://brilliant.org/wiki/skip-lists/
https://opendatastructures.org/newhtml/ods/latex/skiplists.html
skiplist是一个多层链表结构。
每一层都是有序。
最底层包含所有元素。每向上一层,理想状态是包含下一层的一般元素。具体位置有插入时随机决定。
整个结构在一个概率框架下,大致是一个二分的形式。
增删查都是log的复杂度。
从图示看来,各种操作都非常清晰。
插入时要随机向上添加元素。
删除时把整个节点扣掉即可。
最底层level为0。
#define ZSKIPLIST_MAXLEVEL 32
/* ZSETs use a specialized version of Skiplists */
typedef struct zskiplistNode {
sds ele;
double score;
struct zskiplistNode *backward;
struct zskiplistLevel {
struct zskiplistNode *forward; // 前向的下一个节点
unsigned long span;
} level[];
} zskiplistNode;
typedef struct zskiplist {
struct zskiplistNode *header, *tail;
unsigned long length; // 元素数。初始0。
int level; // 总level数。初始1。
} zskiplist;
typedef struct zset {
dict *dict;
zskiplist *zsl;
} zset;
// 有序set
// ZADD key [NX | XX] [GT | LT] [CH] [INCR] score member [score member...]
zaddCommand
zaddGenericCommand(c,ZADD_IN_NONE);
// 检查NX之类各种参数。这里暂忽略
// 检查所有score
// 检查key的存在
zobj = lookupKeyWrite(c->db,key);
if (checkType(c,zobj,OBJ_ZSET)) goto cleanup;
if (zobj == NULL) {
if (xx) goto reply_to_client; /* No key + XX option: nothing to do. */
zobj = zsetTypeCreate(elements, sdslen(c->argv[scoreidx+1]->ptr));
// 同hash,有个转换流程
robj *zobj = createZsetObject();
// 创建zset。
// 初始为OBJ_ENCODING_LISTPACK
// 转化后为OBJ_ENCODING_SKIPLIST
zs->dict = dictCreate(&zsetDictType);
zs->zsl = zslCreate();
zsl->level = 1; // 默认level数
zsl->length = 0;
zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
// 分配一个zskiplistNode和32个zskiplistLevel的内存
// 作为第一列空的节点。
zskiplistNode *zn = zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
zn->score = score; // 为0
zn->ele = ele; // 为NULL
// header每一层初始化
for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
zsl->header->level[j].forward = NULL;
zsl->header->level[j].span = 0;
}
o = createObject(OBJ_ZSET,zs);
o->encoding = OBJ_ENCODING_SKIPLIST;
dbAdd(c->db,key,zobj);
}
// 添加每一对score/member
for (j = 0; j < elements; j++) {
int retval = zsetAdd(zobj, score, ele, flags, &retflags, &newscore);
// 针对OBJ_ENCODING_SKIPLIST
// 先从dict里找value
// dict包含所有存在的value
de = dictFind(zs->dict,ele);
if (de == NULL) {
znode = zslInsert(zs->zsl,score,ele);
// skiplist的插入逻辑
// 找到底层位置
// zslRandomLevel随机计算level需要涨到哪一层
// 创建节点,插入到整个结构中。
// 添加到普通dict
dictAdd(zs->dict,ele,&znode->score)
}
}
6.9 超时流程#
添加key到db->expires。在某些必要时间点检查这个expires。
EXPIRE key seconds [NX | XX | GT | LT]
// 设置超时
expireCommand
expireGenericCommand(c,commandTimeSnapshot(),UNIT_SECONDS);
if (checkAlreadyExpired(when)) {
// 已经到期直接删除
dbGenericDelete
} else {
setExpire(c,c->db,key,when);
// db中找key
kde = dictFind(db->dict,key->ptr);
// db中专门用expires存设置了超时的key
de = dictAddOrFind(db->expires,dictGetKey(kde));
// 找到并设置
dictSetSignedIntegerVal(de,when);
}
// 在某些操作节点,比如lookupKey时,会检查这个key的超时状态。如果超时就删除。
lookupKey
expireIfNeeded
if (!keyIsExpired(db,key)) return 0;
deleteExpiredKeyAndPropagate(db,key);
dbGenericDelete(db,keyobj,server.lazyfree_lazy_expire,DB_FLAG_KEY_EXPIRED);
6.10 AOF和RDB的读取#
https://redis.io/docs/management/persistence/
aof文件包含一个base文件,是一个快照,aof发生重写时生成。
还包含一系列增量文件,所有这些文件组成完整的数据。
这些文件由一个manifest文件管理。
rdb的格式见https://rdb.fnordig.de/file_format.html
可以看到大致就是各种类型和数据依次排列。
相当于把整个redis的数据打包成一个文件。解析时依次读取实际数据然后插入内存中的结构,完成数据的加载。
aof本质是命令的合集,加载时依次读出命令,然后执行即可还原所有数据。
https://www.memurai.com/blog/redis-persistence-deep-dive
main
// 其他初始化流程
if (!server.sentinel_mode) {
serverLog(LL_NOTICE,"Server initialized");
aofLoadManifestFromDisk();
server.aof_manifest = aofManifestCreate();
sds am_name = getAofManifestFileName();
sds am_filepath = makePath(server.aof_dirname, am_name);
aofManifest *am = aofLoadManifestFromFile(am_filepath);
loadDataFromDisk();
if (server.aof_state == AOF_ON) {
// aof流程
int ret = loadAppendOnlyFiles(server.aof_manifest);
total_num = getBaseAndIncrAppendOnlyFilesNum(am);
// 获取base文件和增量文件数
if (am->base_aof_info) num++;
if (am->incr_aof_list) num += listLength(am->incr_aof_list);
total_size = getBaseAndIncrAppendOnlyFilesSize(am, &status);
// 获取所有文件大小
startLoading(total_size, RDBFLAGS_AOF_PREAMBLE, 0);
// 置为loading状态
if (am->base_aof_info) {
// base文件
ret = loadSingleAppendOnlyFile(aof_name);
// 检查文件格式。可能为aof或rdb。
if (fread(sig,1,5,fp) != 5 || memcmp(sig,"REDIS",5) != 0) {
/* Not in RDB format, seek back at 0 offset. */
if (fseek(fp,0,SEEK_SET) == -1) goto readerr;
} else {
// 如果是rdb格式。aof的base文件就是rdb格式。
rio rdb;
rdbLoadRio(&rdb,RDBFLAGS_AOF_PREAMBLE,NULL)
int retval = rdbLoadRioWithLoadingCtx(rdb,rdbflags,rsi,&loading_ctx);
char buf[1024];
// 读头部的特殊数据。包含'REDIS',rdb版本等。
if (rioRead(rdb,buf,9) == 0) goto eoferr;
while(1) {
// 读一字节类型
if ((type = rdbLoadType(rdb)) == -1) goto eoferr;
// 然后case各种类型
if (type == RDB_OPCODE_EXPIRETIME) {
// 对于这种类型,就是读4字节的int32。存到expiretime
expiretime = rdbLoadTime(rdb);
int32_t t32;
if (rioRead(rdb,&t32,4) == 0) return -1;
return (time_t)t32;
expiretime *= 1000;
if (rioGetReadError(rdb)) goto eoferr;
continue; /* Read next opcode. */
} else if (type == RDB_OPCODE_EOF) {
// 读到eof,结束。
break;
} // ...各种类型
// 不是特殊类型
// 读key
if ((key = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL)
goto eoferr;
// 读value
val = rdbLoadObject(type,rdb,key,db->id,&error);
// 超长的流程,解析所有类型的数据到内存。
// 检查kv。检查过期等等。
// 添加kv到db
int added = dbAddRDBLoad(db,key,val);
}
// 检查checksum
} // 读rdb完成
// 读aof
// aof本质上是一串实际的命令。那么循环读命令并执行即可。
while(1) {
// 读文件
char buf[AOF_ANNOTATION_LINE_MAX_LEN];
fgets(buf,sizeof(buf),fp)
// 匹配命令
cmd = lookupCommand(argv,argc);
// 执行
cmd->proc()
}
} // 读base文件结束
// 读所有增量aof文件
if (listLength(am->incr_aof_list)) {
while ((ln = listNext(&li)) != NULL) {
ret = loadSingleAppendOnlyFile(aof_name);
}
}
} else {
// rdb流程
int rdb_load_ret = rdbLoad(server.rdb_filename, &rsi, rdb_flags);
retval = rdbLoadRio(&rdb,rdbflags,rsi);
// 上面已经看过
}
aofOpenIfNeededOnServerStart();
aofDelHistoryFiles();
}
6.11 AOF和RDB的写入#
main中会起定时任务,按配置定时存aof或rdb数据。
main
initServer
aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL)
serverCron
// 默认每隔1秒存一次aof
flushAppendOnlyFile
// afterCommand中,每个命令都会存到server.aof_buf。
// 这里把这个buf写到文件,即完成了aof的保存。
nwritten = aofWrite(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));
// 存rdb流程
// 如果不存在子进程,且距离上次存rdb至少CONFIG_BGSAVE_RETRY_DELAY=5秒,尝试存rdb。
if (!hasActiveChildProcess() &&
server.rdb_bgsave_scheduled &&
(server.unixtime-server.lastbgsave_try > CONFIG_BGSAVE_RETRY_DELAY ||
server.lastbgsave_status == C_OK))
{
rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi);
rdbSaveBackground(SLAVE_REQ_NONE,server.rdb_filename,rsiptr,RDBFLAGS_NONE)
if ((childpid = redisFork(CHILD_TYPE_RDB)) == 0) {
// fork出新进程来处理rdb
retval = rdbSave(req, filename,rsi,rdbflags);
rdbSaveInternal
rdbSaveRio
// 本质就是把当前整个内存数据依次dump出来,存到一个文件。加载时反向操作。
for (j = 0; j < server.dbnum; j++) {
if (rdbSaveDb(rdb, j, rdbflags, &key_counter) == -1) goto werr;
// dump各种类型的数据
// 各种层次的save
}
fflush
fsync
exitFromChild((retval == C_OK) ? 0 : 1);
_exit // 无论如何结束该子进程
} else {
// 父进程继续跑
}
}
7 实战#
7.1 基本replication#
https://redis.io/docs/management/replication/
感觉redis的复制/集群相关内容,以及各种模式的启动方式,比较混乱。
不如mongodb/mysql之类清晰。
除去sentinal
和cluster
运行方式。可以起最基本的主从复制
结构。
对于常规节点,非sentinel。有一个默认的redis.conf
https://download.redis.io/redis-stable/redis.conf
https://download.redis.io/redis-stable/
redis.conf我只改了一个save 3600 1
,每小时存盘,其他默认。
# https://hub.docker.com/r/bitnami/redis
# master
redis:
image: bitnami/redis:7.2.3
volumes:
# 映射数据和配置
- "/XXX/redis/data:/bitnami/redis/data"
- "/XXX/redis/config/:/opt/bitnami/redis/etc"
environment:
TZ: Asia/Shanghai
ALLOW_EMPTY_PASSWORD: 'false'
REDIS_PASSWORD: passsssssss
REDIS_AOF_ENABLED: 'no' # 要关闭aof的话必须在这设置。confi中改appendonly为no没用。
REDIS_PORT_NUMBER: 6379
REDIS_REPLICATION_MODE: master
REDIS_MASTER_HOST: xxx.xxx.xxx.xxx
REDIS_MASTER_PORT_NUMBER: 6379
REDIS_MASTER_PASSWORD: passsssssss
REDIS_REPLICA_IP: 172.16.6.25
# 从
redis:
image: bitnami/redis:7.2.3
volumes:
# 映射数据和配置
- "/XXX/redis/data:/bitnami/redis/data"
- "/XXX/redis/config/:/opt/bitnami/redis/etc/"
environment:
TZ: Asia/Shanghai
ALLOW_EMPTY_PASSWORD: 'false'
REDIS_PASSWORD: passsssssss
REDIS_AOF_ENABLED: 'no' # 要关闭aof的话必须在这设置。confi中改appendonly为no没用。
REDIS_PORT_NUMBER: 6379
REDIS_REPLICATION_MODE: slave
REDIS_MASTER_HOST: xxx.xxx.xxx.xxx
REDIS_MASTER_PORT_NUMBER: 6379
REDIS_MASTER_PASSWORD: passsssssss
REDIS_REPLICA_IP: 172.16.6.34
启动后从节点默认为只读。会自动从主节点同步数据。
如果此时master挂掉,slave还是只读没有变,整个系统不可用。
slave上能看到一些信息,master_link_status=down之类。
重启master,完全自动恢复正常。
replica-serve-stale-data no
这个选项貌似可以再master挂掉时,让slave不可读。
关掉从节点,系统正常运行。从master上可看到一些信息变化,例如slave数量变为0。
此时在master上修改一些数据,再重启slave。slave会迅速同步数据,整体完全恢复正常。
aof持久化大概是每个命令发生时都会进行,量大的话可能非常难受。
如果数据不是非常重要,可关掉aof持久化,可节省磁盘io等。
REDIS_AOF_ENABLED: no
关掉aof。
save 3600 1
设置每小时如果发生数据改变才持久化。
这样最多每小时存一次磁盘。
从节点持久化也是按自己的配置来,并不是master做了一次存盘,slave立马也要存盘。
7.2 sentinal#
https://redis.io/docs/management/sentinel/
https://redis.io/docs/reference/sentinel-clients/
针对sentinel模式有一个默认的sentinel.conf。
https://download.redis.io/redis-stable/sentinel.conf
redis经常会自动修改你的配置,以反映出当前系统实际的配置。
有一个CONFIG REWRITE
命令,可以把启动时用户提供的配置改为当前系统的实际配置。
https://redis.io/commands/config-rewrite/
代码中的rewriteConfig
,rewriteConfigSentinelOption
有不少信息。
sentinel模式的初始化在main函数开始部分server.sentinel_mode = checkForSentinelMode(argc,argv, exec_name);
# 官方的最小配置例子
sentinel monitor mymaster 127.0.0.1 6379 2
# sentinel monitor <master-name> <ip> <port> <quorum>
# 监控master-name ip port
# quorum是在当前master失败上,需要达成一致的sentinel数量。
# 3个sentinel可设置为2
sentinel down-after-milliseconds mymaster 60000
# 60秒无响应认为挂了
sentinel failover-timeout mymaster 180000
sentinel parallel-syncs mymaster 1
# 其他master
# 貌似是用于起多套主从。一般不用。
sentinel monitor resque 192.168.1.3 6380 4
sentinel down-after-milliseconds resque 10000
sentinel failover-timeout resque 180000
sentinel parallel-syncs resque 5
可直接裸起看看,拿出里面的默认配置看看。
客户端可以连上。
看到executable
为/redis-sentinel
。
# https://hub.docker.com/r/bitnami/redis-sentinel
redis-stn:
image: bitnami/redis-sentinel:7.2.3
volumes:
- "/XXX/redis-sentinel/config:/bitnami/redis-sentinel/conf"
ports:
- "26379:26379"
environment:
TZ: Asia/Shanghai
ALLOW_EMPTY_PASSWORD: 'false'
REDIS_SENTINEL_PASSWORD: passssssss # 使用conf时貌似无效?得写到conf里。
REDIS_SENTINEL_PORT_NUMBER: 26379
REDIS_SENTINEL_QUORUM: 2
#REDIS_MASTER_SET: mymaster
#REDIS_MASTER_HOST: xxx.xxx.xxx.xxx
#REDIS_MASTER_PORT_NUMBER: 6379
#REDIS_MASTER_PASSWORD: passssssss
# sentinel.conf
# 每个sentinel设置自己的ip
sentinel announce-ip "172.16.6.22"
requirepass "passsssssssssss"
# master信息
sentinel monitor xc_redis_master 172.16.6.34 6379 2
sentinel auth-pass xc_redis_master passsssssssss
所有sentinel都只填一个master就行,从master中能自动获取slave的信息。
sentinel之间会相互发现,可以在log里看到。
sentinel上有些命令是无法执行的,貌似也是不能存实际数据的。
sentinel本质上应该是一个官方的中间件,做在了一套代码里,可作为一个模式独立运行。
此时master,slave和3个sentinel都起来了。用gui客户端都能连上。
5个节点分布在db1_ip/db2_ip/db3_ip三个机器。
ip问题#
docker环境下ip可能比较混乱。
默认配置可能有各种问题。
例如slave连上master后,master获取的slave的ip可能有问题。导致从别处再连这个slave的ip连不上。
从master的info命令可以看到slave的ip,可能是一个docker网络中自定义的ip,例如172.18.0.9
。
slave的log中有sdown slave 172.18.0.9
,诸如此类,导致从其他机器连不上。
在slave的docker-compose配置中用REDIS_REPLICA_IP指定服务器的私网ip,例如172.16.6.34
。
会决定slave的replica-announce-ip参数。
此后可以看到master的info中,slave的ip为正常的服务器私网ip。
slave的log中可看到fix-slave-config slave 172.16.6.34:6379
。
此后slave就不会down了。
可能得深入研究docker网络,有更好的解法。
同理对于sentinel,也是各种sdown sentinel
连不上。
配置REDIS_SENTINEL_ANNOUNCE_IP
即sentinel announce-ip
后解决。
测试1#
# 此时master正常可读写,slave只读。
# 用python客户端测试,同样的结果,写slave报`ReadOnlyError`。
# 起sentinel客户端,可正常读写。
# redis-py有多种client。详见文档。
# https://redis-py.readthedocs.io/en/stable/
import redis
db1_ip = 'xxx'
db2_ip = 'xxx'
db3_ip = 'xxx'
# 普通client
client = redis.Redis(host = db2_ip, port = 6379, db = 1, decode_responses = False, password = '66666')
print(client)
client.ping()
print(client.exists('test'))
print(client.set('wtf', '666666'))
# setinenl client
sentinel = redis.Sentinel([(db1_ip, 26379), (db2_ip, 26379), (db3_ip, 26379)],
db = 1, decode_responses = False,
password = '66666',
sentinel_kwargs = {'password':'66666'})
master = sentinel.discover_master('db2')
print(master)
client = sentinel.master_for('db2', socket_timeout = 3)
print(client)
print(dir(client))
print(client.exists('wtf1'))
print(client.set('wtf1', '666666'))
sentinel的配置中填的是master是内网地址。
sentinel.discover_master目前返回的是内网ip。
从外网无法连上sentinel,待研究。
测试2#
关掉1个sentinel仍然正常,关掉2个sentinel仍然正常。
关掉3个sentinel时,redis.Sentinel失败,报MasterNotFoundError
。
普通client仍然正常。
只要存在一个sentinel,sentinel客户端就能正常运作。
测试3#
关掉slave。sentinel会显示+sdown slave 172.16.6.25:6379 172.16.6.25 6379 @ xc_redis_master 172.16.6.34 6379
。
系统仍正常运行。
实际应用中也需要不断检测slave状态,保证slave可用。
打开slave,sentinel检测到slave,恢复正常。
测试4#
关掉master
sentinel会检测到master挂掉,会进行failover。
看sentinel的log。
redis-stn_1 | 1:X 24 Nov 2023 13:06:20.347 # +sdown master xc_redis_master 172.16.6.34 6379
redis-stn_1 | 1:X 24 Nov 2023 13:06:20.446 * Sentinel new configuration saved on disk
redis-stn_1 | 1:X 24 Nov 2023 13:06:20.447 # +new-epoch 1
redis-stn_1 | 1:X 24 Nov 2023 13:06:20.454 * Sentinel new configuration saved on disk
redis-stn_1 | 1:X 24 Nov 2023 13:06:20.454 # +vote-for-leader 006f59bb45c65186bcc4ff16be7de43a2562ecba 1
redis-stn_1 | 1:X 24 Nov 2023 13:06:21.351 # +config-update-from sentinel 006f59bb45c65186bcc4ff16be7de43a2562ecba 172.16.6.25 26379 @ xc_redis_master 172.16.6.34 6379
redis-stn_1 | 1:X 24 Nov 2023 13:06:21.351 # +switch-master xc_redis_master 172.16.6.34 6379 172.16.6.25 6379
redis-stn_1 | 1:X 24 Nov 2023 13:06:21.351 * +slave slave 172.16.6.34:6379 172.16.6.34 6379 @ xc_redis_master 172.16.6.25 6379
redis-stn_1 | 1:X 24 Nov 2023 13:06:21.358 * Sentinel new configuration saved on disk
可看到原slave切换为master。
挂到到切换完成期间,服务不可用,持续1分钟左右,根据配置来。
切换完成后服务可用。
之后如果打开原master,会变成slave。
此时master和slave完全完成了互换,服务正常运行。
客户端方面
# 起循环模拟实际应用
# 当master挂掉时,立即出现ConnectionRefusedError。
# 此时需等待sentinel进行failover。默认需要30秒判断master挂掉。
# down-after-milliseconds参数可设置
# ConnectionRefusedError会相应持续一段时间
# 等failover完成后客户端会自动恢复。不需要手动干预。
# 恢复后可看到discover_master和discover_slaves结果互换。
while True:
try:
print(f'while True ----------')
print(f"exists {client.exists('wtf1')}")
print(f"get {client.get('wtf1')}")
print(f"set {client.set('wtf1', random.randint(0, 1000))}")
print(f"discover_master = {sentinel.discover_master('xc_redis_master')}")
print(f"discover_slaves = {sentinel.discover_slaves('xc_redis_master')}")
time.sleep(10)
except:
traceback.print_exc()
time.sleep(10)
来回模拟挂掉master和slave,倒腾failover,符合正常预期。
不用手动处理应用程序,等切换完成后client = sentinel.master_for
这个client会自动连到新的master。做好异常处理就行。
测试5#
上面测试时保证master和slave有一个没挂。
如果master和slave都挂掉。有一些问题。
首先两个redis节点初始配置,一个为master一个为slave。
测试时配置保持不变。
当只有一个节点挂掉时,failover完成后会自动调整master,不受配置影响。
也就是节点配置为master,实际可能是slave。
当两个节点都挂掉,如果挂之前的实际角色和配置不一致,这时起配置为slave的节点是起不来的。会报imeout reached before the port went into state "inuse"
。
此时起配置为master的节点可以起来,但是会消耗更长一点时间。
等master起来后,之前起不来的slave可以起来。然后正常运行。
期间应用程序同样不需要手动干预,等待一段时间后会自动恢复。
此情况的数据一致问题没研究。估计极端情况有问题。
但总体还行。如果全挂,是重大灾难。也只需重启master然后slave就能开始服务,不需要折腾数据。
对于非重要数据,可以接受。
重要的数据就应该进有一致性保障的数据库。
或者用redis的wait机制保证数据写入了master和slave。
7.3 cluster#
https://redis.io/docs/management/scaling/
https://redis.io/docs/reference/cluster-spec/
待研究
貌似会强行sharding,而且也需要一堆replication节点。
不是像mysql或mongodb简单起一个replication集群就行。