rocksdb::Status WriteBatchExtractor::DeleteCF()

in src/storage/batch_extractor.cc [283:409]


/// Translates the deletion of one key in a write batch into the Redis command that would produce it.
///
/// The resulting command (if any) is RESP-encoded and appended to `resp_commands_` under the
/// namespace decoded from `key`. What is emitted depends on the column family:
///   - SecondarySubkey: ignored, nothing is emitted.
///   - Metadata: `DEL <user_key>`.
///   - PrimarySubkey: chosen from `log_data_.GetRedisType()`: HDEL / SREM / ZREM for hash, set and
///     zset; for lists the command is rebuilt from the arguments carried by the log data; for
///     sortedint `SIREM` is emitted only when `to_redis_` is false. Other types emit nothing.
///   - Stream: `XDEL <user_key> <entry_id>`.
///   - Any other column family: nothing is emitted.
///
/// When `slot_range_` is valid, keys whose slot is outside of it are skipped silently.
///
/// @param column_family_id numeric id of the column family the deleted key belongs to.
/// @param key the encoded key found in the write batch.
/// @return `InvalidArgument` if the list command code in the log data is not an integer; `OK` in
///         every other case, including the ones where the entry is logged as unparsable and skipped.
rocksdb::Status WriteBatchExtractor::DeleteCF(uint32_t column_family_id, const Slice &key) {
  if (column_family_id == static_cast<uint32_t>(ColumnFamilyID::SecondarySubkey)) {
    return rocksdb::Status::OK();
  }

  std::vector<std::string> command_args;
  std::string ns;

  if (column_family_id == static_cast<uint32_t>(ColumnFamilyID::Metadata)) {
    std::string user_key;
    std::tie(ns, user_key) = ExtractNamespaceKey<std::string>(key, is_slot_id_encoded_);

    auto key_slot_id = GetSlotIdFromKey(user_key);
    if (slot_range_.IsValid() && !slot_range_.Contains(key_slot_id)) {
      return rocksdb::Status::OK();
    }

    command_args = {"DEL", user_key};
  } else if (column_family_id == static_cast<uint32_t>(ColumnFamilyID::PrimarySubkey)) {
    InternalKey ikey(key, is_slot_id_encoded_);
    std::string user_key = ikey.GetKey().ToString();
    auto key_slot_id = GetSlotIdFromKey(user_key);
    if (slot_range_.IsValid() && !slot_range_.Contains(key_slot_id)) {
      return rocksdb::Status::OK();
    }

    std::string sub_key = ikey.GetSubKey().ToString();
    ns = ikey.GetNamespace().ToString();

    switch (log_data_.GetRedisType()) {
      case kRedisHash:
        command_args = {"HDEL", user_key, sub_key};
        break;
      case kRedisSet:
        command_args = {"SREM", user_key, sub_key};
        break;
      case kRedisZSet:
        command_args = {"ZREM", user_key, sub_key};
        break;
      case kRedisList: {
        // The deleted sub key alone does not identify the list operation, so the command is rebuilt
        // from the log data: args[0] is the numeric command code, the rest are its arguments.
        auto args = log_data_.GetArguments();
        if (args->empty()) {
          error("Failed to parse write_batch in DeleteCF. Type=List: no arguments, at least should contain a command");
          return rocksdb::Status::OK();
        }

        auto parse_result = ParseInt<int>((*args)[0], 10);
        if (!parse_result) {
          return rocksdb::Status::InvalidArgument(
              fmt::format("failed to parse Redis command from log data: {}", parse_result.Msg()));
        }

        auto cmd = static_cast<RedisCommand>(*parse_result);
        switch (cmd) {
          // LTRIM, LREM and LMOVE are emitted only while first_seen_ is set and clear it afterwards,
          // so further deletes seen before the flag is set again do not repeat the command.
          case kRedisCmdLTrim:
            if (first_seen_) {
              if (args->size() < 3) {
                error(
                    "Failed to parse write_batch in DeleteCF; Command=LTRIM: no enough arguments, should contain start "
                    "and stop");
                return rocksdb::Status::OK();
              }

              command_args = {"LTRIM", user_key, (*args)[1], (*args)[2]};
              first_seen_ = false;
            }
            break;
          case kRedisCmdLRem:
            if (first_seen_) {
              if (args->size() < 3) {
                error(
                    "Failed to parse write_batch in DeleteCF. Command=LREM: no enough arguments, should "
                    "contain count and value");
                return rocksdb::Status::OK();
              }

              command_args = {"LREM", user_key, (*args)[1], (*args)[2]};
              first_seen_ = false;
            }
            break;
          // Pops are not gated by first_seen_: every deleted element yields its own command.
          case kRedisCmdLPop:
            command_args = {"LPOP", user_key};
            break;
          case kRedisCmdRPop:
            command_args = {"RPOP", user_key};
            break;
          case kRedisCmdLMove:
            if (first_seen_) {
              if (args->size() < 5) {
                error(
                    "Failed to parse write_batch in DeleteCF; Command=LMOVE: no enough arguments, should "
                    "contain source, destination and where/from arguments");
                return rocksdb::Status::OK();
              }
              // The keys come from the log data rather than from user_key.
              command_args = {"LMOVE", (*args)[1], (*args)[2], (*args)[3], (*args)[4]};
              first_seen_ = false;
            }
            break;
          default:
            error("Failed to parse write_batch in DeleteCF. Type=List: unhandled command with code {}", *parse_result);
            break;
        }
        break;
      }
      case kRedisSortedint: {
        // SIREM is only emitted when to_redis_ is false.
        if (!to_redis_) {
          command_args = {"SIREM", user_key, std::to_string(DecodeFixed64(sub_key.data()))};
        }
        break;
      }
      default:
        break;
    }
  } else if (column_family_id == static_cast<uint32_t>(ColumnFamilyID::Stream)) {
    InternalKey ikey(key, is_slot_id_encoded_);
    std::string user_key = ikey.GetKey().ToString();

    // Apply the same slot filter as the other column families so that stream deletions outside
    // of the configured slot range are not emitted.
    auto key_slot_id = GetSlotIdFromKey(user_key);
    if (slot_range_.IsValid() && !slot_range_.Contains(key_slot_id)) {
      return rocksdb::Status::OK();
    }

    // File the command under the key's namespace instead of the empty one.
    ns = ikey.GetNamespace().ToString();

    // The sub key starts with the entry ID encoded as two fixed 64-bit integers: ms, then seq.
    Slice encoded_id = ikey.GetSubKey();
    redis::StreamEntryID entry_id;
    if (!GetFixed64(&encoded_id, &entry_id.ms) || !GetFixed64(&encoded_id, &entry_id.seq)) {
      error("Failed to parse write_batch in DeleteCF. Type=Stream: sub key is too short to contain an entry ID");
      return rocksdb::Status::OK();
    }

    command_args = {"XDEL", user_key, entry_id.ToString()};
  }

  if (!command_args.empty()) {
    resp_commands_[ns].emplace_back(redis::ArrayOfBulkStrings(command_args));
  }

  return rocksdb::Status::OK();
}