in src/storage/batch_extractor.cc [45:281]
rocksdb::Status WriteBatchExtractor::PutCF(uint32_t column_family_id, const Slice &key, const Slice &value) {
if (column_family_id == static_cast<uint32_t>(ColumnFamilyID::SecondarySubkey)) {
return rocksdb::Status::OK();
}
std::string ns, user_key;
std::vector<std::string> command_args;
if (column_family_id == static_cast<uint32_t>(ColumnFamilyID::Metadata)) {
std::tie(ns, user_key) = ExtractNamespaceKey<std::string>(key, is_slot_id_encoded_);
auto key_slot_id = GetSlotIdFromKey(user_key);
if (slot_range_.IsValid() && !slot_range_.Contains(key_slot_id)) {
return rocksdb::Status::OK();
}
Metadata metadata(kRedisNone);
auto s = metadata.Decode(value);
if (!s.ok()) return s;
if (metadata.Type() == kRedisString) {
command_args = {"SET", user_key, value.ToString().substr(Metadata::GetOffsetAfterExpire(value[0]))};
resp_commands_[ns].emplace_back(redis::ArrayOfBulkStrings(command_args));
if (metadata.expire > 0) {
command_args = {"PEXPIREAT", user_key, std::to_string(metadata.expire)};
resp_commands_[ns].emplace_back(redis::ArrayOfBulkStrings(command_args));
}
} else if (metadata.Type() == kRedisJson) {
JsonValue json_value;
s = redis::Json::FromRawString(value.ToString(), &json_value);
if (!s.ok()) return s;
auto json_bytes = json_value.Dump();
if (!json_bytes) return rocksdb::Status::Corruption(json_bytes.Msg());
command_args = {"JSON.SET", user_key, "$", json_bytes.GetValue()};
resp_commands_[ns].emplace_back(redis::ArrayOfBulkStrings(command_args));
if (metadata.expire > 0) {
command_args = {"PEXPIREAT", user_key, std::to_string(metadata.expire)};
resp_commands_[ns].emplace_back(redis::ArrayOfBulkStrings(command_args));
}
} else if (metadata.expire > 0) {
auto args = log_data_.GetArguments();
if (args->size() > 0) {
auto parse_result = ParseInt<int>((*args)[0], 10);
if (!parse_result) {
return rocksdb::Status::InvalidArgument(
fmt::format("failed to parse Redis command from log data: {}", parse_result.Msg()));
}
auto cmd = static_cast<RedisCommand>(*parse_result);
if (cmd == kRedisCmdExpire) {
command_args = {"PEXPIREAT", user_key, std::to_string(metadata.expire)};
resp_commands_[ns].emplace_back(redis::ArrayOfBulkStrings(command_args));
}
}
}
if (metadata.Type() == kRedisStream) {
auto args = log_data_.GetArguments();
bool is_set_id = args && args->size() > 0 && (*args)[0] == "XSETID";
if (!is_set_id) {
return rocksdb::Status::OK();
}
StreamMetadata stream_metadata;
auto s = stream_metadata.Decode(value);
if (!s.ok()) return s;
command_args = {"XSETID",
user_key,
stream_metadata.last_entry_id.ToString(),
"ENTRIESADDED",
std::to_string(stream_metadata.entries_added),
"MAXDELETEDID",
stream_metadata.max_deleted_entry_id.ToString()};
resp_commands_[ns].emplace_back(redis::ArrayOfBulkStrings(command_args));
}
return rocksdb::Status::OK();
}
if (column_family_id == static_cast<uint32_t>(ColumnFamilyID::PrimarySubkey)) {
InternalKey ikey(key, is_slot_id_encoded_);
user_key = ikey.GetKey().ToString();
auto key_slot_id = GetSlotIdFromKey(user_key);
if (slot_range_.IsValid() && !slot_range_.Contains(key_slot_id)) {
return rocksdb::Status::OK();
}
std::string sub_key = ikey.GetSubKey().ToString();
ns = ikey.GetNamespace().ToString();
switch (log_data_.GetRedisType()) {
case kRedisHash:
command_args = {"HSET", user_key, sub_key, value.ToString()};
break;
case kRedisList: {
auto args = log_data_.GetArguments();
if (args->empty()) {
error("Failed to parse write_batch in PutCF. Type=List: no arguments, at least should contain a command");
return rocksdb::Status::OK();
}
auto parse_result = ParseInt<int>((*args)[0], 10);
if (!parse_result) {
return rocksdb::Status::InvalidArgument(
fmt::format("failed to parse Redis command from log data: {}", parse_result.Msg()));
}
auto cmd = static_cast<RedisCommand>(*parse_result);
switch (cmd) {
case kRedisCmdLSet:
if (args->size() < 2) {
error(
"Failed to parse write_batch in PutCF. Command=LSET: no enough arguments, at least should contain an "
"index");
return rocksdb::Status::OK();
}
command_args = {"LSET", user_key, (*args)[1], value.ToString()};
break;
case kRedisCmdLInsert:
if (first_seen_) {
if (args->size() < 4) {
error(
"Failed to parse write_batch in PutCF. Command=LINSERT: no enough arguments, should contain before "
"pivot values");
return rocksdb::Status::OK();
}
command_args = {"LINSERT", user_key, (*args)[1] == "1" ? "before" : "after", (*args)[2], (*args)[3]};
first_seen_ = false;
}
break;
case kRedisCmdLPush:
command_args = {"LPUSH", user_key, value.ToString()};
break;
case kRedisCmdRPush:
command_args = {"RPUSH", user_key, value.ToString()};
break;
case kRedisCmdLRem:
// LREM will be parsed in DeleteCF, so ignore it here
break;
case kRedisCmdLMove:
// LMOVE will be parsed in DeleteCF, so ignore it here
break;
default:
error("Failed to parse write_batch in PutCF. Type=List: unhandled command with code {}", *parse_result);
}
break;
}
case kRedisSet:
command_args = {"SADD", user_key, sub_key};
break;
case kRedisZSet: {
double score = DecodeDouble(value.data());
command_args = {"ZADD", user_key, std::to_string(score), sub_key};
break;
}
case kRedisBitmap: {
auto args = log_data_.GetArguments();
if (args->empty()) {
error("Failed to parse write_batch in PutCF. Type=Bitmap: no arguments, at least should contain a command");
return rocksdb::Status::OK();
}
auto parsed_cmd = ParseInt<int>((*args)[0], 10);
if (!parsed_cmd) {
return rocksdb::Status::InvalidArgument(
fmt::format("failed to parse Redis command from log data: {}", parsed_cmd.Msg()));
}
auto cmd = static_cast<RedisCommand>(*parsed_cmd);
switch (cmd) {
case kRedisCmdSetBit: {
if (args->size() < 2) {
error(
"Failed to parse write_batch in PutCF. Command=SETBIT: no enough arguments, should contain an "
"offset");
return rocksdb::Status::OK();
}
auto parsed_offset = ParseInt<int>((*args)[1], 10);
if (!parsed_offset) {
return rocksdb::Status::InvalidArgument(
fmt::format("failed to parse an offset of SETBIT: {}", parsed_offset.Msg()));
}
bool bit_value = redis::Bitmap::GetBitFromValueAndOffset(value.ToStringView(), *parsed_offset);
command_args = {"SETBIT", user_key, (*args)[1], bit_value ? "1" : "0"};
break;
}
case kRedisCmdBitOp:
if (first_seen_) {
if (args->size() < 4) {
error(
"Failed to parse write_batch in PutCF. Command=BITOP: no enough arguments, at least should contain "
"srckey");
return rocksdb::Status::OK();
}
command_args = {"BITOP", (*args)[1], user_key};
command_args.insert(command_args.end(), args->begin() + 2, args->end());
first_seen_ = false;
}
break;
case kRedisCmdBitfield:
command_args = {"BITFIELD", user_key};
command_args.insert(command_args.end(), args->begin() + 1, args->end());
break;
default:
error("Failed to parse write_batch in PutCF. Type=Bitmap: unhandled command with code {}", *parsed_cmd);
return rocksdb::Status::OK();
}
break;
}
case kRedisSortedint: {
if (!to_redis_) {
command_args = {"SIADD", user_key, std::to_string(DecodeFixed64(sub_key.data()))};
}
break;
}
// TODO: to implement the case of kRedisBloomFilter
default:
break;
}
} else if (column_family_id == static_cast<uint32_t>(ColumnFamilyID::Stream)) {
auto s = ExtractStreamAddCommand(is_slot_id_encoded_, key, value, &command_args);
if (!s.IsOK()) {
error("Failed to parse write_batch in PutCF. Type=Stream: {}", s.Msg());
return rocksdb::Status::OK();
}
}
if (!command_args.empty()) {
resp_commands_[ns].emplace_back(redis::ArrayOfBulkStrings(command_args));
}
return rocksdb::Status::OK();
}