in src/types/redis_stream.cc [1118:1195]
// Appends to `entries` the stream entries stored for `ns_key` (at `metadata.version`) whose internal keys lie
// between the keys derived from `options.start` and `options.end`.
//
// - Iteration begins at `options.start` and walks towards `options.end`: forward by default, backward when
//   `options.reverse` is set.
// - `options.exclude_start` / `options.exclude_end` drop the entry sitting exactly on the respective boundary.
// - When `options.with_count` is set, collection stops as soon as `entries->size()` equals `options.count`.
//   NOTE(review): the comparison uses the total size of `entries`, so callers presumably pass an empty vector
//   and a non-zero count - confirm against the call sites.
//
// Returns OK when the scan completes (including when nothing matches), InvalidArgument when a stored entry
// value cannot be decoded, or the failing status reported by the storage lookup / iterator.
rocksdb::Status Stream::range(engine::Context &ctx, const std::string &ns_key, const StreamMetadata &metadata,
                              const StreamRangeOptions &options, std::vector<StreamEntry> *entries) const {
  const std::string first_key = internalKeyFromEntryID(ns_key, metadata, options.start);
  const std::string last_key = internalKeyFromEntryID(ns_key, metadata, options.end);

  // Both boundaries map to the same key: at most one entry can match, so a point lookup replaces the scan.
  if (first_key == last_key) {
    const bool boundary_excluded = options.exclude_start || options.exclude_end;
    if (boundary_excluded) {
      return rocksdb::Status::OK();
    }

    std::string raw_value;
    auto get_status = storage_->Get(ctx, ctx.GetReadOptions(), stream_cf_handle_, first_key, &raw_value);
    if (!get_status.ok()) {
      // A missing entry simply yields an empty result; any other failure is propagated.
      if (get_status.IsNotFound()) {
        return rocksdb::Status::OK();
      }
      return get_status;
    }

    std::vector<std::string> fields;
    auto decode_status = DecodeRawStreamEntryValue(raw_value, &fields);
    if (!decode_status.IsOK()) {
      return rocksdb::Status::InvalidArgument(decode_status.Msg());
    }

    entries->emplace_back(options.start.ToString(), std::move(fields));
    return rocksdb::Status::OK();
  }

  // The interval is inverted relative to the scan direction, so there is nothing to visit.
  const bool inverted_interval = options.reverse ? (options.start < options.end) : (options.end < options.start);
  if (inverted_interval) {
    return rocksdb::Status::OK();
  }

  // Bound the iterator by the key prefix of the current version (lower) and of the next version (upper).
  // The bound strings and slices must stay alive for as long as the iterator is used.
  std::string version_prefix = InternalKey(ns_key, "", metadata.version, storage_->IsSlotIdEncoded()).Encode();
  std::string following_version_prefix =
      InternalKey(ns_key, "", metadata.version + 1, storage_->IsSlotIdEncoded()).Encode();

  rocksdb::ReadOptions read_options = ctx.DefaultScanOptions();
  rocksdb::Slice upper_bound(following_version_prefix);
  rocksdb::Slice lower_bound(version_prefix);
  read_options.iterate_upper_bound = &upper_bound;
  read_options.iterate_lower_bound = &lower_bound;

  auto iter = util::UniqueIterator(ctx, read_options, stream_cf_handle_);
  iter->Seek(first_key);
  if (options.reverse) {
    // Seek() lands on the first key >= first_key; for a backward scan that did not hit first_key exactly,
    // reposition onto the last key <= first_key instead.
    const bool landed_on_start = iter->Valid() && iter->key().ToString() == first_key;
    if (!landed_on_start) {
      iter->SeekForPrev(first_key);
    }
  }

  // True while the iterator points at a key that has not passed last_key in the scan direction.
  const auto within_range = [&]() {
    if (!iter->Valid()) {
      return false;
    }
    const std::string key = iter->key().ToString();
    return options.reverse ? key >= last_key : key <= last_key;
  };
  // Moves the iterator one step in the scan direction.
  const auto advance = [&]() {
    if (options.reverse) {
      iter->Prev();
    } else {
      iter->Next();
    }
  };

  for (; within_range(); advance()) {
    // Only keys identified as stream entries are returned; other subkey types are skipped.
    if (identifySubkeyType(iter->key()) != StreamSubkeyType::StreamEntry) {
      continue;
    }

    const std::string current_key = iter->key().ToString();
    if (options.exclude_start && current_key == first_key) {
      continue;
    }
    if (options.exclude_end && current_key == last_key) {
      break;
    }

    std::vector<std::string> fields;
    auto decode_status = DecodeRawStreamEntryValue(iter->value().ToString(), &fields);
    if (!decode_status.IsOK()) {
      return rocksdb::Status::InvalidArgument(decode_status.Msg());
    }

    entries->emplace_back(entryIDFromInternalKey(iter->key()).ToString(), std::move(fields));

    if (options.with_count && entries->size() == options.count) {
      break;
    }
  }

  // Surface any error the iterator hit while scanning.
  auto iter_status = iter->status();
  if (!iter_status.ok()) {
    return iter_status;
  }

  return rocksdb::Status::OK();
}