in src/types/redis_zset.cc [415:501]
rocksdb::Status ZSet::RangeByLex(engine::Context &ctx, const Slice &user_key, const RangeLexSpec &spec,
MemberScores *mscores, uint64_t *removed_cnt) {
if (mscores) mscores->clear();
uint64_t cnt = 0;
if (!removed_cnt) removed_cnt = &cnt;
*removed_cnt = 0;
if (spec.offset > -1 && spec.count == 0) {
return rocksdb::Status::OK();
}
std::string ns_key = AppendNamespacePrefix(user_key);
ZSetMetadata metadata(false);
rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;
std::string start_member = spec.reversed ? spec.max : spec.min;
std::string start_key = InternalKey(ns_key, start_member, metadata.version, storage_->IsSlotIdEncoded()).Encode();
std::string prefix_key = InternalKey(ns_key, "", metadata.version, storage_->IsSlotIdEncoded()).Encode();
std::string next_version_prefix_key =
InternalKey(ns_key, "", metadata.version + 1, storage_->IsSlotIdEncoded()).Encode();
rocksdb::ReadOptions read_options = ctx.DefaultScanOptions();
rocksdb::Slice upper_bound(next_version_prefix_key);
read_options.iterate_upper_bound = &upper_bound;
rocksdb::Slice lower_bound(prefix_key);
read_options.iterate_lower_bound = &lower_bound;
int pos = 0;
auto iter = util::UniqueIterator(ctx, read_options);
auto batch = storage_->GetWriteBatchBase();
WriteBatchLogData log_data(kRedisZSet);
s = batch->PutLogData(log_data.Encode());
if (!s.ok()) return s;
if (!spec.reversed) {
iter->Seek(start_key);
} else {
if (spec.max_infinite) {
iter->SeekToLast();
} else {
iter->SeekForPrev(start_key);
}
}
for (; iter->Valid() && iter->key().starts_with(prefix_key); (!spec.reversed ? iter->Next() : iter->Prev())) {
InternalKey ikey(iter->key(), storage_->IsSlotIdEncoded());
Slice member = ikey.GetSubKey();
if (spec.reversed) {
if (member.ToString() < spec.min || (spec.minex && member == spec.min)) {
break;
}
if ((spec.maxex && member == spec.max) || (!spec.max_infinite && member.ToString() > spec.max)) {
continue;
}
} else {
if (spec.minex && member == spec.min) continue; // the min member was exclusive
if ((spec.maxex && member == spec.max) || (!spec.max_infinite && member.ToString() > spec.max)) break;
}
if (spec.offset >= 0 && pos++ < spec.offset) continue;
if (spec.with_deletion) {
std::string score_bytes = iter->value().ToString();
score_bytes.append(member.data(), member.size());
std::string score_key = InternalKey(ns_key, score_bytes, metadata.version, storage_->IsSlotIdEncoded()).Encode();
s = batch->Delete(score_cf_handle_, score_key);
if (!s.ok()) return s;
s = batch->Delete(iter->key());
if (!s.ok()) return s;
} else {
if (mscores) mscores->emplace_back(MemberScore{member.ToString(), DecodeDouble(iter->value().data())});
}
*removed_cnt += 1;
if (spec.count > 0 && mscores && mscores->size() >= static_cast<unsigned>(spec.count)) break;
}
if (spec.with_deletion && *removed_cnt > 0) {
metadata.size -= *removed_cnt;
std::string bytes;
metadata.Encode(&bytes);
s = batch->Put(metadata_cf_handle_, ns_key, bytes);
if (!s.ok()) return s;
return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}
return rocksdb::Status::OK();
}