rocksdb::Status Stream::ClaimPelEntries()

in src/types/redis_stream.cc [396:525]


// Transfers ownership of pending (PEL) entries of consumer group `group_name` to `consumer_name`.
//
// Parameters:
//   ctx              - engine context used for every storage read and for the final write.
//   stream_name      - user-visible stream key (the namespace prefix is added here).
//   group_name       - consumer group whose PEL is inspected; its metadata must exist, otherwise the
//                      status of the failed lookup is returned.
//   consumer_name    - consumer that receives the entries; it is created when it does not exist yet.
//   min_idle_time_ms - PEL entries delivered more recently than this are skipped.
//   entry_ids        - candidate IDs. IDs missing from the stream are skipped; IDs missing from the PEL
//                      are skipped unless `options.force` is set, in which case a PEL entry is created.
//   options          - controls the stored delivery time (`with_time` / `idle_time_ms`), the delivery
//                      counter (`with_retry_count` / `just_id`), the output shape (`just_id`) and an
//                      optional forward-only update of the group's last delivered ID (`with_last_id`).
//   result           - receives `ids` (when `options.just_id`) or `entries` for each claimed ID.
//
// Returns the first non-OK status hit while reading or building the batch, InvalidArgument when a raw
// entry value cannot be decoded, or the status of the final write. PEL entries and consumer/group
// metadata are staged in a single write batch that is written once at the end; `result` may already
// contain data when an error is returned.
//
// NOTE(review): repeated IDs in `entry_ids` are not deduplicated; each occurrence is evaluated against
// the state read through `storage_->Get`, which presumably does not reflect the pending batch - confirm
// whether callers can pass duplicates.
rocksdb::Status Stream::ClaimPelEntries(engine::Context &ctx, const Slice &stream_name, const std::string &group_name,
                                        const std::string &consumer_name, const uint64_t min_idle_time_ms,
                                        const std::vector<StreamEntryID> &entry_ids, const StreamClaimOptions &options,
                                        StreamClaimResult *result) {
  std::string ns_key = AppendNamespacePrefix(stream_name);

  StreamMetadata metadata(false);
  rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
  if (!s.ok()) return s;

  std::string group_key = internalKeyFromGroupName(ns_key, metadata, group_name);
  std::string get_group_value;
  s = storage_->Get(ctx, ctx.GetReadOptions(), stream_cf_handle_, group_key, &get_group_value);
  if (!s.ok()) return s;

  StreamConsumerGroupMetadata group_metadata = decodeStreamConsumerGroupMetadataValue(get_group_value);
  std::string consumer_key = internalKeyFromConsumerName(ns_key, metadata, group_name, consumer_name);
  std::string get_consumer_value;
  s = storage_->Get(ctx, ctx.GetReadOptions(), stream_cf_handle_, consumer_key, &get_consumer_value);
  if (!s.ok() && !s.IsNotFound()) {
    return s;
  }

  StreamConsumerMetadata consumer_metadata;
  if (s.ok()) {
    consumer_metadata = decodeStreamConsumerMetadataValue(get_consumer_value);
  } else {
    // The consumer does not exist yet: create it and start from default metadata. There is no stored
    // value to decode in this case.
    int created_number = 0;
    s = createConsumerWithoutLock(ctx, stream_name, group_name, consumer_name, &created_number);
    if (!s.ok()) {
      return s;
    }
    group_metadata.consumer_number += created_number;
  }
  auto now = util::GetTimeStampMS();
  consumer_metadata.last_attempted_interaction_ms = now;
  consumer_metadata.last_successful_interaction_ms = now;

  auto batch = storage_->GetWriteBatchBase();
  WriteBatchLogData log_data(kRedisStream);
  s = batch->PutLogData(log_data.Encode());
  if (!s.ok()) return s;

  // Metadata of consumers that lose entries in this call. Each one is loaded once and updated in memory,
  // so several entries taken from the same consumer are all accounted for; re-reading the stored value
  // for every entry would make the last batched Put overwrite the earlier decrements.
  struct PreviousOwner {
    std::string key;
    StreamConsumerMetadata metadata;
  };
  std::vector<PreviousOwner> previous_owners;

  for (const auto &id : entry_ids) {
    std::string raw_value;
    s = getEntryRawValue(ctx, ns_key, metadata, id, &raw_value);
    if (s.IsNotFound()) continue;  // the entry is no longer in the stream
    if (!s.ok()) return s;

    std::string entry_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, id);
    std::string value;
    s = storage_->Get(ctx, ctx.GetReadOptions(), stream_cf_handle_, entry_key, &value);
    // A real storage failure must not be mistaken for "not pending".
    if (!s.ok() && !s.IsNotFound()) return s;

    const bool is_new_pel_entry = s.IsNotFound();
    if (is_new_pel_entry && !options.force) continue;

    StreamPelEntry pel_entry;
    if (is_new_pel_entry) {
      pel_entry = {0, 0, ""};
    } else {
      pel_entry = decodeStreamPelEntryValue(value);
    }

    if (now - pel_entry.last_delivery_time_ms < min_idle_time_ms) continue;

    if (options.just_id) {
      result->ids.emplace_back(id.ToString());
    } else {
      std::vector<std::string> values;
      auto rv = DecodeRawStreamEntryValue(raw_value, &values);
      if (!rv.IsOK()) {
        return rocksdb::Status::InvalidArgument(rv.Msg());
      }
      result->entries.emplace_back(id.ToString(), std::move(values));
    }

    // Count a forced entry for the group only once it is certain that its PEL record will be written.
    if (is_new_pel_entry) {
      group_metadata.pending_number += 1;
    }

    // Pending counters only move when ownership actually changes; re-claiming an entry that already
    // belongs to `consumer_name` leaves them untouched.
    if (pel_entry.consumer_name != consumer_name) {
      if (!pel_entry.consumer_name.empty()) {
        std::string original_consumer_key =
            internalKeyFromConsumerName(ns_key, metadata, group_name, pel_entry.consumer_name);

        PreviousOwner *owner = nullptr;
        for (auto &candidate : previous_owners) {
          if (candidate.key == original_consumer_key) {
            owner = &candidate;
            break;
          }
        }
        if (owner == nullptr) {
          std::string get_original_consumer_value;
          s = storage_->Get(ctx, ctx.GetReadOptions(), stream_cf_handle_, original_consumer_key,
                            &get_original_consumer_value);
          if (!s.ok()) {
            return s;
          }
          previous_owners.push_back(
              {original_consumer_key, decodeStreamConsumerMetadataValue(get_original_consumer_value)});
          owner = &previous_owners.back();
        }
        // Never step below zero, even if the stored counter is already out of sync.
        if (owner->metadata.pending_number > 0) {
          owner->metadata.pending_number -= 1;
        }
      }
      consumer_metadata.pending_number += 1;
    }

    pel_entry.consumer_name = consumer_name;
    if (options.with_time) {
      pel_entry.last_delivery_time_ms = options.last_delivery_time_ms;
    } else {
      pel_entry.last_delivery_time_ms = now - options.idle_time_ms;
    }

    // Fall back to the current time when the computed delivery time is out of range.
    if (pel_entry.last_delivery_time_ms < 0 || pel_entry.last_delivery_time_ms > now) {
      pel_entry.last_delivery_time_ms = now;
    }

    if (options.with_retry_count) {
      pel_entry.last_delivery_count = options.last_delivery_count;
    } else if (!options.just_id) {
      pel_entry.last_delivery_count += 1;
    }

    std::string pel_value = encodeStreamPelEntryValue(pel_entry);
    s = batch->Put(stream_cf_handle_, entry_key, pel_value);
    if (!s.ok()) return s;
  }

  // Stage the final metadata of every consumer that lost at least one entry.
  for (const auto &owner : previous_owners) {
    s = batch->Put(stream_cf_handle_, owner.key, encodeStreamConsumerMetadataValue(owner.metadata));
    if (!s.ok()) return s;
  }

  // The group's last delivered ID is only ever moved forward.
  if (options.with_last_id && options.last_delivered_id > group_metadata.last_delivered_id) {
    group_metadata.last_delivered_id = options.last_delivered_id;
  }

  s = batch->Put(stream_cf_handle_, consumer_key, encodeStreamConsumerMetadataValue(consumer_metadata));
  if (!s.ok()) return s;
  s = batch->Put(stream_cf_handle_, group_key, encodeStreamConsumerGroupMetadataValue(group_metadata));
  if (!s.ok()) return s;
  return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}