Status SlotMigrator::migrateComplexKey()

in src/cluster/slot_migrate.cc [773:895]


Status SlotMigrator::migrateComplexKey(const rocksdb::Slice &key, const Metadata &metadata, std::string *restore_cmds) {
  std::string cmd;
  {
    auto iter = type_to_cmd.find(metadata.Type());
    if (iter != type_to_cmd.end()) {
      cmd = iter->second;
    } else {
      if (metadata.Type() > RedisTypeNames.size()) {
        return {Status::NotOK, "unknown key type: " + std::to_string(metadata.Type())};
      }
      return {Status::NotOK, "unsupported complex key type: " + metadata.TypeName()};
    }
  }

  std::vector<std::string> user_cmd = {cmd, key.ToString()};
  // Construct key prefix to iterate values of the complex type user key
  std::string slot_key = AppendNamespacePrefix(key);
  std::string prefix_subkey = InternalKey(slot_key, "", metadata.version, true).Encode();
  rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
  read_options.snapshot = slot_snapshot_;
  Slice prefix_slice(prefix_subkey);
  read_options.iterate_lower_bound = &prefix_slice;
  // Should use th raw db iterator to avoid reading uncommitted writes in transaction mode
  auto iter = util::UniqueIterator(storage_->GetDB()->NewIterator(read_options));

  int item_count = 0;

  for (iter->Seek(prefix_subkey); iter->Valid(); iter->Next()) {
    if (stop_migration_) {
      return {Status::NotOK, std::string(errMigrationTaskCanceled)};
    }

    if (!iter->key().starts_with(prefix_subkey)) {
      break;
    }

    // Parse values of the complex key
    // InternalKey is adopted to get complex key's value from the formatted key return by iterator of rocksdb
    InternalKey inkey(iter->key(), true);
    switch (metadata.Type()) {
      case kRedisSet: {
        user_cmd.emplace_back(inkey.GetSubKey().ToString());
        break;
      }
      case kRedisSortedint: {
        auto id = DecodeFixed64(inkey.GetSubKey().ToString().data());
        user_cmd.emplace_back(std::to_string(id));
        break;
      }
      case kRedisZSet: {
        auto score = DecodeDouble(iter->value().ToString().data());
        user_cmd.emplace_back(util::Float2String(score));
        user_cmd.emplace_back(inkey.GetSubKey().ToString());
        break;
      }
      case kRedisBitmap: {
        auto s = migrateBitmapKey(inkey, &iter, &user_cmd, restore_cmds);
        if (!s.IsOK()) {
          return s.Prefixed("failed to migrate bitmap key");
        }
        break;
      }
      case kRedisHash: {
        user_cmd.emplace_back(inkey.GetSubKey().ToString());
        user_cmd.emplace_back(iter->value().ToString());
        break;
      }
      case kRedisList: {
        user_cmd.emplace_back(iter->value().ToString());
        break;
      }
      case kRedisHyperLogLog: {
        break;
      }
      default:
        break;
    }

    // Check item count
    // Exclude bitmap because it does not have hmset-like command
    if (metadata.Type() != kRedisBitmap) {
      item_count++;
      if (item_count >= kMaxItemsInCommand) {
        *restore_cmds += redis::ArrayOfBulkStrings(user_cmd);
        current_pipeline_size_++;
        item_count = 0;
        // Have to clear saved items
        user_cmd.erase(user_cmd.begin() + 2, user_cmd.end());

        // Send commands if the pipeline contains enough of them
        auto s = sendCmdsPipelineIfNeed(restore_cmds, false);
        if (!s.IsOK()) {
          return s.Prefixed(errFailedToSendCommands);
        }
      }
    }
  }

  if (auto s = iter->status(); !s.ok()) {
    return {Status::NotOK,
            fmt::format("failed to iterate values of the complex key {}: {}", key.ToString(), s.ToString())};
  }

  // Have to check the item count of the last command list
  if (item_count % kMaxItemsInCommand != 0) {
    *restore_cmds += redis::ArrayOfBulkStrings(user_cmd);
    current_pipeline_size_++;
  }

  // Add TTL for complex key
  if (metadata.expire > 0) {
    *restore_cmds += redis::ArrayOfBulkStrings({"PEXPIREAT", key.ToString(), std::to_string(metadata.expire)});
    current_pipeline_size_++;
  }

  // Send commands if the pipeline contains enough of them
  auto s = sendCmdsPipelineIfNeed(restore_cmds, false);
  if (!s.IsOK()) {
    return s.Prefixed(errFailedToSendCommands);
  }

  return Status::OK();
}