rocksdb::Status ZSet::Add()

in src/types/redis_zset.cc [39:127]


rocksdb::Status ZSet::Add(engine::Context &ctx, const Slice &user_key, ZAddFlags flags, MemberScores *mscores,
                          uint64_t *added_cnt) {
  *added_cnt = 0;

  std::string ns_key = AppendNamespacePrefix(user_key);

  ZSetMetadata metadata;
  rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
  if (!s.ok() && !s.IsNotFound()) return s;

  int added = 0;
  int changed = 0;
  auto batch = storage_->GetWriteBatchBase();
  WriteBatchLogData log_data(kRedisZSet);
  s = batch->PutLogData(log_data.Encode());
  if (!s.ok()) return s;
  std::unordered_set<std::string_view> added_member_keys;
  for (auto it = mscores->rbegin(); it != mscores->rend(); ++it) {
    if (!added_member_keys.insert(it->member).second) {
      continue;
    }
    std::string member_key = InternalKey(ns_key, it->member, metadata.version, storage_->IsSlotIdEncoded()).Encode();
    if (metadata.size > 0) {
      std::string old_score_bytes;
      s = storage_->Get(ctx, ctx.GetReadOptions(), member_key, &old_score_bytes);
      if (!s.ok() && !s.IsNotFound()) return s;
      if (s.ok()) {
        if (!s.IsNotFound() && flags.HasNX()) {
          continue;
        }
        double old_score = DecodeDouble(old_score_bytes.data());
        if (flags.HasIncr()) {
          if ((flags.HasLT() && it->score >= 0) || (flags.HasGT() && it->score <= 0)) {
            continue;
          }
          it->score += old_score;
          if (std::isnan(it->score)) {
            return rocksdb::Status::InvalidArgument("resulting score is not a number (NaN)");
          }
        }
        if (it->score != old_score) {
          if ((flags.HasLT() && it->score >= old_score) || (flags.HasGT() && it->score <= old_score)) {
            continue;
          }
          old_score_bytes.append(it->member);
          std::string old_score_key =
              InternalKey(ns_key, old_score_bytes, metadata.version, storage_->IsSlotIdEncoded()).Encode();
          s = batch->Delete(score_cf_handle_, old_score_key);
          if (!s.ok()) return s;
          std::string new_score_bytes;
          PutDouble(&new_score_bytes, it->score);
          s = batch->Put(member_key, new_score_bytes);
          if (!s.ok()) return s;
          new_score_bytes.append(it->member);
          std::string new_score_key =
              InternalKey(ns_key, new_score_bytes, metadata.version, storage_->IsSlotIdEncoded()).Encode();
          s = batch->Put(score_cf_handle_, new_score_key, Slice());
          if (!s.ok()) return s;
          changed++;
        }
        continue;
      }
    }
    if (flags.HasXX()) {
      continue;
    }
    std::string score_bytes;
    PutDouble(&score_bytes, it->score);
    s = batch->Put(member_key, score_bytes);
    if (!s.ok()) return s;
    score_bytes.append(it->member);
    std::string score_key = InternalKey(ns_key, score_bytes, metadata.version, storage_->IsSlotIdEncoded()).Encode();
    s = batch->Put(score_cf_handle_, score_key, Slice());
    if (!s.ok()) return s;
    added++;
  }
  if (added > 0) {
    *added_cnt = added;
    metadata.size += added;
    std::string bytes;
    metadata.Encode(&bytes);
    s = batch->Put(metadata_cf_handle_, ns_key, bytes);
    if (!s.ok()) return s;
  }
  if (flags.HasCH()) {
    *added_cnt += changed;
  }
  return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}