rocksdb::Status List::Rem()

in src/types/redis_list.cc [173:265]


// Removes elements equal to `elem` from the list stored at `user_key` (logged as kRedisCmdLRem).
//
//   count > 0  : scan from the head and remove at most `count` matching elements.
//   count < 0  : scan from the tail and remove at most |count| matching elements.
//   count == 0 : remove every matching element.
//
// `*removed_cnt` is reset to 0 on entry and set to the number of removed elements just before the
// write batch is submitted.
//
// Returns:
//   - the status of GetMetadata() if the metadata lookup fails;
//   - NotFound if no element equals `elem`;
//   - the iterator's error status if a scan terminated because of a read error;
//   - any error from building the write batch;
//   - otherwise the status of the final storage write.
//
// Algorithm: the first scan collects the indexes to remove. If that is the whole list, only the
// metadata entry is deleted by this function. Otherwise the surviving elements on the shorter side
// of the removed range are rewritten so the list is contiguous again, the slots vacated at that end
// are deleted, and head/tail/size in the metadata are adjusted.
rocksdb::Status List::Rem(engine::Context &ctx, const Slice &user_key, int count, const Slice &elem,
                          uint64_t *removed_cnt) {
  *removed_cnt = 0;

  std::string ns_key = AppendNamespacePrefix(user_key);

  ListMetadata metadata(false);
  rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
  if (!s.ok()) return s;

  // |count| computed in 64 bits: abs(count) is undefined for count == INT_MIN, and comparing it
  // against a size narrowed to int could wrap for very large lists.
  const uint64_t max_removals =
      count >= 0 ? static_cast<uint64_t>(count) : static_cast<uint64_t>(-static_cast<int64_t>(count));

  // Forward scans start at the first element (head), backward scans at the last one (tail - 1).
  uint64_t index = count >= 0 ? metadata.head : metadata.tail - 1;
  std::string buf;
  PutFixed64(&buf, index);
  std::string start_key = InternalKey(ns_key, buf, metadata.version, storage_->IsSlotIdEncoded()).Encode();
  std::string prefix = InternalKey(ns_key, "", metadata.version, storage_->IsSlotIdEncoded()).Encode();
  std::string next_version_prefix = InternalKey(ns_key, "", metadata.version + 1, storage_->IsSlotIdEncoded()).Encode();

  bool reversed = count < 0;
  std::vector<uint64_t> to_delete_indexes;
  // Restrict the iterator to the key range [prefix, next_version_prefix).
  rocksdb::ReadOptions read_options = ctx.DefaultScanOptions();
  rocksdb::Slice upper_bound(next_version_prefix);
  read_options.iterate_upper_bound = &upper_bound;
  rocksdb::Slice lower_bound(prefix);
  read_options.iterate_lower_bound = &lower_bound;

  // Pass 1: collect the indexes of the elements to remove, in scan order.
  auto iter = util::UniqueIterator(ctx, read_options);
  for (iter->Seek(start_key); iter->Valid() && iter->key().starts_with(prefix);
       !reversed ? iter->Next() : iter->Prev()) {
    if (iter->value() == elem) {
      InternalKey ikey(iter->key(), storage_->IsSlotIdEncoded());
      Slice sub_key = ikey.GetSubKey();
      GetFixed64(&sub_key, &index);
      to_delete_indexes.emplace_back(index);
      // The vector is non-empty here, so with count == 0 (limit 0) this never stops early.
      if (to_delete_indexes.size() == max_removals) break;
    }
  }
  // An iterator that failed also reports !Valid(); do not mistake a read error for end-of-list.
  s = iter->status();
  if (!s.ok()) return s;

  if (to_delete_indexes.empty()) {
    return rocksdb::Status::NotFound();
  }

  auto batch = storage_->GetWriteBatchBase();
  WriteBatchLogData log_data(kRedisList, {std::to_string(kRedisCmdLRem), std::to_string(count), elem.ToString()});
  s = batch->PutLogData(log_data.Encode());
  if (!s.ok()) return s;

  if (to_delete_indexes.size() == metadata.size) {
    // Every element is being removed: drop the metadata entry for the key.
    s = batch->Delete(metadata_cf_handle_, ns_key);
    if (!s.ok()) return s;
  } else {
    // Indexes were collected in scan order, so the smallest/largest sit at opposite ends of the
    // vector depending on the scan direction.
    uint64_t min_to_delete_index = !reversed ? to_delete_indexes.front() : to_delete_indexes.back();
    uint64_t max_to_delete_index = !reversed ? to_delete_indexes.back() : to_delete_indexes.front();
    // Distance from the head to the largest removed index vs. from the smallest removed index to
    // the last element; the shorter side is the one that gets shifted.
    uint64_t left_part_len = max_to_delete_index - metadata.head;
    uint64_t right_part_len = metadata.tail - 1 - min_to_delete_index;
    // From here on `reversed` means "shift the left part towards the tail", walking backwards from
    // max_to_delete_index; otherwise the right part is shifted towards the head, walking forwards
    // from min_to_delete_index.
    reversed = left_part_len <= right_part_len;
    buf.clear();
    PutFixed64(&buf, reversed ? max_to_delete_index : min_to_delete_index);
    start_key = InternalKey(ns_key, buf, metadata.version, storage_->IsSlotIdEncoded()).Encode();
    // Pass 2: rewrite surviving elements into consecutive slots. max_/min_to_delete_index now act
    // as the write cursor. `processed` counts skipped (removed) matches so that matches beyond the
    // requested number of removals are kept as ordinary elements.
    size_t processed = 0;
    for (iter->Seek(start_key); iter->Valid() && iter->key().starts_with(prefix);
         !reversed ? iter->Next() : iter->Prev()) {
      if (iter->value() != elem || processed >= to_delete_indexes.size()) {
        buf.clear();
        PutFixed64(&buf, reversed ? max_to_delete_index-- : min_to_delete_index++);
        std::string to_update_key = InternalKey(ns_key, buf, metadata.version, storage_->IsSlotIdEncoded()).Encode();
        s = batch->Put(to_update_key, iter->value());
        if (!s.ok()) return s;
      } else {
        processed++;
      }
    }
    // A read error here would leave the shift incomplete; abort before anything is written.
    s = iter->status();
    if (!s.ok()) return s;

    // After the shift, the vacated slots are the first N (left part moved) or the last N (right
    // part moved) positions of the old range, where N is the number of removed elements.
    for (uint64_t idx = 0; idx < to_delete_indexes.size(); ++idx) {
      buf.clear();
      PutFixed64(&buf, reversed ? (metadata.head + idx) : (metadata.tail - 1 - idx));
      std::string to_delete_key = InternalKey(ns_key, buf, metadata.version, storage_->IsSlotIdEncoded()).Encode();
      s = batch->Delete(to_delete_key);
      if (!s.ok()) return s;
    }
    if (reversed) {
      metadata.head += to_delete_indexes.size();
    } else {
      metadata.tail -= to_delete_indexes.size();
    }
    metadata.size -= to_delete_indexes.size();
    std::string bytes;
    metadata.Encode(&bytes);
    s = batch->Put(metadata_cf_handle_, ns_key, bytes);
    if (!s.ok()) return s;
  }

  *removed_cnt = to_delete_indexes.size();
  return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}