rocksdb::Status Stream::range()

in src/types/redis_stream.cc [1118:1195]


// Appends to *entries the stream entries found between options.start and options.end.
//
// Both IDs are first turned into internal keys for `ns_key` / `metadata`:
//  - If the two keys are identical, a single point lookup is performed. Nothing is appended when
//    either bound is exclusive or when the key does not exist.
//  - Otherwise an iterator restricted to the keys of the current metadata version walks from the
//    start key towards the end key (backwards when options.reverse is set). Subkeys that are not
//    stream entries are skipped, and exclusive bounds are honoured.
//
// When options.with_count is set, iteration stops as soon as entries->size() equals options.count;
// note that the comparison uses the total size of *entries, not the number of entries added here.
//
// Returns OK on success (including "nothing found"), InvalidArgument when a stored entry value
// cannot be decoded, or the storage/iterator error otherwise.
rocksdb::Status Stream::range(engine::Context &ctx, const std::string &ns_key, const StreamMetadata &metadata,
                              const StreamRangeOptions &options, std::vector<StreamEntry> *entries) const {
  const std::string first_key = internalKeyFromEntryID(ns_key, metadata, options.start);
  const std::string last_key = internalKeyFromEntryID(ns_key, metadata, options.end);

  // Degenerate range: both bounds map to the same key, so a point lookup is enough.
  if (first_key == last_key) {
    if (options.exclude_start || options.exclude_end) {
      return rocksdb::Status::OK();
    }

    std::string raw_value;
    auto get_status = storage_->Get(ctx, ctx.GetReadOptions(), stream_cf_handle_, first_key, &raw_value);
    if (get_status.IsNotFound()) {
      // A missing entry is not an error, the result is simply empty.
      return rocksdb::Status::OK();
    }
    if (!get_status.ok()) {
      return get_status;
    }

    std::vector<std::string> fields;
    auto decoded = DecodeRawStreamEntryValue(raw_value, &fields);
    if (!decoded.IsOK()) {
      return rocksdb::Status::InvalidArgument(decoded.Msg());
    }

    entries->emplace_back(options.start.ToString(), std::move(fields));
    return rocksdb::Status::OK();
  }

  // The bounds are ordered against the scan direction: nothing can match.
  const bool bounds_inverted = options.reverse ? options.start < options.end : options.end < options.start;
  if (bounds_inverted) {
    return rocksdb::Status::OK();
  }

  // Confine the iterator to the keys of the current metadata version.
  std::string next_version_prefix_key =
      InternalKey(ns_key, "", metadata.version + 1, storage_->IsSlotIdEncoded()).Encode();
  std::string current_version_prefix_key =
      InternalKey(ns_key, "", metadata.version, storage_->IsSlotIdEncoded()).Encode();

  rocksdb::ReadOptions scan_options = ctx.DefaultScanOptions();
  rocksdb::Slice scan_upper_bound(next_version_prefix_key);
  rocksdb::Slice scan_lower_bound(current_version_prefix_key);
  scan_options.iterate_upper_bound = &scan_upper_bound;
  scan_options.iterate_lower_bound = &scan_lower_bound;

  auto iter = util::UniqueIterator(ctx, scan_options, stream_cf_handle_);
  iter->Seek(first_key);
  if (options.reverse) {
    // Seek() lands on the first key >= first_key. For a backward scan we need the last key
    // <= first_key instead, unless the start key itself was hit exactly.
    const bool exact_hit = iter->Valid() && iter->key() == rocksdb::Slice(first_key);
    if (!exact_hit) {
      iter->SeekForPrev(first_key);
    }
  }

  const rocksdb::Slice first_slice(first_key);
  const rocksdb::Slice last_slice(last_key);

  // True while `key` has not moved past the end key in the scan direction.
  auto within_range = [&](const rocksdb::Slice &key) {
    const int order = key.compare(last_slice);
    return options.reverse ? order >= 0 : order <= 0;
  };
  // Moves the iterator one step in the scan direction.
  auto advance = [&]() {
    if (options.reverse) {
      iter->Prev();
    } else {
      iter->Next();
    }
  };

  for (; iter->Valid() && within_range(iter->key()); advance()) {
    const rocksdb::Slice current_key = iter->key();

    if (identifySubkeyType(current_key) != StreamSubkeyType::StreamEntry) {
      continue;
    }
    if (options.exclude_start && current_key == first_slice) {
      continue;
    }
    // The end key is the last one the loop can visit, so an exclusive end terminates the scan.
    if (options.exclude_end && current_key == last_slice) {
      break;
    }

    std::vector<std::string> fields;
    auto decoded = DecodeRawStreamEntryValue(iter->value().ToString(), &fields);
    if (!decoded.IsOK()) {
      return rocksdb::Status::InvalidArgument(decoded.Msg());
    }

    entries->emplace_back(entryIDFromInternalKey(current_key).ToString(), std::move(fields));

    if (options.with_count && entries->size() == options.count) {
      break;
    }
  }

  // Surface any error the iterator ran into while scanning.
  auto iter_status = iter->status();
  if (!iter_status.ok()) {
    return iter_status;
  }
  return rocksdb::Status::OK();
}