rocksdb::Status Stream::GetPendingEntries()

in src/types/redis_stream.cc [1737:1823]


// Scans the pending entries list (PEL) of a consumer group and reports either a summary or a
// detailed listing, selected by `options.with_count`.
//
// - Summary mode (`options.with_count` is false): `pending_infos.consumer_infos` receives one
//   (consumer name, pending number) pair per distinct consumer encountered, and
//   `pending_infos.pending_number` receives the number of entries that passed the filters.
// - Extended mode (`options.with_count` is true): at most `options.count` entries that passed the
//   filters are appended to `ext_results`.
//
// Filters applied to every scanned entry:
// - `options.with_time`: entries whose idle time is below `options.idle_time` are skipped.
// - `options.with_consumer`: entries owned by a consumer other than `options.consumer` are skipped.
//
// On a completed scan `pending_infos.first_entry_id` / `last_entry_id` hold the smallest / largest
// ID among all scanned entries (tracked before the filters run); if nothing was scanned they keep
// the `StreamEntryID::Maximum()` / `StreamEntryID::Minimum()` sentinels.
//
// Returns OK without writing any output when the stream metadata or the group key is not found.
// Returns OK early, without setting the scalar fields of `pending_infos`, when the metadata key of
// a consumer referenced by a pending entry is not found. Any other storage or iterator error is
// returned to the caller.
rocksdb::Status Stream::GetPendingEntries(engine::Context &ctx, StreamPendingOptions &options,
                                          StreamGetPendingEntryResult &pending_infos,
                                          std::vector<StreamNACK> &ext_results) {
  const std::string &stream_name = options.stream_name;
  const std::string &group_name = options.group_name;
  std::string ns_key = AppendNamespacePrefix(stream_name);

  StreamMetadata metadata(false);
  rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
  if (!s.ok()) {
    return s.IsNotFound() ? rocksdb::Status::OK() : s;
  }

  // The group value itself is not used; the read only verifies that the group exists.
  std::string group_key = internalKeyFromGroupName(ns_key, metadata, group_name);
  std::string get_group_value;
  s = storage_->Get(ctx, ctx.GetReadOptions(), stream_cf_handle_, group_key, &get_group_value);
  if (!s.ok()) {
    return s.IsNotFound() ? rocksdb::Status::OK() : s;
  }

  std::string prefix_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, options.start_id);
  std::string end_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, options.end_id);

  // The bound slices are declared before the iterator so that they outlive it.
  // NOTE(review): RocksDB treats iterate_upper_bound as exclusive, so a PEL key equal to end_key is
  // not visited - confirm that callers account for this when they populate options.end_id.
  rocksdb::ReadOptions read_options = ctx.DefaultScanOptions();
  rocksdb::Slice upper_bound(end_key);
  read_options.iterate_upper_bound = &upper_bound;
  rocksdb::Slice lower_bound(prefix_key);
  read_options.iterate_lower_bound = &lower_bound;

  auto iter = util::UniqueIterator(ctx, read_options, stream_cf_handle_);
  std::unordered_set<std::string> consumer_names;
  StreamEntryID first_entry_id{StreamEntryID::Maximum()};
  StreamEntryID last_entry_id{StreamEntryID::Minimum()};
  uint64_t ext_result_count = 0;
  uint64_t summary_result_count = 0;
  for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
    // Extended mode: stop as soon as the requested number of entries has been collected.
    if (options.with_count && options.count <= ext_result_count) {
      break;
    }
    std::string tmp_group_name;
    StreamEntryID entry_id = groupAndEntryIdFromPelInternalKey(iter->key(), tmp_group_name);

    // The ID range covers every scanned entry, including those rejected by the filters below.
    if (first_entry_id > entry_id) {
      first_entry_id = entry_id;
    }
    if (last_entry_id < entry_id) {
      last_entry_id = entry_id;
    }
    StreamPelEntry pel_entry = decodeStreamPelEntryValue(iter->value().ToString());
    if (options.with_time && util::GetTimeStampMS() - pel_entry.last_delivery_time_ms < options.idle_time) {
      continue;
    }

    const std::string &consumer_name = pel_entry.consumer_name;

    if (options.with_consumer && options.consumer != consumer_name) {
      continue;
    }

    if (options.with_count) {
      ext_results.push_back(
          {entry_id, {pel_entry.last_delivery_time_ms, pel_entry.last_delivery_count, consumer_name}});
      ext_result_count++;
      continue;
    }

    // Summary mode: each consumer is reported once, so its metadata is only read the first time the
    // consumer is seen instead of once per pending entry.
    if (consumer_names.insert(consumer_name).second) {
      std::string consumer_key = internalKeyFromConsumerName(ns_key, metadata, group_name, consumer_name);
      std::string get_consumer_value;
      s = storage_->Get(ctx, ctx.GetReadOptions(), stream_cf_handle_, consumer_key, &get_consumer_value);
      if (s.IsNotFound()) {
        return rocksdb::Status::OK();
      }
      if (!s.ok()) {
        return s;
      }

      StreamConsumerMetadata consumer_metadata = decodeStreamConsumerMetadataValue(get_consumer_value);
      pending_infos.consumer_infos.emplace_back(consumer_name, consumer_metadata.pending_number);
    }
    summary_result_count++;
  }

  // Valid() also turns false when the iterator hits a read error; surface it instead of returning
  // silently truncated results.
  s = iter->status();
  if (!s.ok()) {
    return s;
  }

  pending_infos.last_entry_id = last_entry_id;
  pending_infos.first_entry_id = first_entry_id;
  pending_infos.pending_number = summary_result_count;
  return rocksdb::Status::OK();
}