in src/types/redis_zset.cc [215:296]
// Walks the members of the sorted set `user_key` in score-index order (backwards when spec.reversed) and
// either collects or deletes the members whose rank falls inside [spec.start, spec.stop].
//
// - Negative start/stop are treated as offsets from the end of the set (metadata.size is added to them).
// - mscores (optional): cleared up front; receives the selected members only when spec.with_deletion is
//   false.
// - removed_cnt (optional): receives the number of members inside the rank range, whether they were
//   deleted or merely returned.
//
// Returns OK with empty results when the key is not found or the normalized range is empty; otherwise
// returns the first non-OK status from the metadata lookup, batch building, the iterator, or the final write.
rocksdb::Status ZSet::RangeByRank(engine::Context &ctx, const Slice &user_key, const RangeRankSpec &spec,
                                  MemberScores *mscores, uint64_t *removed_cnt) {
  if (mscores) mscores->clear();

  // Point removed_cnt at a local so the rest of the function can update it unconditionally.
  uint64_t cnt = 0;
  if (!removed_cnt) removed_cnt = &cnt;
  *removed_cnt = 0;

  std::string ns_key = AppendNamespacePrefix(user_key);
  ZSetMetadata metadata(false);
  rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
  // A missing key is reported as an empty range, not as an error.
  if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;

  // Normalize the rank window: negative indexes count from the end, and start is clamped to 0.
  // NOTE(review): metadata.size is narrowed to int here; confirm set sizes cannot exceed INT_MAX.
  int start = spec.start;
  int stop = spec.stop;
  if (start < 0) start += static_cast<int>(metadata.size);
  if (stop < 0) stop += static_cast<int>(metadata.size);
  if (start < 0) start = 0;
  if (stop < 0 || start > stop) {
    return rocksdb::Status::OK();
  }

  // Seek target: the lowest possible score for a forward scan, the highest one for a reverse scan.
  std::string score_bytes;
  double score = !(spec.reversed) ? kMinScore : kMaxScore;
  PutDouble(&score_bytes, score);
  std::string start_key = InternalKey(ns_key, score_bytes, metadata.version, storage_->IsSlotIdEncoded()).Encode();
  std::string prefix_key = InternalKey(ns_key, "", metadata.version, storage_->IsSlotIdEncoded()).Encode();
  std::string next_version_prefix_key =
      InternalKey(ns_key, "", metadata.version + 1, storage_->IsSlotIdEncoded()).Encode();

  int removed_subkey = 0;

  // Bound the iterator to [prefix of this version, prefix of the next version).
  rocksdb::ReadOptions read_options = ctx.DefaultScanOptions();
  rocksdb::Slice upper_bound(next_version_prefix_key);
  read_options.iterate_upper_bound = &upper_bound;
  rocksdb::Slice lower_bound(prefix_key);
  read_options.iterate_lower_bound = &lower_bound;

  auto batch = storage_->GetWriteBatchBase();
  WriteBatchLogData log_data(kRedisZSet);
  s = batch->PutLogData(log_data.Encode());
  if (!s.ok()) return s;

  auto iter = util::UniqueIterator(ctx, read_options, score_cf_handle_);
  iter->Seek(start_key);
  // see comment in RangeByScore()
  // If Seek() found nothing at or after the max-score key, fall back to the last key at or before it.
  if (spec.reversed && (!iter->Valid() || !iter->key().starts_with(prefix_key))) {
    iter->SeekForPrev(start_key);
  }

  // `count` is the rank (in iteration order) of the entry currently under the iterator.
  int count = 0;
  for (; iter->Valid() && iter->key().starts_with(prefix_key); !(spec.reversed) ? iter->Next() : iter->Prev()) {
    InternalKey ikey(iter->key(), storage_->IsSlotIdEncoded());
    Slice score_key = ikey.GetSubKey();
    // Presumably GetDouble() consumes the leading score bytes, leaving the member name in score_key,
    // which is how score_key is used below.
    GetDouble(&score_key, &score);
    if (count >= start) {
      if (spec.with_deletion) {
        // Remove both the member entry and its entry in the score column family.
        std::string sub_key = InternalKey(ns_key, score_key, metadata.version, storage_->IsSlotIdEncoded()).Encode();
        s = batch->Delete(sub_key);
        if (!s.ok()) return s;
        s = batch->Delete(score_cf_handle_, iter->key());
        if (!s.ok()) return s;
        removed_subkey++;
      } else {
        if (mscores) mscores->emplace_back(MemberScore{score_key.ToString(), score});
      }
      *removed_cnt += 1;
    }
    // stop is inclusive: leave right after handling the entry whose rank equals stop.
    if (count++ >= stop) break;
  }

  // An iterator that hit a read error also reports !Valid(), so without this check a failed scan would look
  // like the end of the set and a truncated result would be returned (or a partial deletion committed).
  s = iter->status();
  if (!s.ok()) return s;

  if (removed_subkey) {
    // Persist the shrunken size together with the deletions in a single write.
    metadata.size -= removed_subkey;
    std::string bytes;
    metadata.Encode(&bytes);
    s = batch->Put(metadata_cf_handle_, ns_key, bytes);
    if (!s.ok()) return s;
    return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
  }
  return rocksdb::Status::OK();
}