in src/types/redis_stream.cc [1430:1565]
// Reads stream entries on behalf of `consumer_name` in consumer group `group_name` and records the
// delivery in the group's bookkeeping, all in a single write batch.
//
// Two modes, selected by `latest`:
//   * latest == true:  runs range() starting from the group's last_delivered_id (options.start is
//     overwritten), advances last_delivered_id to the largest returned ID and, unless `noack` is set,
//     adds every returned entry to the group's PEL and bumps the pending/read counters.
//   * latest == false: scans the group's PEL from options.start upwards and returns the entries that
//     are pending for this consumer, bumping their delivery count and delivery time. options.end is
//     not consulted in this mode.
//
// `entries` is always cleared first. A missing stream yields OK with no entries; a missing group key
// yields the storage layer's error; a missing consumer is created on the fly.
// Returns InvalidArgument for an exclusive start at the maximum ID, an exclusive end at the minimum ID,
// or when a stored entry ID / raw entry value cannot be decoded.
rocksdb::Status Stream::RangeWithPending(engine::Context &ctx, const Slice &stream_name, StreamRangeOptions &options,
                                         std::vector<StreamEntry> *entries, std::string &group_name,
                                         std::string &consumer_name, bool noack, bool latest) {
  entries->clear();

  // An explicit count of zero can never return anything.
  if (options.with_count && options.count == 0) {
    return rocksdb::Status::OK();
  }

  if (options.exclude_start && options.start.IsMaximum()) {
    return rocksdb::Status::InvalidArgument("invalid start ID for the interval");
  }

  if (options.exclude_end && options.end.IsMinimum()) {
    return rocksdb::Status::InvalidArgument("invalid end ID for the interval");
  }

  std::string ns_key = AppendNamespacePrefix(stream_name);

  StreamMetadata metadata(false);
  rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
  if (!s.ok()) {
    // A non-existent stream is reported as an empty result rather than an error.
    return s.IsNotFound() ? rocksdb::Status::OK() : s;
  }

  // The group must exist; any failure (including NotFound) is propagated to the caller.
  std::string group_key = internalKeyFromGroupName(ns_key, metadata, group_name);
  std::string get_group_value;
  s = storage_->Get(ctx, ctx.GetReadOptions(), stream_cf_handle_, group_key, &get_group_value);
  if (!s.ok()) return s;

  std::string consumer_key = internalKeyFromConsumerName(ns_key, metadata, group_name, consumer_name);
  std::string get_consumer_value;
  s = storage_->Get(ctx, ctx.GetReadOptions(), stream_cf_handle_, consumer_key, &get_consumer_value);
  if (!s.ok() && !s.IsNotFound()) {
    return s;
  }

  if (s.IsNotFound()) {
    // Unknown consumer: create it, then reload both records so the values decoded below reflect
    // whatever the creation wrote (presumably the group's consumer bookkeeping -- confirm against
    // createConsumerWithoutLock).
    int created_number = 0;
    s = createConsumerWithoutLock(ctx, stream_name, group_name, consumer_name, &created_number);
    if (!s.ok()) {
      return s;
    }

    // Previously this status was overwritten without being checked, so a failed reload went
    // unnoticed and stale group metadata was written back at the end of the function.
    s = storage_->Get(ctx, ctx.GetReadOptions(), stream_cf_handle_, group_key, &get_group_value);
    if (!s.ok()) return s;

    // NotFound is still tolerated here, as before: the (empty) value is decoded as-is.
    s = storage_->Get(ctx, ctx.GetReadOptions(), stream_cf_handle_, consumer_key, &get_consumer_value);
    if (!s.ok() && !s.IsNotFound()) {
      return s;
    }
  }

  auto batch = storage_->GetWriteBatchBase();
  WriteBatchLogData log_data(kRedisStream);
  s = batch->PutLogData(log_data.Encode());
  if (!s.ok()) return s;

  StreamConsumerGroupMetadata consumergroup_metadata = decodeStreamConsumerGroupMetadataValue(get_group_value);
  StreamConsumerMetadata consumer_metadata = decodeStreamConsumerMetadataValue(get_consumer_value);

  auto now_ms = util::GetTimeStampMS();
  consumer_metadata.last_attempted_interaction_ms = now_ms;
  consumer_metadata.last_successful_interaction_ms = now_ms;

  if (latest) {
    // Deliver new entries: the caller-supplied start is replaced by the group's read position.
    options.start = consumergroup_metadata.last_delivered_id;
    s = range(ctx, ns_key, metadata, options, entries);
    if (!s.ok()) {
      return s;
    }

    StreamEntryID maxid = {0, 0};
    for (const auto &entry : *entries) {
      StreamEntryID id;
      Status st = ParseStreamEntryID(entry.key, &id);
      if (!st.IsOK()) {
        return rocksdb::Status::InvalidArgument(st.Msg());
      }

      if (id > maxid) {
        maxid = id;
      }

      if (!noack) {
        // Track the entry as pending for this consumer.
        // NOTE(review): entries_read is only advanced when the entry is put into the PEL; confirm
        // this is the intended behaviour for NOACK reads.
        std::string pel_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, id);
        StreamPelEntry pel_entry = {0, 0, consumer_name};
        std::string pel_value = encodeStreamPelEntryValue(pel_entry);
        s = batch->Put(stream_cf_handle_, pel_key, pel_value);
        if (!s.ok()) return s;

        consumergroup_metadata.entries_read += 1;
        consumergroup_metadata.pending_number += 1;
        consumer_metadata.pending_number += 1;
      }
    }

    // The read position only ever moves forward.
    if (maxid > consumergroup_metadata.last_delivered_id) {
      consumergroup_metadata.last_delivered_id = maxid;
    }
  } else {
    // Re-deliver history: walk the group's PEL keys from options.start up to the maximum entry ID.
    std::string prefix_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, options.start);
    std::string end_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, StreamEntryID::Maximum());

    rocksdb::ReadOptions read_options = ctx.DefaultScanOptions();
    rocksdb::Slice upper_bound(end_key);
    read_options.iterate_upper_bound = &upper_bound;
    rocksdb::Slice lower_bound(prefix_key);
    read_options.iterate_lower_bound = &lower_bound;

    auto iter = util::UniqueIterator(ctx, read_options, stream_cf_handle_);
    uint64_t count = 0;
    for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
      std::string tmp_group_name;
      StreamEntryID entry_id = groupAndEntryIdFromPelInternalKey(iter->key(), tmp_group_name);
      StreamPelEntry pel_entry = decodeStreamPelEntryValue(iter->value().ToString());

      // The PEL is shared by the whole group; skip entries owned by other consumers.
      if (pel_entry.consumer_name != consumer_name) continue;

      // A pending entry whose payload is gone (NotFound) is still reported; raw_value stays empty.
      std::string raw_value;
      rocksdb::Status st = getEntryRawValue(ctx, ns_key, metadata, entry_id, &raw_value);
      if (!st.ok() && !st.IsNotFound()) {
        return st;
      }

      std::vector<std::string> values;
      auto rv = DecodeRawStreamEntryValue(raw_value, &values);
      if (!rv.IsOK()) {
        return rocksdb::Status::InvalidArgument(rv.Msg());
      }
      entries->emplace_back(entry_id.ToString(), std::move(values));

      pel_entry.last_delivery_count += 1;
      pel_entry.last_delivery_time_ms = now_ms;
      s = batch->Put(stream_cf_handle_, iter->key(), encodeStreamPelEntryValue(pel_entry));
      if (!s.ok()) return s;

      // Only enforce the limit when a count was actually requested; otherwise an unset
      // options.count (e.g. 0) would cut the history read short after the first entry.
      ++count;
      if (options.with_count && count >= options.count) break;
    }

    if (auto iter_status = iter->status(); !iter_status.ok()) {
      return iter_status;
    }
  }

  // Persist the updated group and consumer metadata together with the PEL changes.
  s = batch->Put(stream_cf_handle_, group_key, encodeStreamConsumerGroupMetadataValue(consumergroup_metadata));
  if (!s.ok()) return s;
  s = batch->Put(stream_cf_handle_, consumer_key, encodeStreamConsumerMetadataValue(consumer_metadata));
  if (!s.ok()) return s;

  return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}