rocksdb::Status ZSet::RangeByRank()

in src/types/redis_zset.cc [215:296]


// Walks the sorted set stored under `user_key` in rank order and visits the
// entries whose rank lies in [spec.start, spec.stop] (both inclusive).
//
// - Negative `spec.start` / `spec.stop` are offsets from the end of the set
//   (metadata.size is added to them); a start that is still negative is
//   clamped to 0. An empty or inverted range yields OK with no results.
// - `spec.reversed` walks from the highest score downwards instead of from the
//   lowest score upwards.
// - `spec.with_deletion` additionally deletes every visited entry (both the
//   member key and its score-column-family key), updates the stored size and
//   commits the write batch.
//
// @param mscores      Optional output. Cleared on entry, then filled with the
//                     member/score pair of every visited entry, regardless of
//                     whether the entry is also deleted, so that callers which
//                     remove entries can still report what was removed.
// @param removed_cnt  Optional output. Reset to 0 on entry and incremented once
//                     per visited entry (with or without deletion).
// @return OK when the key does not exist (outputs stay empty/zero); otherwise
//         the first failing status from the metadata lookup, the batch
//         operations or the final write.
rocksdb::Status ZSet::RangeByRank(engine::Context &ctx, const Slice &user_key, const RangeRankSpec &spec,
                                  MemberScores *mscores, uint64_t *removed_cnt) {
  if (mscores) mscores->clear();

  // Let the rest of the function update the counter unconditionally, even when
  // the caller is not interested in it.
  uint64_t cnt = 0;
  if (!removed_cnt) removed_cnt = &cnt;
  *removed_cnt = 0;

  std::string ns_key = AppendNamespacePrefix(user_key);

  ZSetMetadata metadata(false);

  rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
  // A missing key is treated as an empty set rather than as an error.
  if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;

  int start = spec.start;
  int stop = spec.stop;

  // Translate negative (from-the-end) indexes into absolute ranks.
  if (start < 0) start += static_cast<int>(metadata.size);
  if (stop < 0) stop += static_cast<int>(metadata.size);
  if (start < 0) start = 0;
  if (stop < 0 || start > stop) {
    return rocksdb::Status::OK();
  }

  // Seek target: the smallest possible score for a forward walk, the largest
  // one for a reversed walk.
  std::string score_bytes;
  double score = !(spec.reversed) ? kMinScore : kMaxScore;
  PutDouble(&score_bytes, score);
  std::string start_key = InternalKey(ns_key, score_bytes, metadata.version, storage_->IsSlotIdEncoded()).Encode();
  std::string prefix_key = InternalKey(ns_key, "", metadata.version, storage_->IsSlotIdEncoded()).Encode();
  std::string next_version_prefix_key =
      InternalKey(ns_key, "", metadata.version + 1, storage_->IsSlotIdEncoded()).Encode();

  // Restrict the iterator to the keys of this set's current version.
  int removed_subkey = 0;
  rocksdb::ReadOptions read_options = ctx.DefaultScanOptions();
  rocksdb::Slice upper_bound(next_version_prefix_key);
  read_options.iterate_upper_bound = &upper_bound;
  rocksdb::Slice lower_bound(prefix_key);
  read_options.iterate_lower_bound = &lower_bound;

  auto batch = storage_->GetWriteBatchBase();
  WriteBatchLogData log_data(kRedisZSet);
  s = batch->PutLogData(log_data.Encode());
  if (!s.ok()) return s;
  auto iter = util::UniqueIterator(ctx, read_options, score_cf_handle_);
  iter->Seek(start_key);
  // see comment in RangeByScore()
  if (spec.reversed && (!iter->Valid() || !iter->key().starts_with(prefix_key))) {
    iter->SeekForPrev(start_key);
  }

  // `count` is the rank (in walk order) of the entry the iterator points at.
  int count = 0;
  for (; iter->Valid() && iter->key().starts_with(prefix_key); !(spec.reversed) ? iter->Next() : iter->Prev()) {
    InternalKey ikey(iter->key(), storage_->IsSlotIdEncoded());
    Slice score_key = ikey.GetSubKey();
    // After decoding the score, the remainder of `score_key` is used as the
    // member name below.
    GetDouble(&score_key, &score);
    if (count >= start) {
      if (spec.with_deletion) {
        std::string sub_key = InternalKey(ns_key, score_key, metadata.version, storage_->IsSlotIdEncoded()).Encode();
        s = batch->Delete(sub_key);
        if (!s.ok()) return s;
        s = batch->Delete(score_cf_handle_, iter->key());
        if (!s.ok()) return s;
        removed_subkey++;
      }
      // Report the entry even when it is being deleted; otherwise a caller
      // that removes entries and asks for them would always get an empty list.
      if (mscores) mscores->emplace_back(MemberScore{score_key.ToString(), score});
      *removed_cnt += 1;
    }
    if (count++ >= stop) break;
  }

  // Only touch the metadata and hit the storage when something was deleted.
  if (removed_subkey) {
    metadata.size -= removed_subkey;
    std::string bytes;
    metadata.Encode(&bytes);
    s = batch->Put(metadata_cf_handle_, ns_key, bytes);
    if (!s.ok()) return s;
    return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
  }
  return rocksdb::Status::OK();
}