in src/storage/batch_extractor.cc [283:409]
rocksdb::Status WriteBatchExtractor::DeleteCF(uint32_t column_family_id, const Slice &key) {
if (column_family_id == static_cast<uint32_t>(ColumnFamilyID::SecondarySubkey)) {
return rocksdb::Status::OK();
}
std::vector<std::string> command_args;
std::string ns;
if (column_family_id == static_cast<uint32_t>(ColumnFamilyID::Metadata)) {
std::string user_key;
std::tie(ns, user_key) = ExtractNamespaceKey<std::string>(key, is_slot_id_encoded_);
auto key_slot_id = GetSlotIdFromKey(user_key);
if (slot_range_.IsValid() && !slot_range_.Contains(key_slot_id)) {
return rocksdb::Status::OK();
}
command_args = {"DEL", user_key};
} else if (column_family_id == static_cast<uint32_t>(ColumnFamilyID::PrimarySubkey)) {
InternalKey ikey(key, is_slot_id_encoded_);
std::string user_key = ikey.GetKey().ToString();
auto key_slot_id = GetSlotIdFromKey(user_key);
if (slot_range_.IsValid() && !slot_range_.Contains(key_slot_id)) {
return rocksdb::Status::OK();
}
std::string sub_key = ikey.GetSubKey().ToString();
ns = ikey.GetNamespace().ToString();
switch (log_data_.GetRedisType()) {
case kRedisHash:
command_args = {"HDEL", user_key, sub_key};
break;
case kRedisSet:
command_args = {"SREM", user_key, sub_key};
break;
case kRedisZSet:
command_args = {"ZREM", user_key, sub_key};
break;
case kRedisList: {
auto args = log_data_.GetArguments();
if (args->empty()) {
error("Failed to parse write_batch in DeleteCF. Type=List: no arguments, at least should contain a command");
return rocksdb::Status::OK();
}
auto parse_result = ParseInt<int>((*args)[0], 10);
if (!parse_result) {
return rocksdb::Status::InvalidArgument(
fmt::format("failed to parse Redis command from log data: {}", parse_result.Msg()));
}
auto cmd = static_cast<RedisCommand>(*parse_result);
switch (cmd) {
case kRedisCmdLTrim:
if (first_seen_) {
if (args->size() < 3) {
error(
"Failed to parse write_batch in DeleteCF; Command=LTRIM: no enough arguments, should contain start "
"and stop");
return rocksdb::Status::OK();
}
command_args = {"LTRIM", user_key, (*args)[1], (*args)[2]};
first_seen_ = false;
}
break;
case kRedisCmdLRem:
if (first_seen_) {
if (args->size() < 3) {
error(
"Failed to parse write_batch in DeleteCF. Command=LREM: no enough arguments, should "
"contain count and value");
return rocksdb::Status::OK();
}
command_args = {"LREM", user_key, (*args)[1], (*args)[2]};
first_seen_ = false;
}
break;
case kRedisCmdLPop:
command_args = {"LPOP", user_key};
break;
case kRedisCmdRPop:
command_args = {"RPOP", user_key};
break;
case kRedisCmdLMove:
if (first_seen_) {
if (args->size() < 5) {
error(
"Failed to parse write_batch in DeleteCF; Command=LMOVE: no enough arguments, should "
"contain source, destination and where/from arguments");
return rocksdb::Status::OK();
}
command_args = {"LMOVE", (*args)[1], (*args)[2], (*args)[3], (*args)[4]};
first_seen_ = false;
}
break;
default:
error("Failed to parse write_batch in DeleteCF. Type=List: unhandled command with code {}", *parse_result);
}
break;
}
case kRedisSortedint: {
if (!to_redis_) {
command_args = {"SIREM", user_key, std::to_string(DecodeFixed64(sub_key.data()))};
}
break;
}
default:
break;
}
} else if (column_family_id == static_cast<uint32_t>(ColumnFamilyID::Stream)) {
InternalKey ikey(key, is_slot_id_encoded_);
Slice encoded_id = ikey.GetSubKey();
redis::StreamEntryID entry_id;
GetFixed64(&encoded_id, &entry_id.ms);
GetFixed64(&encoded_id, &entry_id.seq);
command_args = {"XDEL", ikey.GetKey().ToString(), entry_id.ToString()};
}
if (!command_args.empty()) {
resp_commands_[ns].emplace_back(redis::ArrayOfBulkStrings(command_args));
}
return rocksdb::Status::OK();
}