in src/types/redis_stream.cc [527:684]
// Scans the pending-entries list (PEL) of `group_name` from `options.start_id` onwards and transfers
// ownership of entries that have been idle for at least `options.min_idle_time_ms` to `consumer_name`.
// (Presumably the storage-side implementation of XAUTOCLAIM - confirm against the command layer.)
//
// Parameters:
//   ctx           - engine context used for every read and for the final write.
//   stream_name   - user-visible stream key; the namespace prefix is added here.
//   group_name    - consumer group whose PEL is scanned.
//   consumer_name - consumer receiving the claimed entries; created when it does not exist yet.
//   options       - start_id / exclude_start select the scan start, count caps the number of reported
//                   entries (claimed + deleted), attempts_factors * count caps the number of PEL entries
//                   examined, just_id skips decoding entry values and keeps the delivery counter unchanged.
//   result        - out: entries (claimed or already owned by the caller), deleted_ids (PEL entries whose
//                   stream entry no longer exists) and next_claim_id (ID of the first PEL entry that was
//                   not examined, or the minimum ID when the scan reached the end of the PEL).
//
// Returns the status of the first failing storage/batch operation, InvalidArgument for an impossible
// start interval or an undecodable entry value, otherwise the status of the final batch write.
rocksdb::Status Stream::AutoClaim(engine::Context &ctx, const Slice &stream_name, const std::string &group_name,
                                  const std::string &consumer_name, const StreamAutoClaimOptions &options,
                                  StreamAutoClaimResult *result) {
  // Nothing can follow the maximum ID, so an exclusive start at the maximum is an empty/invalid interval.
  if (options.exclude_start && options.start_id.IsMaximum()) {
    return rocksdb::Status::InvalidArgument("invalid start ID for the interval");
  }

  std::string ns_key = AppendNamespacePrefix(stream_name);
  StreamMetadata metadata(false);
  auto s = GetMetadata(ctx, ns_key, &metadata);
  if (!s.ok()) {  // not found will be caught by outside with no such key or consumer group
    return s;
  }

  // Load the claiming consumer's metadata, creating the consumer first when it is missing.
  std::string consumer_key = internalKeyFromConsumerName(ns_key, metadata, group_name, consumer_name);
  std::string get_consumer_value;
  s = storage_->Get(ctx, ctx.GetReadOptions(), stream_cf_handle_, consumer_key, &get_consumer_value);
  if (!s.ok() && !s.IsNotFound()) {
    return s;
  }
  if (s.IsNotFound()) {
    int created_number = 0;
    s = createConsumerWithoutLock(ctx, stream_name, group_name, consumer_name, &created_number);
    if (!s.ok()) {
      return s;
    }
    // Re-read so that the value decoded below is the one that was just created.
    s = storage_->Get(ctx, ctx.GetReadOptions(), stream_cf_handle_, consumer_key, &get_consumer_value);
    if (!s.ok()) {
      return s;
    }
  }
  StreamConsumerMetadata current_consumer_metadata = decodeStreamConsumerMetadataValue(get_consumer_value);

  // Number of entries taken away from each previous owner, keyed by consumer name.
  std::map<std::string, uint64_t> claimed_consumer_entity_count;

  // Restrict the iterator to this group's PEL keys in [start_id, maximum ID).
  std::string prefix_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, options.start_id);
  std::string end_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, StreamEntryID::Maximum());
  rocksdb::ReadOptions read_options = ctx.DefaultScanOptions();
  rocksdb::Slice lower_bound(prefix_key);
  rocksdb::Slice upper_bound(end_key);
  read_options.iterate_lower_bound = &lower_bound;
  read_options.iterate_upper_bound = &upper_bound;

  auto count = options.count;                            // remaining entries that may be reported
  uint64_t attempts = options.attempts_factors * count;  // remaining PEL entries that may be examined
  auto now_ms = util::GetTimeStampMS();

  std::vector<StreamEntryID> deleted_entries;
  std::vector<StreamEntry> pending_entries;

  auto batch = storage_->GetWriteBatchBase();
  WriteBatchLogData log_data(kRedisStream);
  s = batch->PutLogData(log_data.Encode());
  if (!s.ok()) return s;

  auto iter = util::UniqueIterator(ctx, read_options, stream_cf_handle_);
  uint64_t total_claimed_count = 0;
  for (iter->SeekToFirst(); iter->Valid() && count > 0 && attempts > 0; iter->Next()) {
    std::string tmp_group_name;
    StreamEntryID entry_id = groupAndEntryIdFromPelInternalKey(iter->key(), tmp_group_name);
    // An excluded start ID is skipped without consuming an attempt.
    if (options.exclude_start && entry_id == options.start_id) {
      continue;
    }
    attempts--;

    StreamPelEntry pel_entry = decodeStreamPelEntryValue(iter->value().ToString());
    // Clamp instead of subtracting blindly: a delivery timestamp ahead of the current clock would
    // otherwise wrap around to a huge idle time and make a freshly delivered entry claimable.
    const uint64_t idle_ms =
        now_ms > pel_entry.last_delivery_time_ms ? now_ms - pel_entry.last_delivery_time_ms : 0;
    if (idle_ms < options.min_idle_time_ms) {
      continue;
    }

    auto entry_key = internalKeyFromEntryID(ns_key, metadata, entry_id);
    std::string entry_value;
    s = storage_->Get(ctx, ctx.GetReadOptions(), stream_cf_handle_, entry_key, &entry_value);
    if (!s.ok()) {
      if (s.IsNotFound()) {
        // The stream entry is gone: drop the dangling PEL record and report its ID as deleted.
        // NOTE(review): no pending counter is adjusted for the removed PEL record here - confirm
        // whether the owner's / group's pending numbers should be decremented as well.
        deleted_entries.push_back(entry_id);
        s = batch->Delete(stream_cf_handle_, iter->key());
        if (!s.ok()) return s;
        --count;
        continue;
      }
      return s;
    }

    StreamEntry entry(entry_id.ToString(), {});
    if (!options.just_id) {
      auto rv_status = DecodeRawStreamEntryValue(entry_value, &entry.values);
      if (!rv_status.OK()) {
        return rocksdb::Status::InvalidArgument(rv_status.Msg());
      }
    }
    pending_entries.emplace_back(std::move(entry));
    --count;

    // Entries already owned by the caller are reported but their PEL record is left untouched.
    // NOTE(review): Redis appears to refresh delivery time/count in that case too - confirm intent.
    if (pel_entry.consumer_name != consumer_name) {
      ++total_claimed_count;
      claimed_consumer_entity_count[pel_entry.consumer_name] += 1;
      pel_entry.consumer_name = consumer_name;
      pel_entry.last_delivery_time_ms = now_ms;
      // Increment the delivery attempts counter unless JUSTID option provided
      if (!options.just_id) {
        pel_entry.last_delivery_count += 1;
      }
      s = batch->Put(stream_cf_handle_, iter->key(), encodeStreamPelEntryValue(pel_entry));
      if (!s.ok()) return s;
    }
  }

  // Surface iterator failures right away so a read error is not mistaken for the end of the PEL.
  if (auto iter_status = iter->status(); !iter_status.ok()) {
    return iter_status;
  }

  if (total_claimed_count > 0) {
    // Credit the claiming consumer ...
    current_consumer_metadata.pending_number += total_claimed_count;
    current_consumer_metadata.last_attempted_interaction_ms = now_ms;
    s = batch->Put(stream_cf_handle_, consumer_key, encodeStreamConsumerMetadataValue(current_consumer_metadata));
    if (!s.ok()) return s;

    // ... and debit every consumer that lost entries.
    for (const auto &[previous_owner, transferred] : claimed_consumer_entity_count) {
      std::string owner_key = internalKeyFromConsumerName(ns_key, metadata, group_name, previous_owner);
      std::string owner_value;
      s = storage_->Get(ctx, ctx.GetReadOptions(), stream_cf_handle_, owner_key, &owner_value);
      if (!s.ok()) {
        return s;
      }
      StreamConsumerMetadata owner_metadata = decodeStreamConsumerMetadataValue(owner_value);
      owner_metadata.pending_number -= transferred;
      s = batch->Put(stream_cf_handle_, owner_key, encodeStreamConsumerMetadataValue(owner_metadata));
      if (!s.ok()) return s;
    }
  }

  // The scan loop leaves the iterator on the first PEL entry it did not examine, if there is one.
  if (iter->Valid()) {
    std::string tmp_group_name;
    StreamEntryID next_id = groupAndEntryIdFromPelInternalKey(iter->key(), tmp_group_name);
    result->next_claim_id = next_id.ToString();
  } else {
    result->next_claim_id = StreamEntryID::Minimum().ToString();
  }

  result->entries = std::move(pending_entries);
  result->deleted_ids.clear();
  result->deleted_ids.reserve(deleted_entries.size());
  std::transform(deleted_entries.cbegin(), deleted_entries.cend(), std::back_inserter(result->deleted_ids),
                 [](const StreamEntryID &id) { return id.ToString(); });

  return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}