rocksdb::Status List::Insert()

in src/types/redis_list.cc [267:343]


rocksdb::Status List::Insert(engine::Context &ctx, const Slice &user_key, const Slice &pivot, const Slice &elem,
                             bool before, int *new_size) {
  *new_size = 0;
  std::string ns_key = AppendNamespacePrefix(user_key);

  ListMetadata metadata(false);
  rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
  if (!s.ok()) return s;

  std::string buf;
  uint64_t pivot_index = metadata.head - 1;
  PutFixed64(&buf, metadata.head);
  std::string start_key = InternalKey(ns_key, buf, metadata.version, storage_->IsSlotIdEncoded()).Encode();
  std::string prefix = InternalKey(ns_key, "", metadata.version, storage_->IsSlotIdEncoded()).Encode();
  std::string next_version_prefix = InternalKey(ns_key, "", metadata.version + 1, storage_->IsSlotIdEncoded()).Encode();

  rocksdb::ReadOptions read_options = ctx.DefaultScanOptions();
  rocksdb::Slice upper_bound(next_version_prefix);
  read_options.iterate_upper_bound = &upper_bound;

  auto iter = util::UniqueIterator(ctx, read_options);
  for (iter->Seek(start_key); iter->Valid() && iter->key().starts_with(prefix); iter->Next()) {
    if (iter->value() == pivot) {
      InternalKey ikey(iter->key(), storage_->IsSlotIdEncoded());
      Slice sub_key = ikey.GetSubKey();
      GetFixed64(&sub_key, &pivot_index);
      break;
    }
  }
  if (pivot_index == (metadata.head - 1)) {
    *new_size = -1;
    return rocksdb::Status::NotFound();
  }

  auto batch = storage_->GetWriteBatchBase();
  WriteBatchLogData log_data(kRedisList,
                             {std::to_string(kRedisCmdLInsert), before ? "1" : "0", pivot.ToString(), elem.ToString()});
  s = batch->PutLogData(log_data.Encode());
  if (!s.ok()) return s;

  uint64_t left_part_len = pivot_index - metadata.head + (before ? 0 : 1);
  uint64_t right_part_len = metadata.tail - 1 - pivot_index + (before ? 1 : 0);
  bool reversed = left_part_len <= right_part_len;
  uint64_t new_elem_index = 0;
  if ((reversed && !before) || (!reversed && before)) {
    new_elem_index = pivot_index;
  } else {
    new_elem_index = reversed ? --pivot_index : ++pivot_index;
    !reversed ? iter->Next() : iter->Prev();
  }
  for (; iter->Valid() && iter->key().starts_with(prefix); !reversed ? iter->Next() : iter->Prev()) {
    buf.clear();
    PutFixed64(&buf, reversed ? --pivot_index : ++pivot_index);
    std::string to_update_key = InternalKey(ns_key, buf, metadata.version, storage_->IsSlotIdEncoded()).Encode();
    s = batch->Put(to_update_key, iter->value());
    if (!s.ok()) return s;
  }
  buf.clear();
  PutFixed64(&buf, new_elem_index);
  std::string to_update_key = InternalKey(ns_key, buf, metadata.version, storage_->IsSlotIdEncoded()).Encode();
  s = batch->Put(to_update_key, elem);
  if (!s.ok()) return s;

  if (reversed) {
    metadata.head--;
  } else {
    metadata.tail++;
  }
  metadata.size++;
  std::string bytes;
  metadata.Encode(&bytes);
  s = batch->Put(metadata_cf_handle_, ns_key, bytes);
  if (!s.ok()) return s;

  *new_size = static_cast<int>(metadata.size);
  return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}