in src/types/redis_stream.cc [396:525]
// Reassigns pending (PEL) entries of consumer group `group_name` to `consumer_name`.
//
// For every id in `entry_ids` whose stream entry still exists:
//   * the PEL record of the group is looked up; if it is missing the id is skipped unless
//     `options.force` is set, in which case a fresh PEL record is created;
//   * the id is skipped when `now - last_delivery_time_ms` of the record is below `min_idle_time_ms`;
//   * otherwise the record is rewritten with `consumer_name` as owner, and either the id
//     (`options.just_id`) or the decoded entry is appended to `result`.
//
// Option handling visible in this function:
//   * `with_time` / `last_delivery_time_ms`: explicit delivery time; otherwise `now - idle_time_ms` is used.
//     A resulting time that is negative or in the future is replaced by `now`.
//   * `with_retry_count` / `last_delivery_count`: explicit delivery counter; otherwise the counter is
//     incremented unless `just_id` is set.
//   * `with_last_id` / `last_delivered_id`: advances the group's last delivered id if it is greater.
//
// If `consumer_name` does not exist yet it is created through createConsumerWithoutLock() before any
// entry is processed. All other modifications (PEL records, consumer and group metadata) are collected
// in one write batch and submitted with a single storage_->Write() call.
//
// Returns the first non-OK status met while reading or writing; InvalidArgument if a stream entry value
// cannot be decoded. On error `result` may already contain the entries processed so far.
// NOTE(review): `result` is dereferenced without a null check - callers must pass a valid pointer.
rocksdb::Status Stream::ClaimPelEntries(engine::Context &ctx, const Slice &stream_name, const std::string &group_name,
                                        const std::string &consumer_name, const uint64_t min_idle_time_ms,
                                        const std::vector<StreamEntryID> &entry_ids, const StreamClaimOptions &options,
                                        StreamClaimResult *result) {
  std::string ns_key = AppendNamespacePrefix(stream_name);

  StreamMetadata metadata(false);
  rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
  if (!s.ok()) return s;

  // The group must exist: a missing group surfaces as the NotFound status of this read.
  std::string group_key = internalKeyFromGroupName(ns_key, metadata, group_name);
  std::string get_group_value;
  s = storage_->Get(ctx, ctx.GetReadOptions(), stream_cf_handle_, group_key, &get_group_value);
  if (!s.ok()) return s;
  StreamConsumerGroupMetadata group_metadata = decodeStreamConsumerGroupMetadataValue(get_group_value);

  std::string consumer_key = internalKeyFromConsumerName(ns_key, metadata, group_name, consumer_name);
  std::string get_consumer_value;
  s = storage_->Get(ctx, ctx.GetReadOptions(), stream_cf_handle_, consumer_key, &get_consumer_value);
  if (!s.ok() && !s.IsNotFound()) {
    return s;
  }

  StreamConsumerMetadata consumer_metadata;
  if (s.ok()) {
    // Only decode when the read actually produced a value; a freshly created consumer keeps the
    // default-constructed metadata instead of decoding an empty string.
    consumer_metadata = decodeStreamConsumerMetadataValue(get_consumer_value);
  } else {
    int created_number = 0;
    s = createConsumerWithoutLock(ctx, stream_name, group_name, consumer_name, &created_number);
    if (!s.ok()) {
      return s;
    }
    group_metadata.consumer_number += created_number;
  }

  auto now = util::GetTimeStampMS();
  consumer_metadata.last_attempted_interaction_ms = now;
  consumer_metadata.last_successful_interaction_ms = now;

  auto batch = storage_->GetWriteBatchBase();
  WriteBatchLogData log_data(kRedisStream);
  s = batch->PutLogData(log_data.Encode());
  if (!s.ok()) return s;

  // Metadata of consumers that lose entries in this call. The batch is not visible to storage_->Get(),
  // so re-reading a consumer for every entry would always start from the stored pending counter and
  // only the last decrement would survive. Each previous owner is therefore loaded once, updated in
  // memory, and written after the loop.
  struct PreviousOwner {
    std::string name;
    std::string key;
    StreamConsumerMetadata metadata;
  };
  std::vector<PreviousOwner> previous_owners;

  for (const auto &id : entry_ids) {
    // Ids whose stream entry no longer exists are skipped.
    std::string raw_value;
    s = getEntryRawValue(ctx, ns_key, metadata, id, &raw_value);
    if (s.IsNotFound()) continue;
    if (!s.ok()) return s;

    std::string entry_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, id);
    std::string value;
    s = storage_->Get(ctx, ctx.GetReadOptions(), stream_cf_handle_, entry_key, &value);
    // A genuine read failure must not be mistaken for "entry is not pending".
    if (!s.ok() && !s.IsNotFound()) return s;

    const bool is_new_pel_entry = s.IsNotFound();
    if (is_new_pel_entry && !options.force) continue;

    StreamPelEntry pel_entry;
    if (is_new_pel_entry) {
      pel_entry = {0, 0, ""};
    } else {
      pel_entry = decodeStreamPelEntryValue(value);
    }

    if (now - pel_entry.last_delivery_time_ms < min_idle_time_ms) continue;

    if (options.just_id) {
      result->ids.emplace_back(id.ToString());
    } else {
      std::vector<std::string> values;
      auto rv = DecodeRawStreamEntryValue(raw_value, &values);
      if (!rv.IsOK()) {
        return rocksdb::Status::InvalidArgument(rv.Msg());
      }
      result->entries.emplace_back(id.ToString(), std::move(values));
    }

    // Count a forced entry for the group only once it is certain that it will be written.
    if (is_new_pel_entry) {
      group_metadata.pending_number += 1;
    }

    const bool had_owner = !pel_entry.consumer_name.empty();
    const bool self_claim = had_owner && pel_entry.consumer_name == consumer_name;
    if (!self_claim) {
      if (had_owner) {
        PreviousOwner *owner = nullptr;
        for (auto &candidate : previous_owners) {
          if (candidate.name == pel_entry.consumer_name) {
            owner = &candidate;
            break;
          }
        }
        if (owner == nullptr) {
          PreviousOwner loaded;
          loaded.name = pel_entry.consumer_name;
          loaded.key = internalKeyFromConsumerName(ns_key, metadata, group_name, pel_entry.consumer_name);
          std::string get_original_consumer_value;
          s = storage_->Get(ctx, ctx.GetReadOptions(), stream_cf_handle_, loaded.key,
                            &get_original_consumer_value);
          if (!s.ok()) {
            return s;
          }
          loaded.metadata = decodeStreamConsumerMetadataValue(get_original_consumer_value);
          previous_owners.push_back(std::move(loaded));
          owner = &previous_owners.back();
        }
        owner->metadata.pending_number -= 1;
      }
      consumer_metadata.pending_number += 1;
    }
    // else: the entry already belongs to `consumer_name`; ownership does not change, so its pending
    // counter must stay as it is.

    pel_entry.consumer_name = consumer_name;
    if (options.with_time) {
      pel_entry.last_delivery_time_ms = options.last_delivery_time_ms;
    } else {
      pel_entry.last_delivery_time_ms = now - options.idle_time_ms;
    }
    // NOTE(review): if last_delivery_time_ms is an unsigned type the `< 0` test never fires and a
    // wrapped-around `now - idle_time_ms` is caught by the `> now` test instead - confirm the type.
    if (pel_entry.last_delivery_time_ms < 0 || pel_entry.last_delivery_time_ms > now) {
      pel_entry.last_delivery_time_ms = now;
    }

    if (options.with_retry_count) {
      pel_entry.last_delivery_count = options.last_delivery_count;
    } else if (!options.just_id) {
      pel_entry.last_delivery_count += 1;
    }

    s = batch->Put(stream_cf_handle_, entry_key, encodeStreamPelEntryValue(pel_entry));
    if (!s.ok()) return s;
  }

  // Previous owners never include `consumer_name` (self-claims are excluded above), so these writes
  // cannot collide with the consumer metadata written below.
  for (const auto &owner : previous_owners) {
    s = batch->Put(stream_cf_handle_, owner.key, encodeStreamConsumerMetadataValue(owner.metadata));
    if (!s.ok()) return s;
  }

  if (options.with_last_id && options.last_delivered_id > group_metadata.last_delivered_id) {
    group_metadata.last_delivered_id = options.last_delivered_id;
  }

  s = batch->Put(stream_cf_handle_, consumer_key, encodeStreamConsumerMetadataValue(consumer_metadata));
  if (!s.ok()) return s;
  s = batch->Put(stream_cf_handle_, group_key, encodeStreamConsumerGroupMetadataValue(group_metadata));
  if (!s.ok()) return s;

  return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}