in src/types/redis_zset.cc [39:127]
// Adds the (member, score) pairs in `mscores` to the sorted set at `user_key`,
// updating the score of members that are already present.
//
// All mutations are accumulated in a single write batch which is written once
// at the end; any failing storage call returns its status immediately and the
// batch is not written.
//
// Flag handling, as implemented below:
//   - NX:   members that already exist are skipped.
//   - XX:   members that do not exist are skipped.
//   - INCR: the stored score is added to the supplied score; the result is
//           written back into the `mscores` entry. A NaN result is rejected
//           with InvalidArgument.
//   - LT/GT: an existing member is only updated when the new score is
//           strictly lower / strictly greater than the stored one.
//   - CH:   members whose score changed are counted in `*added_cnt` as well.
//
// When a member occurs several times in `mscores`, only its last occurrence is
// applied (the list is walked back to front and repeats are skipped).
//
// @param ctx        engine context used for reads and for the final write.
// @param user_key   key of the sorted set, without the namespace prefix.
// @param flags      ZADD option flags (see above).
// @param mscores    members and scores to add; scores may be modified by INCR.
// @param added_cnt  out: reset to 0 on entry; receives the number of newly
//                   added members, plus the number of changed members if CH
//                   is set.
// @return the status of the first failing storage call, otherwise the status
//         of the final batch write.
rocksdb::Status ZSet::Add(engine::Context &ctx, const Slice &user_key, ZAddFlags flags, MemberScores *mscores,
                          uint64_t *added_cnt) {
  *added_cnt = 0;

  std::string ns_key = AppendNamespacePrefix(user_key);

  ZSetMetadata metadata;
  rocksdb::Status status = GetMetadata(ctx, ns_key, &metadata);
  // A missing key is not an error here: the metadata object is used as-is.
  if (!(status.ok() || status.IsNotFound())) return status;

  auto batch = storage_->GetWriteBatchBase();
  WriteBatchLogData log_data(kRedisZSet);
  status = batch->PutLogData(log_data.Encode());
  if (!status.ok()) return status;

  // Builds the encoded internal key for `sub_key` under this set's current version.
  auto encode_key = [&](const Slice &sub_key) -> std::string {
    return InternalKey(ns_key, sub_key, metadata.version, storage_->IsSlotIdEncoded()).Encode();
  };

  // Queues the two entries that represent one member: member_key -> score in
  // the default column family, and (score bytes + member) -> empty value in
  // the score column family.
  auto put_member_score = [&](const std::string &member_key, const auto &member, double score) -> rocksdb::Status {
    std::string score_bytes;
    PutDouble(&score_bytes, score);
    rocksdb::Status put_status = batch->Put(member_key, score_bytes);
    if (!put_status.ok()) return put_status;
    score_bytes.append(member);
    return batch->Put(score_cf_handle_, encode_key(score_bytes), Slice());
  };

  int num_added = 0;
  int num_changed = 0;
  std::unordered_set<std::string_view> seen_members;

  // Walk back to front so that the last occurrence of a repeated member wins.
  for (auto it = mscores->rbegin(); it != mscores->rend(); ++it) {
    auto &entry = *it;
    const bool first_occurrence = seen_members.insert(entry.member).second;
    if (!first_occurrence) continue;

    std::string member_key = encode_key(entry.member);

    // The lookup is skipped entirely when the metadata reports an empty set.
    std::string old_score_bytes;
    bool member_exists = false;
    if (metadata.size > 0) {
      status = storage_->Get(ctx, ctx.GetReadOptions(), member_key, &old_score_bytes);
      if (!status.ok() && !status.IsNotFound()) return status;
      member_exists = status.ok();
    }

    if (member_exists) {
      if (flags.HasNX()) continue;

      const double old_score = DecodeDouble(old_score_bytes.data());
      if (flags.HasIncr()) {
        // A non-negative increment cannot lower the score and a non-positive
        // one cannot raise it, so LT/GT rule those out before adding.
        const bool increment_rejected =
            (flags.HasLT() && entry.score >= 0) || (flags.HasGT() && entry.score <= 0);
        if (increment_rejected) continue;

        entry.score += old_score;
        if (std::isnan(entry.score)) {
          return rocksdb::Status::InvalidArgument("resulting score is not a number (NaN)");
        }
      }

      // Nothing to rewrite when the score is unchanged.
      if (entry.score == old_score) continue;

      const bool update_rejected =
          (flags.HasLT() && entry.score >= old_score) || (flags.HasGT() && entry.score <= old_score);
      if (update_rejected) continue;

      // Drop the old score-index entry before queuing the replacement entries.
      old_score_bytes.append(entry.member);
      status = batch->Delete(score_cf_handle_, encode_key(old_score_bytes));
      if (!status.ok()) return status;

      status = put_member_score(member_key, entry.member, entry.score);
      if (!status.ok()) return status;

      ++num_changed;
      continue;
    }

    // From here on the member is new to the set.
    if (flags.HasXX()) continue;

    status = put_member_score(member_key, entry.member, entry.score);
    if (!status.ok()) return status;
    ++num_added;
  }

  // The metadata is only rewritten when the set's size actually grows.
  if (num_added > 0) {
    *added_cnt = num_added;
    metadata.size += num_added;
    std::string encoded_metadata;
    metadata.Encode(&encoded_metadata);
    status = batch->Put(metadata_cf_handle_, ns_key, encoded_metadata);
    if (!status.ok()) return status;
  }

  if (flags.HasCH()) {
    *added_cnt += num_changed;
  }

  return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}