rocksdb::Status Stream::AutoClaim()

in src/types/redis_stream.cc [527:684]


// AutoClaim scans the pending entries list (PEL) of `group_name` starting at `options.start_id` and transfers
// ownership of sufficiently idle entries to `consumer_name`.
//
// What this function does:
//  - Rejects an exclusive start at the maximum entry ID with InvalidArgument.
//  - Creates `consumer_name` through createConsumerWithoutLock() when its metadata key does not exist yet.
//  - Examines at most `options.attempts_factors * options.count` PEL entries and reports at most
//    `options.count` of them (claimed entries plus stale ones).
//  - A PEL entry whose stream entry no longer exists is removed from the PEL and reported in
//    `result->deleted_ids`.
//  - A PEL entry owned by another consumer is rewritten with the new owner and the current time as delivery
//    time; its delivery counter is incremented unless `options.just_id` is set. The pending counters of the
//    new and the previous owners are adjusted accordingly.
//  - A PEL entry already owned by `consumer_name` is returned in `result->entries`, but its PEL record is not
//    rewritten.
//  - `result->next_claim_id` is the ID of the first PEL entry that was not examined, or the minimum ID when
//    the scan reached the end of the group's PEL.
//
// All modifications are collected in one write batch that is written at the very end, so an early error return
// leaves the storage untouched. Returns the first non-OK status encountered; a NotFound from GetMetadata() is
// passed through for the caller to translate.
//
// NOTE(review): no lock is acquired in this function although it calls createConsumerWithoutLock();
// presumably the caller serializes access to the stream - confirm.
rocksdb::Status Stream::AutoClaim(engine::Context &ctx, const Slice &stream_name, const std::string &group_name,
                                  const std::string &consumer_name, const StreamAutoClaimOptions &options,
                                  StreamAutoClaimResult *result) {
  if (options.exclude_start && options.start_id.IsMaximum()) {
    return rocksdb::Status::InvalidArgument("invalid start ID for the interval");
  }

  std::string ns_key = AppendNamespacePrefix(stream_name);
  StreamMetadata metadata(false);

  auto s = GetMetadata(ctx, ns_key, &metadata);
  if (!s.ok()) {  // not found will be caught by outside with no such key or consumer group
    return s;
  }

  // Load the claiming consumer's metadata, creating the consumer first if it is unknown.
  std::string consumer_key = internalKeyFromConsumerName(ns_key, metadata, group_name, consumer_name);
  std::string get_consumer_value;
  s = storage_->Get(ctx, ctx.GetReadOptions(), stream_cf_handle_, consumer_key, &get_consumer_value);
  if (!s.ok() && !s.IsNotFound()) {
    return s;
  }
  if (s.IsNotFound()) {
    int created_number = 0;
    s = createConsumerWithoutLock(ctx, stream_name, group_name, consumer_name, &created_number);
    if (!s.ok()) {
      return s;
    }
    s = storage_->Get(ctx, ctx.GetReadOptions(), stream_cf_handle_, consumer_key, &get_consumer_value);
    if (!s.ok()) {
      return s;
    }
  }

  StreamConsumerMetadata current_consumer_metadata = decodeStreamConsumerMetadataValue(get_consumer_value);
  // Number of entries taken away from each previous owner, keyed by consumer name.
  std::map<std::string, uint64_t> claimed_consumer_entity_count;
  std::string prefix_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, options.start_id);
  std::string end_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, StreamEntryID::Maximum());

  // Restrict the scan to the group's PEL keys from the start ID up to the key built for the maximum ID.
  // The slices must outlive the iterator because ReadOptions only stores pointers to them.
  rocksdb::ReadOptions read_options = ctx.DefaultScanOptions();
  rocksdb::Slice lower_bound(prefix_key);
  rocksdb::Slice upper_bound(end_key);
  read_options.iterate_lower_bound = &lower_bound;
  read_options.iterate_upper_bound = &upper_bound;

  auto count = options.count;
  // Upper limit on the number of PEL entries examined, so that a long run of non-idle entries cannot make a
  // single call scan the whole PEL.
  uint64_t attempts = options.attempts_factors * count;
  auto now_ms = util::GetTimeStampMS();
  std::vector<StreamEntryID> deleted_entries;
  std::vector<StreamEntry> pending_entries;

  auto batch = storage_->GetWriteBatchBase();
  WriteBatchLogData log_data(kRedisStream);
  s = batch->PutLogData(log_data.Encode());
  if (!s.ok()) return s;

  auto iter = util::UniqueIterator(ctx, read_options, stream_cf_handle_);
  uint64_t total_claimed_count = 0;
  for (iter->SeekToFirst(); iter->Valid() && count > 0 && attempts > 0; iter->Next()) {
    std::string tmp_group_name;
    StreamEntryID entry_id = groupAndEntryIdFromPelInternalKey(iter->key(), tmp_group_name);

    // The excluded start entry consumes neither an attempt nor a result slot.
    if (options.exclude_start && entry_id == options.start_id) {
      continue;
    }

    attempts--;

    StreamPelEntry penl_entry = decodeStreamPelEntryValue(iter->value().ToString());
    // Skip entries that have not been idle long enough. A delivery time later than `now_ms` would make the
    // subtraction wrap around (assuming unsigned millisecond timestamps) and look like a huge idle time, so
    // such entries are treated as not idle.
    if (penl_entry.last_delivery_time_ms > now_ms ||
        (now_ms - penl_entry.last_delivery_time_ms) < options.min_idle_time_ms) {
      continue;
    }

    auto entry_key = internalKeyFromEntryID(ns_key, metadata, entry_id);
    std::string entry_value;
    s = storage_->Get(ctx, ctx.GetReadOptions(), stream_cf_handle_, entry_key, &entry_value);
    if (!s.ok()) {
      if (s.IsNotFound()) {
        // The stream entry is gone: drop the stale PEL record and report its ID as deleted.
        // NOTE(review): no consumer pending_number is adjusted for the removed PEL record in this function -
        // confirm whether the owner's counter should be decremented here.
        deleted_entries.push_back(entry_id);
        s = batch->Delete(stream_cf_handle_, iter->key());
        if (!s.ok()) return s;
        --count;
        continue;
      }
      return s;
    }

    StreamEntry entry(entry_id.ToString(), {});
    if (!options.just_id) {
      auto rv_status = DecodeRawStreamEntryValue(entry_value, &entry.values);
      if (!rv_status.OK()) {
        return rocksdb::Status::InvalidArgument(rv_status.Msg());
      }
    }

    pending_entries.emplace_back(std::move(entry));
    --count;

    // Only entries owned by a different consumer need their PEL record rewritten.
    // NOTE(review): entries already owned by `consumer_name` keep their delivery time and counter - confirm
    // this is intended.
    if (penl_entry.consumer_name != consumer_name) {
      ++total_claimed_count;
      claimed_consumer_entity_count[penl_entry.consumer_name] += 1;
      penl_entry.consumer_name = consumer_name;
      penl_entry.last_delivery_time_ms = now_ms;
      // Increment the delivery attempts counter unless JUSTID option provided
      if (!options.just_id) {
        penl_entry.last_delivery_count += 1;
      }
      s = batch->Put(stream_cf_handle_, iter->key(), encodeStreamPelEntryValue(penl_entry));
      if (!s.ok()) return s;
    }
  }

  // The for-loop advances the iterator before re-checking its condition, so a still-valid iterator now points
  // at the first PEL entry that was not examined. Check the iterator status right away so that a scan failure
  // is reported before any further reads are issued.
  const bool has_next_entry = iter->Valid();
  s = iter->status();
  if (!s.ok()) return s;

  // `total_claimed_count` is only incremented after an entry was appended to `pending_entries`, so a positive
  // value already implies that there is something to report.
  if (total_claimed_count > 0) {
    current_consumer_metadata.pending_number += total_claimed_count;
    current_consumer_metadata.last_attempted_interaction_ms = now_ms;

    s = batch->Put(stream_cf_handle_, consumer_key, encodeStreamConsumerMetadataValue(current_consumer_metadata));
    if (!s.ok()) return s;

    // Give back the claimed entries from the pending counters of their previous owners.
    for (const auto &[previous_owner, released] : claimed_consumer_entity_count) {
      std::string tmp_consumer_key = internalKeyFromConsumerName(ns_key, metadata, group_name, previous_owner);
      std::string tmp_consumer_value;
      s = storage_->Get(ctx, ctx.GetReadOptions(), stream_cf_handle_, tmp_consumer_key, &tmp_consumer_value);
      if (!s.ok()) {
        return s;
      }
      StreamConsumerMetadata tmp_consumer_metadata = decodeStreamConsumerMetadataValue(tmp_consumer_value);
      tmp_consumer_metadata.pending_number -= released;
      s = batch->Put(stream_cf_handle_, tmp_consumer_key, encodeStreamConsumerMetadataValue(tmp_consumer_metadata));
      if (!s.ok()) return s;
    }
  }

  if (has_next_entry) {
    std::string tmp_group_name;
    StreamEntryID entry_id = groupAndEntryIdFromPelInternalKey(iter->key(), tmp_group_name);
    result->next_claim_id = entry_id.ToString();
  } else {
    // The scan reached the end of the PEL: report the minimum ID as the next cursor.
    result->next_claim_id = StreamEntryID::Minimum().ToString();
  }

  result->entries = std::move(pending_entries);
  result->deleted_ids.clear();
  result->deleted_ids.reserve(deleted_entries.size());
  std::transform(deleted_entries.cbegin(), deleted_entries.cend(), std::back_inserter(result->deleted_ids),
                 [](const StreamEntryID &id) { return id.ToString(); });

  return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}