in src/cluster/slot_migrate.cc [773:895]
// Builds the restore commands for one complex-typed key and appends them to `*restore_cmds`.
//
// The sub-keys of `key` are iterated through the raw DB iterator pinned to `slot_snapshot_`.
// Every sub-key contributes its arguments to a command of the form `<cmd> <key> <args...>`,
// where `<cmd>` is looked up in `type_to_cmd`. A command is emitted each time
// `kMaxItemsInCommand` items have been collected, and once more at the end for the remainder.
// Bitmap keys are delegated to migrateBitmapKey() and are not batched here.
// If `metadata.expire` is positive, a trailing PEXPIREAT command is appended.
//
// @param key           user key being migrated
// @param metadata      metadata of the key; its type, version and expire are read
// @param restore_cmds  in/out buffer of serialized commands awaiting to be sent
// @return Status::OK() on success; Status::NotOK when the type has no restore command,
//         the migration was stopped, iterating failed, bitmap migration failed,
//         or sending the pipeline failed.
Status SlotMigrator::migrateComplexKey(const rocksdb::Slice &key, const Metadata &metadata, std::string *restore_cmds) {
  std::string cmd;
  {
    auto cmd_iter = type_to_cmd.find(metadata.Type());
    if (cmd_iter == type_to_cmd.end()) {
      // Use `>=` rather than `>`: a type value equal to the number of known names is already
      // out of range, and TypeName() below presumably indexes RedisTypeNames with it.
      if (metadata.Type() >= RedisTypeNames.size()) {
        return {Status::NotOK, "unknown key type: " + std::to_string(metadata.Type())};
      }
      return {Status::NotOK, "unsupported complex key type: " + metadata.TypeName()};
    }
    cmd = cmd_iter->second;
  }

  // The first two elements (command name and user key) are kept across batches.
  std::vector<std::string> user_cmd = {cmd, key.ToString()};

  // Construct key prefix to iterate values of the complex type user key
  std::string slot_key = AppendNamespacePrefix(key);
  std::string prefix_subkey = InternalKey(slot_key, "", metadata.version, true).Encode();

  rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
  read_options.snapshot = slot_snapshot_;
  // `prefix_slice` is referenced by pointer from the read options, so it is declared before
  // the iterator and stays alive for the whole scan.
  Slice prefix_slice(prefix_subkey);
  read_options.iterate_lower_bound = &prefix_slice;

  // Should use the raw db iterator to avoid reading uncommitted writes in transaction mode
  auto iter = util::UniqueIterator(storage_->GetDB()->NewIterator(read_options));

  int item_count = 0;
  for (iter->Seek(prefix_subkey); iter->Valid(); iter->Next()) {
    if (stop_migration_) {
      return {Status::NotOK, std::string(errMigrationTaskCanceled)};
    }

    // Only a lower bound is set, so leaving the prefix has to be detected explicitly.
    if (!iter->key().starts_with(prefix_subkey)) {
      break;
    }

    // Parse values of the complex key
    // InternalKey is adopted to get complex key's value from the formatted key return by iterator of rocksdb
    InternalKey inkey(iter->key(), true);
    switch (metadata.Type()) {
      case kRedisSet: {
        user_cmd.emplace_back(inkey.GetSubKey().ToString());
        break;
      }
      case kRedisSortedint: {
        // Decode straight from the slice; no temporary string copy is needed.
        auto id = DecodeFixed64(inkey.GetSubKey().data());
        user_cmd.emplace_back(std::to_string(id));
        break;
      }
      case kRedisZSet: {
        // The score is decoded from the entry's value, the member is the sub-key.
        auto score = DecodeDouble(iter->value().data());
        user_cmd.emplace_back(util::Float2String(score));
        user_cmd.emplace_back(inkey.GetSubKey().ToString());
        break;
      }
      case kRedisBitmap: {
        auto s = migrateBitmapKey(inkey, &iter, &user_cmd, restore_cmds);
        if (!s.IsOK()) {
          return s.Prefixed("failed to migrate bitmap key");
        }
        break;
      }
      case kRedisHash: {
        user_cmd.emplace_back(inkey.GetSubKey().ToString());
        user_cmd.emplace_back(iter->value().ToString());
        break;
      }
      case kRedisList: {
        user_cmd.emplace_back(iter->value().ToString());
        break;
      }
      case kRedisHyperLogLog: {
        // No arguments are collected for this type here.
        break;
      }
      default:
        break;
    }

    // Check item count
    // Exclude bitmap because it does not have hmset-like command
    if (metadata.Type() != kRedisBitmap) {
      item_count++;
      if (item_count >= kMaxItemsInCommand) {
        *restore_cmds += redis::ArrayOfBulkStrings(user_cmd);
        current_pipeline_size_++;
        item_count = 0;
        // Have to clear saved items, keeping only the command name and the key
        user_cmd.erase(user_cmd.begin() + 2, user_cmd.end());

        // Send commands if the pipeline contains enough of them
        auto s = sendCmdsPipelineIfNeed(restore_cmds, false);
        if (!s.IsOK()) {
          return s.Prefixed(errFailedToSendCommands);
        }
      }
    }
  }

  // The loop also ends when the iterator becomes invalid because of an error.
  if (auto s = iter->status(); !s.ok()) {
    return {Status::NotOK,
            fmt::format("failed to iterate values of the complex key {}: {}", key.ToString(), s.ToString())};
  }

  // Have to check the item count of the last command list.
  // `item_count` is reset to 0 on every flush above, so any positive value is an unsent remainder.
  if (item_count > 0) {
    *restore_cmds += redis::ArrayOfBulkStrings(user_cmd);
    current_pipeline_size_++;
  }

  // Add TTL for complex key
  if (metadata.expire > 0) {
    *restore_cmds += redis::ArrayOfBulkStrings({"PEXPIREAT", key.ToString(), std::to_string(metadata.expire)});
    current_pipeline_size_++;
  }

  // Send commands if the pipeline contains enough of them
  auto s = sendCmdsPipelineIfNeed(restore_cmds, false);
  if (!s.IsOK()) {
    return s.Prefixed(errFailedToSendCommands);
  }

  return Status::OK();
}