in src/types/redis_stream.cc [1737:1823]
// Scans the pending-entries keys of consumer group `options.group_name` in stream
// `options.stream_name`, bounded by the keys built from `options.start_id` and `options.end_id`.
//
// Two output modes, selected by `options.with_count`:
//   * with_count set: every entry that passes the filters is appended to `ext_results`, stopping once
//     `options.count` entries have been collected. `pending_infos.consumer_infos` is not touched.
//   * with_count not set: `pending_infos.consumer_infos` receives one (name, pending_number) pair per
//     distinct consumer seen, and `pending_infos.pending_number` is the number of entries that passed
//     the filters.
// In both modes `pending_infos.first_entry_id` / `last_entry_id` are the smallest / largest entry IDs
// visited by the scan; they are updated before the idle-time and consumer filters are applied.
//
// Filters: `options.with_time` skips entries whose time since last delivery is below
// `options.idle_time`; `options.with_consumer` skips entries not owned by `options.consumer`.
//
// Returns OK without writing any output when the stream metadata or the group key is not found.
// Returns OK early (leaving first_entry_id / last_entry_id / pending_number unwritten) when a
// consumer referenced by a pending entry has no metadata record. Any other storage or iterator
// error is returned to the caller.
rocksdb::Status Stream::GetPendingEntries(engine::Context &ctx, StreamPendingOptions &options,
                                          StreamGetPendingEntryResult &pending_infos,
                                          std::vector<StreamNACK> &ext_results) {
  const std::string &stream_name = options.stream_name;
  const std::string &group_name = options.group_name;
  std::string ns_key = AppendNamespacePrefix(stream_name);

  StreamMetadata metadata(false);
  rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
  if (!s.ok()) {
    // A missing stream is not an error: report "nothing pending".
    return s.IsNotFound() ? rocksdb::Status::OK() : s;
  }

  std::string group_key = internalKeyFromGroupName(ns_key, metadata, group_name);
  std::string get_group_value;
  s = storage_->Get(ctx, ctx.GetReadOptions(), stream_cf_handle_, group_key, &get_group_value);
  if (!s.ok()) {
    // Same treatment for a missing group.
    return s.IsNotFound() ? rocksdb::Status::OK() : s;
  }

  std::string prefix_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, options.start_id);
  std::string end_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, options.end_id);

  // The bound slices must outlive the iterator, so they are declared before it.
  // NOTE(review): RocksDB treats iterate_upper_bound as exclusive, so a key equal to end_key is not
  // visited - confirm the caller accounts for this when an inclusive end ID is expected.
  rocksdb::ReadOptions read_options = ctx.DefaultScanOptions();
  rocksdb::Slice upper_bound(end_key);
  read_options.iterate_upper_bound = &upper_bound;
  rocksdb::Slice lower_bound(prefix_key);
  read_options.iterate_lower_bound = &lower_bound;

  auto iter = util::UniqueIterator(ctx, read_options, stream_cf_handle_);
  std::unordered_set<std::string> consumer_names;
  StreamEntryID first_entry_id{StreamEntryID::Maximum()};
  StreamEntryID last_entry_id{StreamEntryID::Minimum()};
  uint64_t ext_result_count = 0;
  uint64_t summary_result_count = 0;

  for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
    if (options.with_count && options.count <= ext_result_count) {
      break;
    }

    std::string tmp_group_name;
    StreamEntryID entry_id = groupAndEntryIdFromPelInternalKey(iter->key(), tmp_group_name);
    if (first_entry_id > entry_id) {
      first_entry_id = entry_id;
    }
    if (last_entry_id < entry_id) {
      last_entry_id = entry_id;
    }

    StreamPelEntry pel_entry = decodeStreamPelEntryValue(iter->value().ToString());
    if (options.with_time && util::GetTimeStampMS() - pel_entry.last_delivery_time_ms < options.idle_time) {
      continue;
    }

    const std::string &consumer_name = pel_entry.consumer_name;
    if (options.with_consumer && options.consumer != consumer_name) {
      continue;
    }

    if (options.with_count) {
      ext_results.push_back(
          {entry_id, {pel_entry.last_delivery_time_ms, pel_entry.last_delivery_count, consumer_name}});
      ext_result_count++;
      continue;
    }

    // Summary mode: the consumer metadata is only needed the first time a consumer is seen, so it is
    // looked up once per distinct consumer instead of once per pending entry.
    if (consumer_names.insert(consumer_name).second) {
      std::string consumer_key = internalKeyFromConsumerName(ns_key, metadata, group_name, consumer_name);
      std::string get_consumer_value;
      s = storage_->Get(ctx, ctx.GetReadOptions(), stream_cf_handle_, consumer_key, &get_consumer_value);
      if (s.IsNotFound()) {
        // NOTE(review): a pending entry refers to a consumer without metadata; the scan is abandoned
        // and OK is returned with partially filled pending_infos - confirm this is intended.
        return rocksdb::Status::OK();
      }
      if (!s.ok()) {
        return s;
      }
      StreamConsumerMetadata consumer_metadata = decodeStreamConsumerMetadataValue(get_consumer_value);
      pending_infos.consumer_infos.emplace_back(consumer_name, consumer_metadata.pending_number);
    }
    summary_result_count++;
  }

  // Valid() also turns false on a read error; surface it instead of returning truncated results as OK.
  s = iter->status();
  if (!s.ok()) {
    return s;
  }

  pending_infos.last_entry_id = last_entry_id;
  pending_infos.first_entry_id = first_entry_id;
  pending_infos.pending_number = summary_result_count;
  return rocksdb::Status::OK();
}