rocksdb::Status Stream::RangeWithPending()

in src/types/redis_stream.cc [1430:1565]


rocksdb::Status Stream::RangeWithPending(engine::Context &ctx, const Slice &stream_name, StreamRangeOptions &options,
                                         std::vector<StreamEntry> *entries, std::string &group_name,
                                         std::string &consumer_name, bool noack, bool latest) {
  entries->clear();

  if (options.with_count && options.count == 0) {
    return rocksdb::Status::OK();
  }

  if (options.exclude_start && options.start.IsMaximum()) {
    return rocksdb::Status::InvalidArgument("invalid start ID for the interval");
  }

  if (options.exclude_end && options.end.IsMinimum()) {
    return rocksdb::Status::InvalidArgument("invalid end ID for the interval");
  }

  std::string ns_key = AppendNamespacePrefix(stream_name);

  StreamMetadata metadata(false);
  rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
  if (!s.ok()) {
    return s.IsNotFound() ? rocksdb::Status::OK() : s;
  }

  std::string group_key = internalKeyFromGroupName(ns_key, metadata, group_name);
  std::string get_group_value;
  s = storage_->Get(ctx, ctx.GetReadOptions(), stream_cf_handle_, group_key, &get_group_value);
  if (!s.ok()) return s;

  std::string consumer_key = internalKeyFromConsumerName(ns_key, metadata, group_name, consumer_name);
  std::string get_consumer_value;
  s = storage_->Get(ctx, ctx.GetReadOptions(), stream_cf_handle_, consumer_key, &get_consumer_value);
  if (!s.ok() && !s.IsNotFound()) {
    return s;
  }
  if (s.IsNotFound()) {
    int created_number = 0;
    s = createConsumerWithoutLock(ctx, stream_name, group_name, consumer_name, &created_number);
    if (!s.ok()) {
      return s;
    }
    s = storage_->Get(ctx, ctx.GetReadOptions(), stream_cf_handle_, group_key, &get_group_value);
  }

  auto batch = storage_->GetWriteBatchBase();
  WriteBatchLogData log_data(kRedisStream);
  s = batch->PutLogData(log_data.Encode());
  if (!s.ok()) return s;

  StreamConsumerGroupMetadata consumergroup_metadata = decodeStreamConsumerGroupMetadataValue(get_group_value);
  s = storage_->Get(ctx, ctx.GetReadOptions(), stream_cf_handle_, consumer_key, &get_consumer_value);
  if (!s.ok() && !s.IsNotFound()) {
    return s;
  }
  StreamConsumerMetadata consumer_metadata = decodeStreamConsumerMetadataValue(get_consumer_value);
  auto now_ms = util::GetTimeStampMS();
  consumer_metadata.last_attempted_interaction_ms = now_ms;
  consumer_metadata.last_successful_interaction_ms = now_ms;

  if (latest) {
    options.start = consumergroup_metadata.last_delivered_id;
    s = range(ctx, ns_key, metadata, options, entries);
    if (!s.ok()) {
      return s;
    }
    StreamEntryID maxid = {0, 0};
    for (const auto &entry : *entries) {
      StreamEntryID id;
      Status st = ParseStreamEntryID(entry.key, &id);
      if (!st.IsOK()) {
        return rocksdb::Status::InvalidArgument(st.Msg());
      }
      if (id > maxid) {
        maxid = id;
      }
      if (!noack) {
        std::string pel_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, id);
        StreamPelEntry pel_entry = {0, 0, consumer_name};
        std::string pel_value = encodeStreamPelEntryValue(pel_entry);
        s = batch->Put(stream_cf_handle_, pel_key, pel_value);
        if (!s.ok()) return s;
        consumergroup_metadata.entries_read += 1;
        consumergroup_metadata.pending_number += 1;
        consumer_metadata.pending_number += 1;
      }
    }
    if (maxid > consumergroup_metadata.last_delivered_id) {
      consumergroup_metadata.last_delivered_id = maxid;
    }
  } else {
    std::string prefix_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, options.start);
    std::string end_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, StreamEntryID::Maximum());

    rocksdb::ReadOptions read_options = ctx.DefaultScanOptions();
    rocksdb::Slice upper_bound(end_key);
    read_options.iterate_upper_bound = &upper_bound;
    rocksdb::Slice lower_bound(prefix_key);
    read_options.iterate_lower_bound = &lower_bound;

    auto iter = util::UniqueIterator(ctx, read_options, stream_cf_handle_);
    uint64_t count = 0;
    for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
      std::string tmp_group_name;
      StreamEntryID entry_id = groupAndEntryIdFromPelInternalKey(iter->key(), tmp_group_name);
      StreamPelEntry pel_entry = decodeStreamPelEntryValue(iter->value().ToString());
      if (pel_entry.consumer_name != consumer_name) continue;
      std::string raw_value;
      rocksdb::Status st = getEntryRawValue(ctx, ns_key, metadata, entry_id, &raw_value);
      if (!st.ok() && !st.IsNotFound()) {
        return st;
      }
      std::vector<std::string> values;
      auto rv = DecodeRawStreamEntryValue(raw_value, &values);
      if (!rv.IsOK()) {
        return rocksdb::Status::InvalidArgument(rv.Msg());
      }
      entries->emplace_back(entry_id.ToString(), std::move(values));
      pel_entry.last_delivery_count += 1;
      pel_entry.last_delivery_time_ms = now_ms;
      s = batch->Put(stream_cf_handle_, iter->key(), encodeStreamPelEntryValue(pel_entry));
      if (!s.ok()) return s;
      ++count;
      if (count >= options.count) break;
    }

    if (auto s = iter->status(); !s.ok()) {
      return s;
    }
  }
  s = batch->Put(stream_cf_handle_, group_key, encodeStreamConsumerGroupMetadataValue(consumergroup_metadata));
  if (!s.ok()) return s;
  s = batch->Put(stream_cf_handle_, consumer_key, encodeStreamConsumerMetadataValue(consumer_metadata));
  if (!s.ok()) return s;
  return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}