in src/types/redis_list.cc [173:265]
rocksdb::Status List::Rem(engine::Context &ctx, const Slice &user_key, int count, const Slice &elem,
uint64_t *removed_cnt) {
*removed_cnt = 0;
std::string ns_key = AppendNamespacePrefix(user_key);
ListMetadata metadata(false);
rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
if (!s.ok()) return s;
uint64_t index = count >= 0 ? metadata.head : metadata.tail - 1;
std::string buf;
PutFixed64(&buf, index);
std::string start_key = InternalKey(ns_key, buf, metadata.version, storage_->IsSlotIdEncoded()).Encode();
std::string prefix = InternalKey(ns_key, "", metadata.version, storage_->IsSlotIdEncoded()).Encode();
std::string next_version_prefix = InternalKey(ns_key, "", metadata.version + 1, storage_->IsSlotIdEncoded()).Encode();
bool reversed = count < 0;
std::vector<uint64_t> to_delete_indexes;
rocksdb::ReadOptions read_options = ctx.DefaultScanOptions();
rocksdb::Slice upper_bound(next_version_prefix);
read_options.iterate_upper_bound = &upper_bound;
rocksdb::Slice lower_bound(prefix);
read_options.iterate_lower_bound = &lower_bound;
auto iter = util::UniqueIterator(ctx, read_options);
for (iter->Seek(start_key); iter->Valid() && iter->key().starts_with(prefix);
!reversed ? iter->Next() : iter->Prev()) {
if (iter->value() == elem) {
InternalKey ikey(iter->key(), storage_->IsSlotIdEncoded());
Slice sub_key = ikey.GetSubKey();
GetFixed64(&sub_key, &index);
to_delete_indexes.emplace_back(index);
if (static_cast<int>(to_delete_indexes.size()) == abs(count)) break;
}
}
if (to_delete_indexes.empty()) {
return rocksdb::Status::NotFound();
}
auto batch = storage_->GetWriteBatchBase();
WriteBatchLogData log_data(kRedisList, {std::to_string(kRedisCmdLRem), std::to_string(count), elem.ToString()});
s = batch->PutLogData(log_data.Encode());
if (!s.ok()) return s;
if (to_delete_indexes.size() == metadata.size) {
s = batch->Delete(metadata_cf_handle_, ns_key);
if (!s.ok()) return s;
} else {
uint64_t min_to_delete_index = !reversed ? to_delete_indexes[0] : to_delete_indexes[to_delete_indexes.size() - 1];
uint64_t max_to_delete_index = !reversed ? to_delete_indexes[to_delete_indexes.size() - 1] : to_delete_indexes[0];
uint64_t left_part_len = max_to_delete_index - metadata.head;
uint64_t right_part_len = metadata.tail - 1 - min_to_delete_index;
reversed = left_part_len <= right_part_len;
buf.clear();
PutFixed64(&buf, reversed ? max_to_delete_index : min_to_delete_index);
start_key = InternalKey(ns_key, buf, metadata.version, storage_->IsSlotIdEncoded()).Encode();
size_t processed = 0;
for (iter->Seek(start_key); iter->Valid() && iter->key().starts_with(prefix);
!reversed ? iter->Next() : iter->Prev()) {
if (iter->value() != elem || processed >= to_delete_indexes.size()) {
buf.clear();
PutFixed64(&buf, reversed ? max_to_delete_index-- : min_to_delete_index++);
std::string to_update_key = InternalKey(ns_key, buf, metadata.version, storage_->IsSlotIdEncoded()).Encode();
s = batch->Put(to_update_key, iter->value());
if (!s.ok()) return s;
} else {
processed++;
}
}
for (uint64_t idx = 0; idx < to_delete_indexes.size(); ++idx) {
buf.clear();
PutFixed64(&buf, reversed ? (metadata.head + idx) : (metadata.tail - 1 - idx));
std::string to_delete_key = InternalKey(ns_key, buf, metadata.version, storage_->IsSlotIdEncoded()).Encode();
s = batch->Delete(to_delete_key);
if (!s.ok()) return s;
}
if (reversed) {
metadata.head += to_delete_indexes.size();
} else {
metadata.tail -= to_delete_indexes.size();
}
metadata.size -= to_delete_indexes.size();
std::string bytes;
metadata.Encode(&bytes);
s = batch->Put(metadata_cf_handle_, ns_key, bytes);
if (!s.ok()) return s;
}
*removed_cnt = to_delete_indexes.size();
return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}