rocksdb::Status ZSet::RangeByLex()

in src/types/redis_zset.cc [415:501]


// Walks the members of the sorted set stored at `user_key` in byte-wise member order and either
// collects or deletes the ones selected by `spec`.
//
// Parameters:
//   ctx         - engine context used for the metadata lookup, the iterator and the final write.
//   user_key    - key of the sorted set, without the namespace prefix.
//   spec        - range description: `min`/`max` bounds with their exclusive flags (`minex`/`maxex`),
//                 `max_infinite` for an unbounded upper end, `reversed` to walk from max towards min,
//                 `offset`/`count` for paging and `with_deletion` to delete instead of collect.
//   mscores     - optional output; cleared on entry and filled with (member, score) pairs when
//                 `spec.with_deletion` is false.
//   removed_cnt - optional output; receives the number of members selected after the offset has been
//                 applied. When `spec.with_deletion` is set this is the number of deleted members.
//
// Returns OK when the key does not exist (outputs stay empty / zero). Any other failure from the
// metadata lookup, the iterator, the write batch or the final write is returned as is.
rocksdb::Status ZSet::RangeByLex(engine::Context &ctx, const Slice &user_key, const RangeLexSpec &spec,
                                 MemberScores *mscores, uint64_t *removed_cnt) {
  if (mscores) mscores->clear();

  // Point the counter at a local when the caller is not interested in it, so the code below can
  // update it unconditionally.
  uint64_t local_cnt = 0;
  if (!removed_cnt) removed_cnt = &local_cnt;
  *removed_cnt = 0;

  // An explicit offset combined with a zero count selects nothing.
  if (spec.offset > -1 && spec.count == 0) {
    return rocksdb::Status::OK();
  }

  std::string ns_key = AppendNamespacePrefix(user_key);

  ZSetMetadata metadata(false);
  rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
  if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;

  const bool slot_id_encoded = storage_->IsSlotIdEncoded();
  // A reverse scan starts from the upper bound, a forward scan from the lower bound.
  const std::string &start_member = spec.reversed ? spec.max : spec.min;
  std::string start_key = InternalKey(ns_key, start_member, metadata.version, slot_id_encoded).Encode();
  std::string prefix_key = InternalKey(ns_key, "", metadata.version, slot_id_encoded).Encode();
  std::string next_version_prefix_key = InternalKey(ns_key, "", metadata.version + 1, slot_id_encoded).Encode();

  // Confine the iterator to the keys of the current version of this sorted set.
  rocksdb::ReadOptions read_options = ctx.DefaultScanOptions();
  rocksdb::Slice upper_bound(next_version_prefix_key);
  read_options.iterate_upper_bound = &upper_bound;
  rocksdb::Slice lower_bound(prefix_key);
  read_options.iterate_lower_bound = &lower_bound;

  int pos = 0;
  auto iter = util::UniqueIterator(ctx, read_options);
  auto batch = storage_->GetWriteBatchBase();
  WriteBatchLogData log_data(kRedisZSet);
  s = batch->PutLogData(log_data.Encode());
  if (!s.ok()) return s;

  if (!spec.reversed) {
    iter->Seek(start_key);
  } else if (spec.max_infinite) {
    iter->SeekToLast();
  } else {
    iter->SeekForPrev(start_key);
  }

  // True when `member` lies above the upper end of the range (or sits exactly on an exclusive max).
  // Slice::compare is a byte-wise comparison, so no temporary std::string is needed per element.
  const auto beyond_max = [&spec](const Slice &member) {
    const int cmp = member.compare(spec.max);
    return (spec.maxex && cmp == 0) || (!spec.max_infinite && cmp > 0);
  };

  for (; iter->Valid() && iter->key().starts_with(prefix_key); (spec.reversed ? iter->Prev() : iter->Next())) {
    InternalKey ikey(iter->key(), slot_id_encoded);
    Slice member = ikey.GetSubKey();

    if (spec.reversed) {
      // Walking downwards: falling below min ends the scan, members above max are skipped.
      const int cmp_min = member.compare(spec.min);
      if (cmp_min < 0 || (spec.minex && cmp_min == 0)) break;
      if (beyond_max(member)) continue;
    } else {
      // Walking upwards: an exclusive min member is skipped, passing max ends the scan.
      if (spec.minex && member == spec.min) continue;
      if (beyond_max(member)) break;
    }

    // Skip the first `offset` members of the range.
    if (spec.offset >= 0 && pos++ < spec.offset) continue;

    if (spec.with_deletion) {
      // The score column family is keyed by the stored score bytes followed by the member.
      std::string score_bytes = iter->value().ToString();
      score_bytes.append(member.data(), member.size());
      std::string score_key = InternalKey(ns_key, score_bytes, metadata.version, slot_id_encoded).Encode();
      s = batch->Delete(score_cf_handle_, score_key);
      if (!s.ok()) return s;
      s = batch->Delete(iter->key());
      if (!s.ok()) return s;
    } else if (mscores) {
      mscores->emplace_back(MemberScore{member.ToString(), DecodeDouble(iter->value().data())});
    }

    *removed_cnt += 1;
    // The count limit is checked against the collected results, so it only takes effect when
    // members are being gathered into `mscores`.
    if (spec.count > 0 && mscores && mscores->size() >= static_cast<unsigned>(spec.count)) break;
  }

  // An iterator that became invalid because of a read error must not be mistaken for the end of the
  // range; in particular a partial deletion must not be committed as if it were complete.
  s = iter->status();
  if (!s.ok()) return s;

  if (spec.with_deletion && *removed_cnt > 0) {
    metadata.size -= *removed_cnt;
    std::string bytes;
    metadata.Encode(&bytes);
    s = batch->Put(metadata_cf_handle_, ns_key, bytes);
    if (!s.ok()) return s;
    return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
  }
  return rocksdb::Status::OK();
}