in src/DotPulsar/Internal/MessageProcessor.cs [44:87]
/// <summary>
/// Creates a message processor bound to <paramref name="consumer"/>, storing the processing
/// callback and copying the settings it needs from <paramref name="options"/>.
/// </summary>
/// <param name="consumer">
/// The consumer to process messages from. Its topic, service URL, subscription name and
/// subscription type are read here, and the instance itself is stored.
/// </param>
/// <param name="processor">The callback stored for processing messages.</param>
/// <param name="options">
/// The processing settings. The individual values are copied into fields, so the options
/// instance itself is not retained by this constructor.
/// </param>
/// <exception cref="ProcessingException">
/// Thrown when <see cref="ProcessingOptions.EnsureOrderedAcknowledgment"/> is enabled and the
/// consumer's subscription type is <c>Shared</c> or <c>KeyShared</c>.
/// </exception>
public MessageProcessor(
    IConsumer<TMessage> consumer,
    Func<IMessage<TMessage>, CancellationToken, ValueTask> processor,
    ProcessingOptions options)
{
    // Reject the unsupported combination up front, before any state is initialized.
    if (options.EnsureOrderedAcknowledgment &&
        consumer.SubscriptionType is SubscriptionType.Shared or SubscriptionType.KeyShared)
    {
        throw new ProcessingException("Ordered acknowledgment cannot be ensured with shared subscription types");
    }

    const string operation = "process";
    _operationName = $"{consumer.Topic} {operation}";

    // Tags attached to tracing activities.
    // NOTE(review): the "messaging.*" keys look like OpenTelemetry messaging semantic
    // conventions - confirm against the version the project targets.
    _activityTags =
    [
        new("messaging.destination", consumer.Topic),
        new("messaging.destination_kind", "topic"),
        new("messaging.operation", operation),
        new("messaging.system", "pulsar"),
        new("messaging.url", consumer.ServiceUrl),
        new("messaging.pulsar.subscription", consumer.SubscriptionName)
    ];

    // Tags attached to metrics.
    _meterTags =
    [
        new("topic", consumer.Topic),
        new("subscription", consumer.SubscriptionName)
    ];

    _consumer = consumer;
    _processor = processor;

    // Internal bookkeeping, all starting out empty.
    _processorTasks = new LinkedList<Task>();
    _processingQueue = new ConcurrentQueue<ProcessInfo>();

    // Semaphores created with an initial and maximum count of 1, so each admits
    // a single holder at a time.
    _receiveLock = new SemaphoreSlim(1, 1);
    _acknowledgeLock = new SemaphoreSlim(1, 1);

    _processInfoPool = new DefaultObjectPool<ProcessInfo>(new DefaultPooledObjectPolicy<ProcessInfo>());

    // Snapshot the option values into fields.
    _linkTraces = options.LinkTraces;
    _ensureOrderedAcknowledgment = options.EnsureOrderedAcknowledgment;
    _maxDegreeOfParallelism = options.MaxDegreeOfParallelism;
    _maxMessagesPerTask = options.MaxMessagesPerTask;
    _shutdownGracePeriod = options.ShutdownGracePeriod;
    _taskScheduler = options.TaskScheduler;
}