in src/storage/rdb/rdb.cc [557:687]
Status RDB::LoadRdb(engine::Context &ctx, uint32_t db_index, bool overwrite_exist_key) {
char buf[1024] = {0};
GET_OR_RET(LogWhenError(stream_->Read(buf, 9)));
buf[9] = '\0';
if (memcmp(buf, "REDIS", 5) != 0) {
warn("Wrong signature trying to load DB from file");
return {Status::NotOK, "Wrong signature trying to load DB from file"};
}
auto rdb_ver = std::atoi(buf + 5);
if (rdb_ver < 1 || rdb_ver > SupportedRDBVersion) {
warn("Can't handle RDB format version {}", rdb_ver);
return {Status::NotOK, fmt::format("Can't handle RDB format version {}", rdb_ver)};
}
uint64_t expire_time_ms = 0;
int64_t expire_keys = 0;
int64_t load_keys = 0;
int64_t empty_keys_skipped = 0;
auto now_ms = util::GetTimeStampMS();
uint32_t db_id = 0;
uint64_t skip_exist_keys = 0;
while (true) {
auto type = GET_OR_RET(LogWhenError(loadRdbType()));
if (type == RDBOpcodeExpireTime) {
expire_time_ms = static_cast<uint64_t>(GET_OR_RET(LogWhenError(loadExpiredTimeSeconds()))) * 1000;
continue;
} else if (type == RDBOpcodeExpireTimeMs) {
expire_time_ms = GET_OR_RET(LogWhenError(loadExpiredTimeMilliseconds(rdb_ver)));
continue;
} else if (type == RDBOpcodeFreq) { // LFU frequency: not use in kvrocks
GET_OR_RET(LogWhenError(stream_->ReadByte())); // discard the value
continue;
} else if (type == RDBOpcodeIdle) { // LRU idle time: not use in kvrocks
uint64_t discard = 0;
GET_OR_RET(LogWhenError(stream_->Read(reinterpret_cast<char *>(&discard), sizeof(uint64_t))));
continue;
} else if (type == RDBOpcodeEof) {
break;
} else if (type == RDBOpcodeSelectDB) {
db_id = GET_OR_RET(LogWhenError(loadObjectLen(nullptr)));
continue;
} else if (type == RDBOpcodeResizeDB) { // not use in kvrocks, hint redis for hash table resize
GET_OR_RET(LogWhenError(loadObjectLen(nullptr))); // db_size
GET_OR_RET(LogWhenError(loadObjectLen(nullptr))); // expires_size
continue;
} else if (type == RDBOpcodeAux) {
/* AUX: generic string-string fields. Use to add state to RDB
* which is backward compatible. Implementations of RDB loading
* are required to skip AUX fields they don't understand.
*
* An AUX field is composed of two strings: key and value. */
auto key = GET_OR_RET(LogWhenError(LoadStringObject()));
auto value = GET_OR_RET(LogWhenError(LoadStringObject()));
continue;
} else if (type == RDBOpcodeModuleAux) {
warn("RDB module not supported");
return {Status::NotOK, "RDB module not supported"};
} else if (type == RDBOpcodeFunction || type == RDBOpcodeFunction2) {
warn("RDB function not supported");
return {Status::NotOK, "RDB function not supported"};
} else {
if (!isObjectType(type)) {
warn("Invalid or Not supported object type: {}", (int)type);
return {Status::NotOK, fmt::format("Invalid or Not supported object type {}", type)};
}
}
auto key = GET_OR_RET(LogWhenError(LoadStringObject()));
auto value = GET_OR_RET(LogWhenError(loadRdbObject(type, key)));
if (db_index != db_id) { // skip db not match
continue;
}
if (isEmptyRedisObject(value)) { // compatible with empty value
/* Since we used to have bug that could lead to empty keys
* (See #8453), we rather not fail when empty key is encountered
* in an RDB file, instead we will silently discard it and
* continue loading. */
if (empty_keys_skipped++ < 10) { // only log 10 empty keys, just as redis does.
warn("skipping empty key: {}", key);
}
continue;
} else if (expire_time_ms != 0 &&
expire_time_ms < now_ms) { // in redis this used to feed this deletion to any connected replicas
expire_keys++;
continue;
}
if (!overwrite_exist_key) { // only load not exist key
redis::Database redis(storage_, ns_);
auto s = redis.KeyExist(ctx, key);
if (!s.IsNotFound()) {
skip_exist_keys++; // skip it even it's not okay
if (!s.ok()) {
error("check key {} exist failed: {}", key, s.ToString());
}
continue;
}
}
auto ret = saveRdbObject(ctx, type, key, value, expire_time_ms);
if (!ret.IsOK()) {
warn("save rdb object key {} failed: {}", key, ret.Msg());
} else {
load_keys++;
}
}
// Verify the checksum if RDB version is >= 5
if (rdb_ver >= MinRdbVersionToVerifyChecksum) {
uint64_t chk_sum = 0;
auto expected = GET_OR_RET(LogWhenError(stream_->GetCheckSum()));
GET_OR_RET(LogWhenError(stream_->Read(reinterpret_cast<char *>(&chk_sum), RDBCheckSumLen)));
if (chk_sum == 0) {
warn("RDB file was saved with checksum disabled: no check performed.");
} else if (chk_sum != expected) {
warn("Wrong RDB checksum expected: {} got: {}", chk_sum, expected);
return {Status::NotOK, "All objects were processed and loaded but the checksum is unexpected!"};
}
}
std::string skip_info = (overwrite_exist_key ? ", exist keys skipped: " + std::to_string(skip_exist_keys) : "");
info("Done loading RDB, keys loaded: {}, keys expired: {}, empty keys skipped: {}{}", load_keys, expire_keys,
empty_keys_skipped, skip_info);
return Status::OK();
}