Status RDB::LoadRdb()

in src/storage/rdb/rdb.cc [557:687]


// Parses the RDB stream held by `stream_` and stores the keys that belong to `db_index`.
//
// The stream must start with a 9-byte header: the literal "REDIS" followed by the format version
// in the remaining 4 bytes. After the header comes a sequence of opcodes and key/value records
// terminated by RDBOpcodeEof. For format versions >= MinRdbVersionToVerifyChecksum a trailing
// checksum of RDBCheckSumLen bytes is read and compared with the one computed by the stream.
//
// Parameters:
//   ctx                 - passed through to the key existence check and to saveRdbObject.
//   db_index            - only records of this DB are stored. The current DB starts at 0 and is
//                         changed by RDBOpcodeSelectDB; records of other DBs are parsed and dropped.
//   overwrite_exist_key - when false, a key is stored only if KeyExist reports "not found"; any
//                         other outcome (including a failed lookup) makes the key be skipped.
//
// Returns Status::OK() once the whole stream was processed. Returns NotOK for a wrong signature,
// an unsupported format version, a module / function opcode, an unknown object type or a checksum
// mismatch. Errors reported by the read/parse helpers are logged and returned through GET_OR_RET.
// A failure to save a single object is only logged; loading continues with the next record.
Status RDB::LoadRdb(engine::Context &ctx, uint32_t db_index, bool overwrite_exist_key) {
  char buf[1024] = {0};
  GET_OR_RET(LogWhenError(stream_->Read(buf, 9)));
  buf[9] = '\0';  // terminate the header so the version digits can be parsed as a C string

  if (memcmp(buf, "REDIS", 5) != 0) {
    warn("Wrong signature trying to load DB from file");
    return {Status::NotOK, "Wrong signature trying to load DB from file"};
  }

  auto rdb_ver = std::atoi(buf + 5);
  if (rdb_ver < 1 || rdb_ver > SupportedRDBVersion) {
    warn("Can't handle RDB format version {}", rdb_ver);
    return {Status::NotOK, fmt::format("Can't handle RDB format version {}", rdb_ver)};
  }

  // Expiration announced by the most recent expire opcode; 0 means "no expiration pending".
  // It belongs to the next key record only and is consumed as soon as that record starts.
  uint64_t expire_time_ms = 0;
  int64_t expire_keys = 0;
  int64_t load_keys = 0;
  int64_t empty_keys_skipped = 0;
  auto now_ms = util::GetTimeStampMS();
  uint32_t db_id = 0;
  uint64_t skip_exist_keys = 0;
  while (true) {
    auto type = GET_OR_RET(LogWhenError(loadRdbType()));
    if (type == RDBOpcodeExpireTime) {
      expire_time_ms = static_cast<uint64_t>(GET_OR_RET(LogWhenError(loadExpiredTimeSeconds()))) * 1000;
      continue;
    } else if (type == RDBOpcodeExpireTimeMs) {
      expire_time_ms = GET_OR_RET(LogWhenError(loadExpiredTimeMilliseconds(rdb_ver)));
      continue;
    } else if (type == RDBOpcodeFreq) {               // LFU frequency: not use in kvrocks
      GET_OR_RET(LogWhenError(stream_->ReadByte()));  // discard the value
      continue;
    } else if (type == RDBOpcodeIdle) {  // LRU idle time: not use in kvrocks
      // Redis stores the idle time with its length encoding rather than as a fixed-width integer,
      // so it has to be consumed with the length decoder to keep the stream in sync.
      GET_OR_RET(LogWhenError(loadObjectLen(nullptr)));  // discard the value
      continue;
    } else if (type == RDBOpcodeEof) {
      break;
    } else if (type == RDBOpcodeSelectDB) {
      db_id = GET_OR_RET(LogWhenError(loadObjectLen(nullptr)));
      continue;
    } else if (type == RDBOpcodeResizeDB) {              // not use in kvrocks, hint redis for hash table resize
      GET_OR_RET(LogWhenError(loadObjectLen(nullptr)));  // db_size
      GET_OR_RET(LogWhenError(loadObjectLen(nullptr)));  // expires_size
      continue;
    } else if (type == RDBOpcodeAux) {
      /* AUX: generic string-string fields. Use to add state to RDB
       * which is backward compatible. Implementations of RDB loading
       * are required to skip AUX fields they don't understand.
       *
       * An AUX field is composed of two strings: key and value. */
      GET_OR_RET(LogWhenError(LoadStringObject()));  // aux key, ignored
      GET_OR_RET(LogWhenError(LoadStringObject()));  // aux value, ignored
      continue;
    } else if (type == RDBOpcodeModuleAux) {
      warn("RDB module not supported");
      return {Status::NotOK, "RDB module not supported"};
    } else if (type == RDBOpcodeFunction || type == RDBOpcodeFunction2) {
      warn("RDB function not supported");
      return {Status::NotOK, "RDB function not supported"};
    } else if (!isObjectType(type)) {
      warn("Invalid or Not supported object type: {}", static_cast<int>(type));
      return {Status::NotOK, fmt::format("Invalid or Not supported object type {}", type)};
    }

    // From here on `type` is an object type, i.e. a key/value record follows. Take ownership of the
    // pending expiration and clear it right away, so that none of the skip paths below can leak it
    // into a later key that has no expiration of its own.
    const uint64_t key_expire_ms = expire_time_ms;
    expire_time_ms = 0;

    // The record must always be read, even when it is dropped afterwards, to advance the stream.
    auto key = GET_OR_RET(LogWhenError(LoadStringObject()));
    auto value = GET_OR_RET(LogWhenError(loadRdbObject(type, key)));

    if (db_index != db_id) {  // skip db not match
      continue;
    }

    if (isEmptyRedisObject(value)) {  // compatible with empty value
      /* Since we used to have bug that could lead to empty keys
       * (See #8453), we rather not fail when empty key is encountered
       * in an RDB file, instead we will silently discard it and
       * continue loading. */
      if (empty_keys_skipped++ < 10) {  // only log 10 empty keys, just as redis does.
        warn("skipping empty key: {}", key);
      }
      continue;
    } else if (key_expire_ms != 0 &&
               key_expire_ms < now_ms) {  // in redis this used to feed this deletion to any connected replicas
      expire_keys++;
      continue;
    }

    if (!overwrite_exist_key) {  // only load not exist key
      redis::Database redis(storage_, ns_);
      auto s = redis.KeyExist(ctx, key);
      if (!s.IsNotFound()) {
        skip_exist_keys++;  // skip it even it's not okay
        if (!s.ok()) {
          error("check key {} exist failed: {}", key, s.ToString());
        }
        continue;
      }
    }

    // Best effort: a key that cannot be saved is reported but does not abort the whole load.
    auto ret = saveRdbObject(ctx, type, key, value, key_expire_ms);
    if (!ret.IsOK()) {
      warn("save rdb object key {} failed: {}", key, ret.Msg());
    } else {
      load_keys++;
    }
  }

  // Verify the checksum if RDB version is >= 5
  if (rdb_ver >= MinRdbVersionToVerifyChecksum) {
    uint64_t chk_sum = 0;
    // The computed checksum must be fetched before the stored one is read from the stream.
    auto expected = GET_OR_RET(LogWhenError(stream_->GetCheckSum()));
    GET_OR_RET(LogWhenError(stream_->Read(reinterpret_cast<char *>(&chk_sum), RDBCheckSumLen)));
    if (chk_sum == 0) {
      warn("RDB file was saved with checksum disabled: no check performed.");
    } else if (chk_sum != expected) {
      // `expected` is the checksum computed over the data read, `chk_sum` is what the file claims.
      warn("Wrong RDB checksum expected: {} got: {}", expected, chk_sum);
      return {Status::NotOK, "All objects were processed and loaded but the checksum is unexpected!"};
    }
  }

  // Existing keys can only be skipped when overwriting is disabled, so report the counter then.
  std::string skip_info;
  if (!overwrite_exist_key) {
    skip_info = ", exist keys skipped: " + std::to_string(skip_exist_keys);
  }

  info("Done loading RDB, keys loaded: {}, keys expired: {}, empty keys skipped: {}{}", load_keys, expire_keys,
       empty_keys_skipped, skip_info);

  return Status::OK();
}