src/types/redis_zset.cc (783 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "redis_zset.h"
#include <cmath>
#include <limits>
#include <map>
#include <memory>
#include <optional>
#include <set>
#include "db_util.h"
#include "sample_helper.h"
namespace redis {
// Loads the metadata stored under `ns_key` into `metadata`, restricting the lookup to the kRedisZSet type.
rocksdb::Status ZSet::GetMetadata(engine::Context &ctx, const Slice &ns_key, ZSetMetadata *metadata) {
  rocksdb::Status status = Database::GetMetadata(ctx, {kRedisZSet}, ns_key, metadata);
  return status;
}
// Adds or updates the members in `mscores` for the sorted set at `user_key`, honoring `flags`.
//
// `mscores` is walked from back to front and a member that was already seen is skipped, so when the same
// member occurs more than once only its last occurrence is applied. With the INCR flag the resulting score
// of an existing member is written back into its `mscores` entry.
//
// `*added_cnt` receives the number of newly inserted members, plus the number of members whose score was
// changed when the CH flag is set. Returns the first failing storage status, or the status of the final write.
rocksdb::Status ZSet::Add(engine::Context &ctx, const Slice &user_key, ZAddFlags flags, MemberScores *mscores,
                          uint64_t *added_cnt) {
  *added_cnt = 0;
  std::string ns_key = AppendNamespacePrefix(user_key);
  ZSetMetadata metadata;
  rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
  // NotFound is tolerated: the metadata entry is (re)written below once at least one member is added.
  if (!s.ok() && !s.IsNotFound()) return s;
  int added = 0;
  int changed = 0;
  auto batch = storage_->GetWriteBatchBase();
  WriteBatchLogData log_data(kRedisZSet);
  s = batch->PutLogData(log_data.Encode());
  if (!s.ok()) return s;
  // The views point into the strings owned by `*mscores`, which is not resized in this function.
  std::unordered_set<std::string_view> added_member_keys;
  for (auto it = mscores->rbegin(); it != mscores->rend(); ++it) {
    // Duplicate member in the input: the occurrence closer to the end has already been handled.
    if (!added_member_keys.insert(it->member).second) {
      continue;
    }
    std::string member_key = InternalKey(ns_key, it->member, metadata.version, storage_->IsSlotIdEncoded()).Encode();
    // Only a non-empty set can already contain the member, so the lookup is skipped otherwise.
    if (metadata.size > 0) {
      std::string old_score_bytes;
      s = storage_->Get(ctx, ctx.GetReadOptions(), member_key, &old_score_bytes);
      if (!s.ok() && !s.IsNotFound()) return s;
      if (s.ok()) {
        // NX: never touch a member that already exists.
        if (!s.IsNotFound() && flags.HasNX()) {
          continue;
        }
        double old_score = DecodeDouble(old_score_bytes.data());
        if (flags.HasIncr()) {
          // With INCR, `it->score` is the increment: LT only lets a negative increment through,
          // GT only a positive one.
          if ((flags.HasLT() && it->score >= 0) || (flags.HasGT() && it->score <= 0)) {
            continue;
          }
          it->score += old_score;
          if (std::isnan(it->score)) {
            return rocksdb::Status::InvalidArgument("resulting score is not a number (NaN)");
          }
        }
        if (it->score != old_score) {
          // LT/GT: only apply the update when the new score is strictly lower/greater than the old one.
          if ((flags.HasLT() && it->score >= old_score) || (flags.HasGT() && it->score <= old_score)) {
            continue;
          }
          // The score-CF sub key is the encoded score followed by the member: drop the entry for the old
          // score, then write the member -> score entry and the entry for the new score.
          old_score_bytes.append(it->member);
          std::string old_score_key =
              InternalKey(ns_key, old_score_bytes, metadata.version, storage_->IsSlotIdEncoded()).Encode();
          s = batch->Delete(score_cf_handle_, old_score_key);
          if (!s.ok()) return s;
          std::string new_score_bytes;
          PutDouble(&new_score_bytes, it->score);
          s = batch->Put(member_key, new_score_bytes);
          if (!s.ok()) return s;
          new_score_bytes.append(it->member);
          std::string new_score_key =
              InternalKey(ns_key, new_score_bytes, metadata.version, storage_->IsSlotIdEncoded()).Encode();
          s = batch->Put(score_cf_handle_, new_score_key, Slice());
          if (!s.ok()) return s;
          changed++;
        }
        // An existing member is never counted as added, whether or not its score changed.
        continue;
      }
    }
    // From here on the member does not exist yet. XX: only update existing members, never insert.
    if (flags.HasXX()) {
      continue;
    }
    std::string score_bytes;
    PutDouble(&score_bytes, it->score);
    s = batch->Put(member_key, score_bytes);
    if (!s.ok()) return s;
    score_bytes.append(it->member);
    std::string score_key = InternalKey(ns_key, score_bytes, metadata.version, storage_->IsSlotIdEncoded()).Encode();
    s = batch->Put(score_cf_handle_, score_key, Slice());
    if (!s.ok()) return s;
    added++;
  }
  // The metadata entry only needs rewriting when the member count changed.
  if (added > 0) {
    *added_cnt = added;
    metadata.size += added;
    std::string bytes;
    metadata.Encode(&bytes);
    s = batch->Put(metadata_cf_handle_, ns_key, bytes);
    if (!s.ok()) return s;
  }
  // CH: report changed members in addition to added ones.
  if (flags.HasCH()) {
    *added_cnt += changed;
  }
  return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}
// Reports the member count recorded in the metadata of the sorted set at `user_key`.
// A missing key is not an error: `*size` stays 0 and OK is returned.
rocksdb::Status ZSet::Card(engine::Context &ctx, const Slice &user_key, uint64_t *size) {
  *size = 0;

  ZSetMetadata metadata(false);
  const std::string ns_key = AppendNamespacePrefix(user_key);
  rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
  if (s.IsNotFound()) return rocksdb::Status::OK();
  if (!s.ok()) return s;

  *size = metadata.size;
  return rocksdb::Status::OK();
}
// Counts the members matched by `spec` by delegating to RangeByScore() without a result vector;
// the number of matches is delivered through `size`.
rocksdb::Status ZSet::Count(engine::Context &ctx, const Slice &user_key, const RangeScoreSpec &spec, uint64_t *size) {
  MemberScores *no_members = nullptr;
  return RangeByScore(ctx, user_key, spec, no_members, size);
}
// Increments the score of `member` by `increment` through Add() with the INCR flag and reports the
// score held by the request entry afterwards via `score`.
rocksdb::Status ZSet::IncrBy(engine::Context &ctx, const Slice &user_key, const Slice &member, double increment,
                             double *score) {
  std::vector<MemberScore> request;
  request.emplace_back(MemberScore{member.ToString(), increment});

  uint64_t unused_cnt = 0;
  rocksdb::Status s = Add(ctx, user_key, ZAddFlags::Incr(), &request, &unused_cnt);
  if (!s.ok()) return s;

  // Add() updates the entry in place when it applies the increment to an existing member.
  *score = request.front().score;
  return rocksdb::Status::OK();
}
// Removes up to `count` members from one end of the sorted set at `user_key` (the low-score end when
// `min` is true, the high-score end otherwise) and returns them in `mscores` in the order visited.
// A missing key or a non-positive `count` yields OK with an empty `mscores`.
rocksdb::Status ZSet::Pop(engine::Context &ctx, const Slice &user_key, int count, bool min, MemberScores *mscores) {
  mscores->clear();

  std::string ns_key = AppendNamespacePrefix(user_key);
  ZSetMetadata metadata(false);
  rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
  if (s.IsNotFound()) return rocksdb::Status::OK();
  if (!s.ok()) return s;
  if (count <= 0) return rocksdb::Status::OK();

  // Never try to pop more members than the metadata says exist.
  const int set_size = static_cast<int>(metadata.size);
  if (count > set_size) count = set_size;

  // Seek target: the extreme score on the side we pop from.
  double score = kMaxScore;
  if (min) score = kMinScore;
  std::string boundary_score_bytes;
  PutDouble(&boundary_score_bytes, score);

  std::string seek_key =
      InternalKey(ns_key, boundary_score_bytes, metadata.version, storage_->IsSlotIdEncoded()).Encode();
  std::string version_prefix = InternalKey(ns_key, "", metadata.version, storage_->IsSlotIdEncoded()).Encode();
  std::string next_version_prefix =
      InternalKey(ns_key, "", metadata.version + 1, storage_->IsSlotIdEncoded()).Encode();

  auto batch = storage_->GetWriteBatchBase();
  WriteBatchLogData log_data(kRedisZSet);
  s = batch->PutLogData(log_data.Encode());
  if (!s.ok()) return s;

  // Confine the iterator to the keys of the current version of this set.
  rocksdb::ReadOptions read_options = ctx.DefaultScanOptions();
  rocksdb::Slice lower_bound(version_prefix);
  rocksdb::Slice upper_bound(next_version_prefix);
  read_options.iterate_lower_bound = &lower_bound;
  read_options.iterate_upper_bound = &upper_bound;

  auto iter = util::UniqueIterator(ctx, read_options, score_cf_handle_);
  iter->Seek(seek_key);
  // see comment in RangeByScore()
  if (!min) {
    const bool inside_set = iter->Valid() && iter->key().starts_with(version_prefix);
    if (!inside_set) iter->SeekForPrev(seek_key);
  }

  const auto wanted = static_cast<unsigned>(count);
  while (iter->Valid() && iter->key().starts_with(version_prefix)) {
    InternalKey ikey(iter->key(), storage_->IsSlotIdEncoded());
    Slice sub_key = ikey.GetSubKey();
    // GetDouble() consumes the encoded score, leaving only the member in `sub_key`.
    GetDouble(&sub_key, &score);
    mscores->emplace_back(MemberScore{sub_key.ToString(), score});

    // Drop both the member -> score entry and the score-CF entry.
    std::string member_key = InternalKey(ns_key, sub_key, metadata.version, storage_->IsSlotIdEncoded()).Encode();
    s = batch->Delete(member_key);
    if (!s.ok()) return s;
    s = batch->Delete(score_cf_handle_, iter->key());
    if (!s.ok()) return s;

    if (mscores->size() >= wanted) break;

    if (min) {
      iter->Next();
    } else {
      iter->Prev();
    }
  }

  if (!mscores->empty()) {
    metadata.size -= mscores->size();
    std::string bytes;
    metadata.Encode(&bytes);
    s = batch->Put(metadata_cf_handle_, ns_key, bytes);
    if (!s.ok()) return s;
  }
  return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}
// Visits the members of the sorted set at `user_key` whose rank lies in [spec.start, spec.stop]
// (negative ranks count from the end; order is descending when spec.reversed) and either deletes them
// (spec.with_deletion) or appends them to `mscores`. Both `mscores` and `removed_cnt` may be null;
// `*removed_cnt` receives the number of members visited inside the rank range.
// A missing key or an empty rank range yields OK with no results.
rocksdb::Status ZSet::RangeByRank(engine::Context &ctx, const Slice &user_key, const RangeRankSpec &spec,
                                  MemberScores *mscores, uint64_t *removed_cnt) {
  if (mscores != nullptr) mscores->clear();
  uint64_t local_cnt = 0;
  if (removed_cnt == nullptr) removed_cnt = &local_cnt;
  *removed_cnt = 0;

  std::string ns_key = AppendNamespacePrefix(user_key);
  ZSetMetadata metadata(false);
  rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
  if (s.IsNotFound()) return rocksdb::Status::OK();
  if (!s.ok()) return s;

  // Normalize the rank range: negative indexes are relative to the end of the set.
  const int set_size = static_cast<int>(metadata.size);
  int start = spec.start;
  int stop = spec.stop;
  if (start < 0) start += set_size;
  if (stop < 0) stop += set_size;
  if (start < 0) start = 0;
  if (stop < 0 || start > stop) {
    return rocksdb::Status::OK();
  }

  // Seek target: the extreme score on the side the scan starts from.
  double score = spec.reversed ? kMaxScore : kMinScore;
  std::string boundary_score_bytes;
  PutDouble(&boundary_score_bytes, score);

  std::string seek_key =
      InternalKey(ns_key, boundary_score_bytes, metadata.version, storage_->IsSlotIdEncoded()).Encode();
  std::string version_prefix = InternalKey(ns_key, "", metadata.version, storage_->IsSlotIdEncoded()).Encode();
  std::string next_version_prefix =
      InternalKey(ns_key, "", metadata.version + 1, storage_->IsSlotIdEncoded()).Encode();

  int removed_subkey = 0;
  rocksdb::ReadOptions read_options = ctx.DefaultScanOptions();
  rocksdb::Slice lower_bound(version_prefix);
  rocksdb::Slice upper_bound(next_version_prefix);
  read_options.iterate_lower_bound = &lower_bound;
  read_options.iterate_upper_bound = &upper_bound;

  auto batch = storage_->GetWriteBatchBase();
  WriteBatchLogData log_data(kRedisZSet);
  s = batch->PutLogData(log_data.Encode());
  if (!s.ok()) return s;

  auto iter = util::UniqueIterator(ctx, read_options, score_cf_handle_);
  iter->Seek(seek_key);
  // see comment in RangeByScore()
  if (spec.reversed) {
    const bool inside_set = iter->Valid() && iter->key().starts_with(version_prefix);
    if (!inside_set) iter->SeekForPrev(seek_key);
  }

  int rank = 0;
  while (iter->Valid() && iter->key().starts_with(version_prefix)) {
    InternalKey ikey(iter->key(), storage_->IsSlotIdEncoded());
    Slice sub_key = ikey.GetSubKey();
    // GetDouble() consumes the encoded score, leaving only the member in `sub_key`.
    GetDouble(&sub_key, &score);

    if (rank >= start) {
      if (spec.with_deletion) {
        std::string member_key = InternalKey(ns_key, sub_key, metadata.version, storage_->IsSlotIdEncoded()).Encode();
        s = batch->Delete(member_key);
        if (!s.ok()) return s;
        s = batch->Delete(score_cf_handle_, iter->key());
        if (!s.ok()) return s;
        removed_subkey++;
      } else if (mscores != nullptr) {
        mscores->emplace_back(MemberScore{sub_key.ToString(), score});
      }
      *removed_cnt += 1;
    }

    // `stop` is inclusive, so the entry at rank == stop is the last one processed.
    if (rank >= stop) break;
    ++rank;

    if (spec.reversed) {
      iter->Prev();
    } else {
      iter->Next();
    }
  }

  // Nothing was deleted: there is nothing to persist.
  if (removed_subkey == 0) return rocksdb::Status::OK();

  metadata.size -= removed_subkey;
  std::string bytes;
  metadata.Encode(&bytes);
  s = batch->Put(metadata_cf_handle_, ns_key, bytes);
  if (!s.ok()) return s;
  return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}
// Scans the sorted set at `user_key` in score order (descending when spec.reversed), restricted to the
// score range in `spec`, and either deletes the matching members (spec.with_deletion) or appends them to
// `mscores`. Both `mscores` and `removed_cnt` may be null; `*removed_cnt` receives the number of members
// that matched after spec.offset was applied. A missing key yields OK with no results.
rocksdb::Status ZSet::RangeByScore(engine::Context &ctx, const Slice &user_key, const RangeScoreSpec &spec,
                                   MemberScores *mscores, uint64_t *removed_cnt) {
  if (mscores) mscores->clear();
  uint64_t cnt = 0;
  if (!removed_cnt) removed_cnt = &cnt;
  *removed_cnt = 0;
  std::string ns_key = AppendNamespacePrefix(user_key);
  ZSetMetadata metadata(false);
  rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
  if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;
  // Background on how scores are stored:
  //   a. the score of a zset member is a double and takes up 8 bytes in rocksdb
  //   b. so that stored scores sort correctly in lexicographical order, a score is encoded before it
  //      is stored in rocksdb; for the encoding details see PutDouble()
  //   c. for convenience, user_score and inner_score below denote the score before and after encoding
  //
  // The next inner_score after max in lexicographical order:
  //   a. inner_score can be thought of as a fixed 8-byte string. Logically, the next inner_score after
  //      max_inner_score is 'max_inner_score + 1' (a binary increment), assuming no overflow.
  //   b. one way to realize the binary increment 'max_inner_score + 1':
  //        encode max (max_user_score) to max_inner_score with PutDouble()
  //        memcpy max_inner_score to u64 (uint64_t)
  //        increment u64
  //        memcpy u64 to max_next_inner_score
  //
  // The code below instead generates max_next_user_score directly, i.e. the user_score whose encoding is
  // max_next_inner_score:
  //   a. the key observation:
  //        for positive scores, user_score is positively correlated with inner_score in lexicographical order
  //        for negative scores, user_score is negatively correlated with inner_score in lexicographical order
  //        for more details see PutDouble()
  //   b. hence, on the raw bit pattern of max_user_score:
  //        for a positive max_user_score, max_next_user_score is 'max_user_score + 1'
  //        for a negative max_user_score, max_next_user_score is 'max_user_score - 1'
  //   Note: according to the original author there is no overflow in practice; see the binary encoding of
  //   double: https://en.wikipedia.org/wiki/Double-precision_floating-point_format
  //   NOTE(review): for max == -0.0 the bit pattern is INT64_MIN, so `i64 - 1` below would overflow —
  //   confirm that callers never pass -0.0 as an inclusive max of a reversed range.
  //
  // Generate the next possible score after max (only needed for a reversed scan with an inclusive max).
  int64_t i64 = 0;
  double max_next_score = 0;
  if (spec.reversed && !spec.maxex) {
    memcpy(&i64, &spec.max, sizeof(spec.max));
    i64 = i64 >= 0 ? i64 + 1 : i64 - 1;
    memcpy(&max_next_score, &i64, sizeof(i64));
  }
  // Start score: min for a forward scan; for a reversed scan either max itself (exclusive max) or the
  // next score after max (inclusive max).
  std::string start_score_bytes;
  PutDouble(&start_score_bytes, spec.reversed ? (spec.maxex ? spec.max : max_next_score) : spec.min);
  std::string start_key =
      InternalKey(ns_key, start_score_bytes, metadata.version, storage_->IsSlotIdEncoded()).Encode();
  std::string prefix_key = InternalKey(ns_key, "", metadata.version, storage_->IsSlotIdEncoded()).Encode();
  std::string next_version_prefix_key =
      InternalKey(ns_key, "", metadata.version + 1, storage_->IsSlotIdEncoded()).Encode();
  // Confine the iterator to the keys of the current version of this set.
  rocksdb::ReadOptions read_options = ctx.DefaultScanOptions();
  rocksdb::Slice upper_bound(next_version_prefix_key);
  read_options.iterate_upper_bound = &upper_bound;
  rocksdb::Slice lower_bound(prefix_key);
  read_options.iterate_lower_bound = &lower_bound;
  // Number of matching members seen so far while skipping spec.offset entries.
  int pos = 0;
  auto iter = util::UniqueIterator(ctx, read_options, score_cf_handle_);
  auto batch = storage_->GetWriteBatchBase();
  WriteBatchLogData log_data(kRedisZSet);
  s = batch->PutLogData(log_data.Encode());
  if (!s.ok()) return s;
  if (!spec.reversed) {
    iter->Seek(start_key);
  } else {
    iter->SeekForPrev(start_key);
    // If the iterator landed on a key that carries the start score itself, step back once.
    if (iter->Valid() && iter->key().starts_with(start_key)) {
      iter->Prev();
    }
  }
  for (; iter->Valid() && iter->key().starts_with(prefix_key); !spec.reversed ? iter->Next() : iter->Prev()) {
    InternalKey ikey(iter->key(), storage_->IsSlotIdEncoded());
    Slice score_key = ikey.GetSubKey();
    double score = NAN;
    // GetDouble() consumes the encoded score, leaving only the member in `score_key`.
    GetDouble(&score_key, &score);
    if (spec.reversed) {
      // Descending scan: entries above the range are skipped, the first one below it ends the scan.
      if ((spec.minex && score == spec.min) || score < spec.min) break;
      if ((spec.maxex && score == spec.max) || score > spec.max) continue;
    } else {
      // Ascending scan: entries below the range are skipped, the first one above it ends the scan.
      if ((spec.minex && score == spec.min) || score < spec.min) continue;
      if ((spec.maxex && score == spec.max) || score > spec.max) break;
    }
    // Skip the first spec.offset matching members; a negative offset disables skipping.
    if (spec.offset >= 0 && pos++ < spec.offset) continue;
    if (spec.with_deletion) {
      // Drop both the member -> score entry and the score-CF entry.
      std::string sub_key = InternalKey(ns_key, score_key, metadata.version, storage_->IsSlotIdEncoded()).Encode();
      s = batch->Delete(sub_key);
      if (!s.ok()) return s;
      s = batch->Delete(score_cf_handle_, iter->key());
      if (!s.ok()) return s;
    } else {
      if (mscores) mscores->emplace_back(MemberScore{score_key.ToString(), score});
    }
    *removed_cnt += 1;
    // The count limit is checked against the collected results, so it only applies when collecting.
    if (spec.count > 0 && mscores && mscores->size() >= static_cast<unsigned>(spec.count)) break;
  }
  // Only a deleting scan that removed something has anything to persist.
  if (spec.with_deletion && *removed_cnt > 0) {
    metadata.size -= *removed_cnt;
    std::string bytes;
    metadata.Encode(&bytes);
    s = batch->Put(metadata_cf_handle_, ns_key, bytes);
    if (!s.ok()) return s;
    return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
  }
  return rocksdb::Status::OK();
}
// Scans the member keys of the sorted set at `user_key` in key order (backwards when spec.reversed),
// restricted to the member range in `spec`, and either deletes the matching members (spec.with_deletion)
// or appends them to `mscores`. Both `mscores` and `removed_cnt` may be null; `*removed_cnt` receives the
// number of members that matched after spec.offset was applied. A missing key yields OK with no results.
rocksdb::Status ZSet::RangeByLex(engine::Context &ctx, const Slice &user_key, const RangeLexSpec &spec,
                                 MemberScores *mscores, uint64_t *removed_cnt) {
  if (mscores) mscores->clear();
  uint64_t cnt = 0;
  if (!removed_cnt) removed_cnt = &cnt;
  *removed_cnt = 0;
  // An explicit offset combined with a count of zero is answered with an empty result right away.
  if (spec.offset > -1 && spec.count == 0) {
    return rocksdb::Status::OK();
  }
  std::string ns_key = AppendNamespacePrefix(user_key);
  ZSetMetadata metadata(false);
  rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
  if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;
  // A reversed scan starts at max and walks down; a forward scan starts at min and walks up.
  std::string start_member = spec.reversed ? spec.max : spec.min;
  std::string start_key = InternalKey(ns_key, start_member, metadata.version, storage_->IsSlotIdEncoded()).Encode();
  std::string prefix_key = InternalKey(ns_key, "", metadata.version, storage_->IsSlotIdEncoded()).Encode();
  std::string next_version_prefix_key =
      InternalKey(ns_key, "", metadata.version + 1, storage_->IsSlotIdEncoded()).Encode();
  // Confine the iterator to the keys of the current version of this set.
  rocksdb::ReadOptions read_options = ctx.DefaultScanOptions();
  rocksdb::Slice upper_bound(next_version_prefix_key);
  read_options.iterate_upper_bound = &upper_bound;
  rocksdb::Slice lower_bound(prefix_key);
  read_options.iterate_lower_bound = &lower_bound;
  // Number of matching members seen so far while skipping spec.offset entries.
  int pos = 0;
  // Unlike the score-ordered scans in this file, no column family handle is passed here: the entries
  // visited have the member as sub key and the encoded score as value (see the iter->value() uses below).
  auto iter = util::UniqueIterator(ctx, read_options);
  auto batch = storage_->GetWriteBatchBase();
  WriteBatchLogData log_data(kRedisZSet);
  s = batch->PutLogData(log_data.Encode());
  if (!s.ok()) return s;
  if (!spec.reversed) {
    iter->Seek(start_key);
  } else {
    if (spec.max_infinite) {
      // Unbounded max: start from the last entry inside the iterate bounds.
      iter->SeekToLast();
    } else {
      iter->SeekForPrev(start_key);
    }
  }
  for (; iter->Valid() && iter->key().starts_with(prefix_key); (!spec.reversed ? iter->Next() : iter->Prev())) {
    InternalKey ikey(iter->key(), storage_->IsSlotIdEncoded());
    Slice member = ikey.GetSubKey();
    if (spec.reversed) {
      // Walking downwards: stop once below min (or at an excluded min) ...
      if (member.ToString() < spec.min || (spec.minex && member == spec.min)) {
        break;
      }
      // ... and skip anything still above max (or equal to an excluded max).
      if ((spec.maxex && member == spec.max) || (!spec.max_infinite && member.ToString() > spec.max)) {
        continue;
      }
    } else {
      if (spec.minex && member == spec.min) continue;  // the min member was exclusive
      // Walking upwards: stop once above max (or at an excluded max).
      if ((spec.maxex && member == spec.max) || (!spec.max_infinite && member.ToString() > spec.max)) break;
    }
    // Skip the first spec.offset matching members; a negative offset disables skipping.
    if (spec.offset >= 0 && pos++ < spec.offset) continue;
    if (spec.with_deletion) {
      // Rebuild the score-CF sub key (encoded score followed by the member) so both entries are dropped.
      std::string score_bytes = iter->value().ToString();
      score_bytes.append(member.data(), member.size());
      std::string score_key = InternalKey(ns_key, score_bytes, metadata.version, storage_->IsSlotIdEncoded()).Encode();
      s = batch->Delete(score_cf_handle_, score_key);
      if (!s.ok()) return s;
      s = batch->Delete(iter->key());
      if (!s.ok()) return s;
    } else {
      if (mscores) mscores->emplace_back(MemberScore{member.ToString(), DecodeDouble(iter->value().data())});
    }
    *removed_cnt += 1;
    // The count limit is checked against the collected results, so it only applies when collecting.
    if (spec.count > 0 && mscores && mscores->size() >= static_cast<unsigned>(spec.count)) break;
  }
  // Only a deleting scan that removed something has anything to persist.
  if (spec.with_deletion && *removed_cnt > 0) {
    metadata.size -= *removed_cnt;
    std::string bytes;
    metadata.Encode(&bytes);
    s = batch->Put(metadata_cf_handle_, ns_key, bytes);
    if (!s.ok()) return s;
    return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
  }
  return rocksdb::Status::OK();
}
// Looks up the score of `member` in the sorted set at `user_key`.
// Unlike most readers in this file, a failed lookup (including NotFound for the key or the member) is
// returned to the caller as-is; `*score` is only written on success.
rocksdb::Status ZSet::Score(engine::Context &ctx, const Slice &user_key, const Slice &member, double *score) {
  std::string ns_key = AppendNamespacePrefix(user_key);
  ZSetMetadata metadata(false);
  rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
  if (s.ok()) {
    std::string encoded_score;
    std::string member_key = InternalKey(ns_key, member, metadata.version, storage_->IsSlotIdEncoded()).Encode();
    s = storage_->Get(ctx, ctx.GetReadOptions(), member_key, &encoded_score);
    if (s.ok()) {
      *score = DecodeDouble(encoded_score.data());
    }
  }
  return s;
}
// Removes `members` from the sorted set at `user_key`.
//
// Duplicate entries in `members` are processed once and members that are not in the set are ignored.
// `*removed_cnt` receives the number of members actually removed. A missing key yields OK with a count
// of 0. A storage error while looking up a member aborts the call and is returned to the caller (as
// Add() and MGet() do) instead of being treated as "member absent".
rocksdb::Status ZSet::Remove(engine::Context &ctx, const Slice &user_key, const std::vector<Slice> &members,
                             uint64_t *removed_cnt) {
  *removed_cnt = 0;
  std::string ns_key = AppendNamespacePrefix(user_key);
  ZSetMetadata metadata(false);
  rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
  if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;

  auto batch = storage_->GetWriteBatchBase();
  WriteBatchLogData log_data(kRedisZSet);
  s = batch->PutLogData(log_data.Encode());
  if (!s.ok()) return s;

  int removed = 0;
  // The views point into the buffers referenced by `members`, which outlive this set.
  std::unordered_set<std::string_view> mset;
  for (const auto &member : members) {
    // Process every distinct member only once.
    if (!mset.insert(member.ToStringView()).second) {
      continue;
    }
    std::string member_key = InternalKey(ns_key, member, metadata.version, storage_->IsSlotIdEncoded()).Encode();
    std::string score_bytes;
    s = storage_->Get(ctx, ctx.GetReadOptions(), member_key, &score_bytes);
    if (s.IsNotFound()) continue;  // not a member of the set: nothing to remove
    if (!s.ok()) return s;         // a real read failure must not be mistaken for "absent"

    // The score-CF sub key is the encoded score followed by the member.
    score_bytes.append(member.data(), member.size());
    std::string score_key = InternalKey(ns_key, score_bytes, metadata.version, storage_->IsSlotIdEncoded()).Encode();
    s = batch->Delete(member_key);
    if (!s.ok()) return s;
    s = batch->Delete(score_cf_handle_, score_key);
    if (!s.ok()) return s;
    removed++;
  }

  // The metadata entry only needs rewriting when the member count changed.
  if (removed > 0) {
    *removed_cnt = removed;
    metadata.size -= removed;
    std::string bytes;
    metadata.Encode(&bytes);
    s = batch->Put(metadata_cf_handle_, ns_key, bytes);
    if (!s.ok()) return s;
  }
  return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}
// Determines the zero-based rank of `member` in the sorted set at `user_key` (counted from the
// high-score end when `reversed`) together with its score.
// When the key or the member does not exist, OK is returned with `*member_rank` == -1 and
// `*member_score` == 0.0.
rocksdb::Status ZSet::Rank(engine::Context &ctx, const Slice &user_key, const Slice &member, bool reversed,
                           int *member_rank, double *member_score) {
  *member_rank = -1;
  *member_score = 0.0;

  std::string ns_key = AppendNamespacePrefix(user_key);
  ZSetMetadata metadata(false);
  rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
  if (s.IsNotFound()) return rocksdb::Status::OK();
  if (!s.ok()) return s;

  // Fetch the member's score first; it identifies the entry to stop at during the scan.
  rocksdb::ReadOptions read_options = ctx.DefaultScanOptions();
  std::string encoded_score;
  std::string member_key = InternalKey(ns_key, member, metadata.version, storage_->IsSlotIdEncoded()).Encode();
  s = storage_->Get(ctx, read_options, member_key, &encoded_score);
  if (s.IsNotFound()) return rocksdb::Status::OK();
  if (!s.ok()) return s;
  const double target_score = DecodeDouble(encoded_score.data());

  // Seek target: the extreme score on the side the scan starts from.
  double boundary_score = reversed ? kMaxScore : kMinScore;
  std::string boundary_score_bytes;
  PutDouble(&boundary_score_bytes, boundary_score);
  std::string seek_key =
      InternalKey(ns_key, boundary_score_bytes, metadata.version, storage_->IsSlotIdEncoded()).Encode();
  std::string version_prefix = InternalKey(ns_key, "", metadata.version, storage_->IsSlotIdEncoded()).Encode();
  std::string next_version_prefix =
      InternalKey(ns_key, "", metadata.version + 1, storage_->IsSlotIdEncoded()).Encode();

  int rank = 0;
  rocksdb::Slice upper_bound(next_version_prefix);
  read_options.iterate_upper_bound = &upper_bound;
  rocksdb::Slice lower_bound(version_prefix);
  read_options.iterate_lower_bound = &lower_bound;

  auto iter = util::UniqueIterator(ctx, read_options, score_cf_handle_);
  iter->Seek(seek_key);
  // see comment in RangeByScore()
  if (reversed) {
    const bool inside_set = iter->Valid() && iter->key().starts_with(version_prefix);
    if (!inside_set) iter->SeekForPrev(seek_key);
  }

  // Count the entries that precede the target in scan order.
  while (iter->Valid() && iter->key().starts_with(version_prefix)) {
    InternalKey ikey(iter->key(), storage_->IsSlotIdEncoded());
    Slice sub_key = ikey.GetSubKey();
    double score = NAN;
    // GetDouble() consumes the encoded score, leaving only the member in `sub_key`.
    GetDouble(&sub_key, &score);
    if (score == target_score && sub_key == member) break;
    rank++;

    if (reversed) {
      iter->Prev();
    } else {
      iter->Next();
    }
  }

  *member_rank = rank;
  *member_score = target_score;
  return rocksdb::Status::OK();
}
// Writes `mscores` as the complete content of the sorted set at `user_key`.
// The entries are stored under a freshly default-constructed ZSetMetadata (existing metadata is not
// read), and the metadata entry for the key is replaced with one whose size equals `mscores.size()`.
rocksdb::Status ZSet::Overwrite(engine::Context &ctx, const Slice &user_key, const MemberScores &mscores) {
  std::string ns_key = AppendNamespacePrefix(user_key);
  ZSetMetadata metadata;

  auto batch = storage_->GetWriteBatchBase();
  WriteBatchLogData log_data(kRedisZSet);
  auto s = batch->PutLogData(log_data.Encode());
  if (!s.ok()) return s;

  for (auto it = mscores.begin(); it != mscores.end(); ++it) {
    // member -> encoded score
    std::string encoded;
    std::string member_key = InternalKey(ns_key, it->member, metadata.version, storage_->IsSlotIdEncoded()).Encode();
    PutDouble(&encoded, it->score);
    s = batch->Put(member_key, encoded);
    if (!s.ok()) return s;

    // (encoded score + member) -> empty value in the score column family
    encoded.append(it->member);
    std::string score_key = InternalKey(ns_key, encoded, metadata.version, storage_->IsSlotIdEncoded()).Encode();
    s = batch->Put(score_cf_handle_, score_key, Slice());
    if (!s.ok()) return s;
  }

  metadata.size = static_cast<uint32_t>(mscores.size());
  std::string encoded_metadata;
  metadata.Encode(&encoded_metadata);
  s = batch->Put(metadata_cf_handle_, ns_key, encoded_metadata);
  if (!s.ok()) return s;

  return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}
// Computes the weighted intersection of `keys_weights` with Inter() and stores it at `dst` via
// Overwrite(). `*saved_cnt` receives the number of members in the intersection (0 if Inter() fails).
rocksdb::Status ZSet::InterStore(engine::Context &ctx, const Slice &dst, const std::vector<KeyWeight> &keys_weights,
                                 AggregateMethod aggregate_method, uint64_t *saved_cnt) {
  *saved_cnt = 0;

  std::vector<MemberScore> intersection;
  auto s = Inter(ctx, keys_weights, aggregate_method, &intersection);
  if (s.ok()) {
    *saved_cnt = intersection.size();
    s = Overwrite(ctx, dst, intersection);
  }
  return s;
}
// Computes the intersection of the sorted sets named in `keys_weights`. Every score is multiplied by
// the weight of its key (a NaN product counts as 0) and the per-key scores of a member are combined
// with `aggregate_method`. Members present in every set are appended to `members`, ordered by member
// name. If any input set is empty, or reading one fails, the function returns right away with that
// status and leaves `members` untouched.
rocksdb::Status ZSet::Inter(engine::Context &ctx, const std::vector<KeyWeight> &keys_weights,
                            AggregateMethod aggregate_method, std::vector<MemberScore> *members) {
  std::map<std::string, double> dst_zset;          // member -> aggregated score
  std::map<std::string, size_t> member_counters;   // member -> number of sets containing it
  std::vector<MemberScore> target_mscores;
  uint64_t target_size = 0;
  RangeScoreSpec spec;

  // Seed the candidates with the first set.
  const KeyWeight &first = keys_weights[0];
  auto s = RangeByScore(ctx, first.key, spec, &target_mscores, &target_size);
  if (!s.ok() || target_mscores.empty()) return s;
  for (const auto &ms : target_mscores) {
    double weighted = ms.score * first.weight;
    if (std::isnan(weighted)) weighted = 0;
    dst_zset[ms.member] = weighted;
    member_counters[ms.member] = 1;
  }

  // Fold in every further set; members that are not candidates are ignored.
  for (size_t i = 1; i < keys_weights.size(); i++) {
    const KeyWeight &current = keys_weights[i];
    s = RangeByScore(ctx, current.key, spec, &target_mscores, &target_size);
    if (!s.ok() || target_mscores.empty()) return s;

    for (const auto &ms : target_mscores) {
      auto candidate = dst_zset.find(ms.member);
      if (candidate == dst_zset.end()) continue;

      member_counters[ms.member]++;
      double weighted = ms.score * current.weight;
      if (std::isnan(weighted)) weighted = 0;

      double &aggregated = candidate->second;
      switch (aggregate_method) {
        case kAggregateSum:
          aggregated += weighted;
          if (std::isnan(aggregated)) {
            aggregated = 0;
          }
          break;
        case kAggregateMin:
          if (aggregated > weighted) {
            aggregated = weighted;
          }
          break;
        case kAggregateMax:
          if (aggregated < weighted) {
            aggregated = weighted;
          }
          break;
      }
    }
  }

  if (members == nullptr || dst_zset.empty()) return rocksdb::Status::OK();

  // Keep only the candidates that were seen in every input set.
  members->reserve(dst_zset.size());
  for (const auto &[member, score] : dst_zset) {
    if (member_counters[member] != keys_weights.size()) continue;
    members->emplace_back(MemberScore{member, score});
  }
  return rocksdb::Status::OK();
}
// Computes the cardinality of the intersection of the sorted sets named by `user_keys`.
//
// `limit` > 0 caps the result: counting stops as soon as `limit` common members were found and
// `*inter_cnt` is set to `limit`; `limit` == 0 means no cap.
// `*inter_cnt` is always initialized: it is 0 when `user_keys` is empty, when any of the sets is empty
// (the intersection is then empty as well) or when reading a set fails (the failing status is returned).
rocksdb::Status ZSet::InterCard(engine::Context &ctx, const std::vector<std::string> &user_keys, uint64_t limit,
                                uint64_t *inter_cnt) {
  *inter_cnt = 0;
  if (user_keys.empty()) return rocksdb::Status::OK();

  std::vector<MemberScores> mscores_list;
  mscores_list.reserve(user_keys.size());
  RangeScoreSpec spec;
  for (const auto &user_key : user_keys) {
    MemberScores mscores;
    auto s = RangeByScore(ctx, user_key, spec, &mscores, nullptr);
    if (!s.ok() || mscores.empty()) return s;
    mscores_list.emplace_back(std::move(mscores));
  }

  // Drive the comparison from the smallest set: the intersection cannot be larger than it.
  std::sort(mscores_list.begin(), mscores_list.end(),
            [](const MemberScores &v1, const MemberScores &v2) { return v1.size() < v2.size(); });
  const MemberScores &base_mscores = mscores_list[0];

  uint64_t cardinality = 0;
  for (const auto &base_ms : base_mscores) {
    // A member counts only if every other set contains it; stop probing at the first miss.
    bool in_every_set = true;
    for (size_t i = 1; i < mscores_list.size() && in_every_set; i++) {
      bool found = false;
      for (const auto &ms : mscores_list[i]) {
        if (base_ms.member == ms.member) {
          found = true;
          break;
        }
      }
      in_every_set = found;
    }
    if (!in_every_set) continue;

    cardinality++;
    if (limit > 0 && cardinality >= limit) {
      *inter_cnt = limit;
      return rocksdb::Status::OK();
    }
  }

  *inter_cnt = cardinality;
  return rocksdb::Status::OK();
}
// Computes the weighted union of `keys_weights` with Union() and stores it at `dst` via Overwrite().
// `*saved_cnt` receives the number of members in the union (0 if Union() fails).
rocksdb::Status ZSet::UnionStore(engine::Context &ctx, const Slice &dst, const std::vector<KeyWeight> &keys_weights,
                                 AggregateMethod aggregate_method, uint64_t *saved_cnt) {
  *saved_cnt = 0;

  std::vector<MemberScore> merged;
  auto s = Union(ctx, keys_weights, aggregate_method, &merged);
  if (s.ok()) {
    *saved_cnt = merged.size();
    s = Overwrite(ctx, dst, merged);
  }
  return s;
}
// Computes the union of the sorted sets named in `keys_weights`. Every score is multiplied by the
// weight of its key (a NaN product counts as 0); when a member occurs in several sets its scores are
// combined with `aggregate_method`. The result is appended to `members`, ordered by member name.
rocksdb::Status ZSet::Union(engine::Context &ctx, const std::vector<KeyWeight> &keys_weights,
                            AggregateMethod aggregate_method, std::vector<MemberScore> *members) {
  std::map<std::string, double> dst_zset;  // member -> aggregated score
  std::vector<MemberScore> target_mscores;
  uint64_t target_size = 0;
  RangeScoreSpec spec;

  for (const auto &key_weight : keys_weights) {
    // get all member
    auto s = RangeByScore(ctx, key_weight.key, spec, &target_mscores, &target_size);
    if (!s.ok() && !s.IsNotFound()) return s;

    for (const auto &ms : target_mscores) {
      double weighted = ms.score * key_weight.weight;
      if (std::isnan(weighted)) weighted = 0;

      // First sighting of the member: its weighted score is taken as-is.
      auto [entry, inserted] = dst_zset.try_emplace(ms.member, weighted);
      if (inserted) continue;

      double &aggregated = entry->second;
      switch (aggregate_method) {
        case kAggregateSum:
          aggregated += weighted;
          if (std::isnan(aggregated)) aggregated = 0;
          break;
        case kAggregateMin:
          if (aggregated > weighted) {
            aggregated = weighted;
          }
          break;
        case kAggregateMax:
          if (aggregated < weighted) {
            aggregated = weighted;
          }
          break;
      }
    }
  }

  if (members == nullptr || dst_zset.empty()) return rocksdb::Status::OK();

  members->reserve(dst_zset.size());
  for (const auto &[member, score] : dst_zset) {
    members->emplace_back(MemberScore{member, score});
  }
  return rocksdb::Status::OK();
}
// Scans the members of the sorted set at `user_key` through SubKeyScanner::Scan(). When `scores` is
// non-null, the stored values of the scanned members are decoded and appended to it as well.
rocksdb::Status ZSet::Scan(engine::Context &ctx, const Slice &user_key, const std::string &cursor, uint64_t limit,
                           const std::string &member_prefix, std::vector<std::string> *members,
                           std::vector<double> *scores) {
  // Members only: no values need to be fetched.
  if (scores == nullptr) {
    return SubKeyScanner::Scan(ctx, kRedisZSet, user_key, cursor, limit, member_prefix, members);
  }

  std::vector<std::string> encoded_scores;
  auto s = SubKeyScanner::Scan(ctx, kRedisZSet, user_key, cursor, limit, member_prefix, members, &encoded_scores);
  if (!s.ok()) return s;

  for (const auto &encoded : encoded_scores) {
    scores->emplace_back(DecodeDouble(encoded.data()));
  }
  return s;
}
// Looks up the scores of `members` in the sorted set at `user_key` and fills `mscores` with an entry
// for every member that exists; absent members are simply left out.
// A metadata lookup failure (including NotFound for the key) is returned as-is.
rocksdb::Status ZSet::MGet(engine::Context &ctx, const Slice &user_key, const std::vector<Slice> &members,
                           std::map<std::string, double> *mscores) {
  mscores->clear();

  std::string ns_key = AppendNamespacePrefix(user_key);
  ZSetMetadata metadata(false);
  rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
  if (!s.ok()) return s;

  std::string encoded_score;  // reused across lookups
  for (const auto &member : members) {
    std::string member_key = InternalKey(ns_key, member, metadata.version, storage_->IsSlotIdEncoded()).Encode();
    encoded_score.clear();
    s = storage_->Get(ctx, ctx.GetReadOptions(), member_key, &encoded_score);
    if (s.IsNotFound()) continue;
    if (!s.ok()) return s;

    (*mscores)[member.ToString()] = DecodeDouble(encoded_score.data());
  }
  return rocksdb::Status::OK();
}
// Collects every member of the sorted set at `user_key` together with its score, in the key order of
// the score column family. A missing key yields OK with an empty `member_scores`.
rocksdb::Status ZSet::GetAllMemberScores(engine::Context &ctx, const Slice &user_key,
                                         std::vector<MemberScore> *member_scores) {
  member_scores->clear();

  std::string ns_key = AppendNamespacePrefix(user_key);
  ZSetMetadata metadata(false);
  rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
  if (s.IsNotFound()) return rocksdb::Status::OK();
  if (!s.ok()) return s;

  std::string version_prefix = InternalKey(ns_key, "", metadata.version, storage_->IsSlotIdEncoded()).Encode();
  std::string next_version_prefix =
      InternalKey(ns_key, "", metadata.version + 1, storage_->IsSlotIdEncoded()).Encode();

  // Confine the iterator to the keys of the current version of this set.
  rocksdb::ReadOptions read_options = ctx.DefaultScanOptions();
  rocksdb::Slice upper_bound(next_version_prefix);
  rocksdb::Slice lower_bound(version_prefix);
  read_options.iterate_upper_bound = &upper_bound;
  read_options.iterate_lower_bound = &lower_bound;

  auto iter = util::UniqueIterator(ctx, read_options, score_cf_handle_);
  iter->Seek(version_prefix);
  while (iter->Valid() && iter->key().starts_with(version_prefix)) {
    InternalKey ikey(iter->key(), storage_->IsSlotIdEncoded());
    Slice sub_key = ikey.GetSubKey();
    double score = NAN;
    // GetDouble() consumes the encoded score, leaving only the member in `sub_key`.
    GetDouble(&sub_key, &score);
    member_scores->emplace_back(MemberScore{sub_key.ToString(), score});
    iter->Next();
  }
  return rocksdb::Status::OK();
}
// Picks random members from the sorted set at `user_key` through ExtractRandMemberFromSet().
//
// The magnitude of `command_count` is the number of members requested; a non-negative value requests
// unique members, a negative value passes `unique == false` to the helper.
// A zero count, a missing key or an empty set yields OK without touching `member_scores`.
rocksdb::Status ZSet::RandMember(engine::Context &ctx, const Slice &user_key, int64_t command_count,
                                 std::vector<MemberScore> *member_scores) {
  if (command_count == 0) {
    return rocksdb::Status::OK();
  }

  // Take the absolute value in unsigned arithmetic: `-command_count` is signed overflow (undefined
  // behavior) for INT64_MIN, whereas unsigned negation is well defined and yields 2^63 for it.
  const auto raw_count = static_cast<uint64_t>(command_count);
  uint64_t count = command_count > 0 ? raw_count : (0 - raw_count);
  bool unique = (command_count >= 0);

  std::string ns_key = AppendNamespacePrefix(user_key);
  ZSetMetadata metadata(false);
  rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
  if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;
  if (metadata.size == 0) return rocksdb::Status::OK();

  // The helper pulls the full member list through this callback and samples from it.
  return ExtractRandMemberFromSet<MemberScore>(
      unique, count,
      [this, user_key, &ctx](std::vector<MemberScore> *scores) -> rocksdb::Status {
        return this->GetAllMemberScores(ctx, user_key, scores);
      },
      member_scores);
}
// Computes the members of the first sorted set in `keys` that occur in none of the remaining ones and
// stores them, with their scores from the first set and in that set's score order, in `members`.
// An empty first set yields OK with an empty result.
rocksdb::Status ZSet::Diff(engine::Context &ctx, const std::vector<Slice> &keys, MemberScores *members) {
  members->clear();

  RangeScoreSpec spec;
  MemberScores source;
  uint64_t source_cnt = 0;
  auto s = RangeByScore(ctx, keys[0], spec, &source, &source_cnt);
  if (!s.ok()) return s;
  if (source_cnt == 0) return rocksdb::Status::OK();

  // Gather every member name that appears in any of the other sets.
  std::set<std::string> excluded;
  MemberScores other;
  for (size_t i = 1; i < keys.size(); i++) {
    uint64_t other_cnt = 0;
    s = RangeByScore(ctx, keys[i], spec, &other, &other_cnt);
    if (!s.ok()) return s;

    for (auto &ms : other) {
      excluded.emplace(std::move(ms.member));
    }
    other.clear();
  }

  for (const auto &ms : source) {
    if (excluded.find(ms.member) != excluded.end()) continue;
    members->push_back(ms);
  }
  return rocksdb::Status::OK();
}
// Computes the difference of `keys` with Diff() and stores it at `dst` via Overwrite().
// `*stored_count` receives the number of members in the difference; it is left untouched if Diff() fails.
rocksdb::Status ZSet::DiffStore(engine::Context &ctx, const Slice &dst, const std::vector<Slice> &keys,
                                uint64_t *stored_count) {
  MemberScores difference;
  auto s = Diff(ctx, keys, &difference);
  if (s.ok()) {
    *stored_count = difference.size();
    s = Overwrite(ctx, dst, difference);
  }
  return s;
}
} // namespace redis