in src/types/redis_list.cc [267:343]
rocksdb::Status List::Insert(engine::Context &ctx, const Slice &user_key, const Slice &pivot, const Slice &elem,
bool before, int *new_size) {
*new_size = 0;
std::string ns_key = AppendNamespacePrefix(user_key);
ListMetadata metadata(false);
rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
if (!s.ok()) return s;
std::string buf;
uint64_t pivot_index = metadata.head - 1;
PutFixed64(&buf, metadata.head);
std::string start_key = InternalKey(ns_key, buf, metadata.version, storage_->IsSlotIdEncoded()).Encode();
std::string prefix = InternalKey(ns_key, "", metadata.version, storage_->IsSlotIdEncoded()).Encode();
std::string next_version_prefix = InternalKey(ns_key, "", metadata.version + 1, storage_->IsSlotIdEncoded()).Encode();
rocksdb::ReadOptions read_options = ctx.DefaultScanOptions();
rocksdb::Slice upper_bound(next_version_prefix);
read_options.iterate_upper_bound = &upper_bound;
auto iter = util::UniqueIterator(ctx, read_options);
for (iter->Seek(start_key); iter->Valid() && iter->key().starts_with(prefix); iter->Next()) {
if (iter->value() == pivot) {
InternalKey ikey(iter->key(), storage_->IsSlotIdEncoded());
Slice sub_key = ikey.GetSubKey();
GetFixed64(&sub_key, &pivot_index);
break;
}
}
if (pivot_index == (metadata.head - 1)) {
*new_size = -1;
return rocksdb::Status::NotFound();
}
auto batch = storage_->GetWriteBatchBase();
WriteBatchLogData log_data(kRedisList,
{std::to_string(kRedisCmdLInsert), before ? "1" : "0", pivot.ToString(), elem.ToString()});
s = batch->PutLogData(log_data.Encode());
if (!s.ok()) return s;
uint64_t left_part_len = pivot_index - metadata.head + (before ? 0 : 1);
uint64_t right_part_len = metadata.tail - 1 - pivot_index + (before ? 1 : 0);
bool reversed = left_part_len <= right_part_len;
uint64_t new_elem_index = 0;
if ((reversed && !before) || (!reversed && before)) {
new_elem_index = pivot_index;
} else {
new_elem_index = reversed ? --pivot_index : ++pivot_index;
!reversed ? iter->Next() : iter->Prev();
}
for (; iter->Valid() && iter->key().starts_with(prefix); !reversed ? iter->Next() : iter->Prev()) {
buf.clear();
PutFixed64(&buf, reversed ? --pivot_index : ++pivot_index);
std::string to_update_key = InternalKey(ns_key, buf, metadata.version, storage_->IsSlotIdEncoded()).Encode();
s = batch->Put(to_update_key, iter->value());
if (!s.ok()) return s;
}
buf.clear();
PutFixed64(&buf, new_elem_index);
std::string to_update_key = InternalKey(ns_key, buf, metadata.version, storage_->IsSlotIdEncoded()).Encode();
s = batch->Put(to_update_key, elem);
if (!s.ok()) return s;
if (reversed) {
metadata.head--;
} else {
metadata.tail++;
}
metadata.size++;
std::string bytes;
metadata.Encode(&bytes);
s = batch->Put(metadata_cf_handle_, ns_key, bytes);
if (!s.ok()) return s;
*new_size = static_cast<int>(metadata.size);
return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}