in src/types/redis_zset.cc [298:413]
// Walks the members of the sorted set `user_key` whose score lies inside the range described by
// `spec`, and either collects them or (when spec.with_deletion is set) deletes them.
//
// Parameters:
//   ctx          engine context used for the metadata read, the iterator and the final write.
//   user_key     key of the sorted set, without the namespace prefix.
//   spec         score range: min/max with the minex/maxex exclusivity flags, scan direction
//                (reversed), offset/count paging and the with_deletion switch.
//   mscores      optional output; cleared on entry and filled with the matching members when
//                spec.with_deletion is not set.
//   removed_cnt  optional output; set to the number of members that matched (and, with
//                spec.with_deletion, were deleted). May be null.
//
// Returns OK when the key does not exist (outputs stay empty / zero); otherwise the first failing
// status of the metadata read, the batch operations or the final write.
//
// NOTE: spec.count is only enforced while `mscores` is being filled, because the limit is checked
// against mscores->size(). Counting-only calls (mscores == nullptr) and deletions are not limited.
rocksdb::Status ZSet::RangeByScore(engine::Context &ctx, const Slice &user_key, const RangeScoreSpec &spec,
                                   MemberScores *mscores, uint64_t *removed_cnt) {
  if (mscores) mscores->clear();

  // Let callers pass a null counter; count into a local instead.
  uint64_t cnt = 0;
  if (!removed_cnt) removed_cnt = &cnt;
  *removed_cnt = 0;

  std::string ns_key = AppendNamespacePrefix(user_key);
  ZSetMetadata metadata(false);
  rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
  if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;

  // Score encoding recap (details in PutDouble()):
  //   a. a member's score is a double and takes up 8 bytes in rocksdb.
  //   b. before being stored the score is encoded so that the lexicographical order of the encoded
  //      bytes follows the numeric order (negative scores sort before positive ones).
  //   c. "user_score" below is the score before encoding, "inner_score" the encoded form.
  //
  // Where to start a reversed, max-inclusive scan:
  //   SeekForPrev() has to start just *after* every key carrying score == max, i.e. at the next
  //   inner_score in lexicographical order, which is the binary increment 'max_inner_score + 1'.
  //   That score can be produced directly on the raw bits of the user_score:
  //     - sign bit clear: user bits and inner_score grow together       -> user bits + 1
  //     - sign bit set:   user bits and inner_score move in opposite ways -> user bits - 1
  //   See https://en.wikipedia.org/wiki/Double-precision_floating-point_format for the bit layout.
  //
  // Signed zero needs care because -0.0 == +0.0 numerically while their encodings differ
  // (-0.0 sorts immediately before +0.0):
  //   - a max of -0.0 is treated as +0.0 before stepping, so the scan starts just above zero. The
  //     arithmetic is done on uint64_t: on int64_t the bit pattern of -0.0 is INT64_MIN and
  //     'INT64_MIN - 1' is undefined behaviour.
  //   - a forward scan with min == 0 starts from -0.0 so that members stored with score -0.0, if
  //     any, are visited; the range checks in the loop decide whether they are kept.
  double start_score = spec.min;
  if (!spec.reversed) {
    if (spec.min == 0) start_score = -0.0;
  } else if (spec.maxex) {
    // Exclusive max: starting right at max already lands before every key with that score.
    start_score = spec.max;
  } else {
    const double max_score = spec.max == 0 ? 0.0 : spec.max;
    uint64_t bits = 0;
    static_assert(sizeof(bits) == sizeof(max_score), "double is expected to be 8 bytes");
    memcpy(&bits, &max_score, sizeof(bits));
    bits = (bits >> 63) == 0 ? bits + 1 : bits - 1;
    memcpy(&start_score, &bits, sizeof(bits));
  }

  std::string start_score_bytes;
  PutDouble(&start_score_bytes, start_score);
  std::string start_key =
      InternalKey(ns_key, start_score_bytes, metadata.version, storage_->IsSlotIdEncoded()).Encode();
  std::string prefix_key = InternalKey(ns_key, "", metadata.version, storage_->IsSlotIdEncoded()).Encode();
  std::string next_version_prefix_key =
      InternalKey(ns_key, "", metadata.version + 1, storage_->IsSlotIdEncoded()).Encode();

  // Confine the iterator to the keys of the current version of this sorted set.
  rocksdb::ReadOptions read_options = ctx.DefaultScanOptions();
  rocksdb::Slice upper_bound(next_version_prefix_key);
  read_options.iterate_upper_bound = &upper_bound;
  rocksdb::Slice lower_bound(prefix_key);
  read_options.iterate_lower_bound = &lower_bound;

  int pos = 0;  // number of in-range members seen so far, used to apply spec.offset
  auto iter = util::UniqueIterator(ctx, read_options, score_cf_handle_);
  auto batch = storage_->GetWriteBatchBase();
  WriteBatchLogData log_data(kRedisZSet);
  s = batch->PutLogData(log_data.Encode());
  if (!s.ok()) return s;

  if (!spec.reversed) {
    iter->Seek(start_key);
  } else {
    iter->SeekForPrev(start_key);
    // Step over a key that still carries the start score so the scan begins strictly below it.
    if (iter->Valid() && iter->key().starts_with(start_key)) {
      iter->Prev();
    }
  }

  for (; iter->Valid() && iter->key().starts_with(prefix_key); !spec.reversed ? iter->Next() : iter->Prev()) {
    InternalKey ikey(iter->key(), storage_->IsSlotIdEncoded());
    Slice score_key = ikey.GetSubKey();
    double score = NAN;
    // NOTE(review): GetDouble() is expected to consume the leading score bytes, leaving the member
    // name in score_key (it is used as the member below) - confirm against its definition.
    GetDouble(&score_key, &score);

    // Entries beyond the far end of the range stop the scan; entries before the near end are skipped.
    if (spec.reversed) {
      if ((spec.minex && score == spec.min) || score < spec.min) break;
      if ((spec.maxex && score == spec.max) || score > spec.max) continue;
    } else {
      if ((spec.minex && score == spec.min) || score < spec.min) continue;
      if ((spec.maxex && score == spec.max) || score > spec.max) break;
    }

    // Skip the first spec.offset in-range members.
    if (spec.offset >= 0 && pos++ < spec.offset) continue;

    if (spec.with_deletion) {
      // Remove both the member entry and the entry in the score column family.
      std::string sub_key = InternalKey(ns_key, score_key, metadata.version, storage_->IsSlotIdEncoded()).Encode();
      s = batch->Delete(sub_key);
      if (!s.ok()) return s;
      s = batch->Delete(score_cf_handle_, iter->key());
      if (!s.ok()) return s;
    } else {
      if (mscores) mscores->emplace_back(MemberScore{score_key.ToString(), score});
    }
    *removed_cnt += 1;
    if (spec.count > 0 && mscores && mscores->size() >= static_cast<unsigned>(spec.count)) break;
  }

  // Only a deletion that actually removed something needs a metadata update and a write.
  if (spec.with_deletion && *removed_cnt > 0) {
    metadata.size -= *removed_cnt;
    std::string bytes;
    metadata.Encode(&bytes);
    s = batch->Put(metadata_cf_handle_, ns_key, bytes);
    if (!s.ok()) return s;
    return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
  }
  return rocksdb::Status::OK();
}