in src/DotPulsar/Internal/MessageProcessor.cs [46:80]
public MessageProcessor(IConsumer<TMessage> consumer, Func<IMessage<TMessage>, CancellationToken, ValueTask> processor, ProcessingOptions options)
{
const string operation = "process";
_operationName = $"{consumer.Topic} {operation}";
_activityTags = new KeyValuePair<string, object?>[]
{
new KeyValuePair<string, object?>("messaging.destination", consumer.Topic),
new KeyValuePair<string, object?>("messaging.destination_kind", "topic"),
new KeyValuePair<string, object?>("messaging.operation", operation),
new KeyValuePair<string, object?>("messaging.system", "pulsar"),
new KeyValuePair<string, object?>("messaging.url", consumer.ServiceUrl),
new KeyValuePair<string, object?>("messaging.pulsar.subscription", consumer.SubscriptionName)
};
_meterTags = new KeyValuePair<string, object?>[]
{
new KeyValuePair<string, object?>("topic", consumer.Topic),
new KeyValuePair<string, object?>("subscription", consumer.SubscriptionName)
};
_consumer = consumer;
_processor = processor;
_processorTasks = new LinkedList<Task>();
_processingQueue = new ConcurrentQueue<ProcessInfo>();
_receiveLock = new SemaphoreSlim(1, 1);
_acknowledgeLock = new SemaphoreSlim(1, 1);
_processInfoPool = new DefaultObjectPool<ProcessInfo>(new DefaultPooledObjectPolicy<ProcessInfo>());
_linkTraces = options.LinkTraces;
_ensureOrderedAcknowledgment = options.EnsureOrderedAcknowledgment;
_maxDegreeOfParallelism = options.MaxDegreeOfParallelism;
_maxMessagesPerTask = options.MaxMessagesPerTask;
_taskScheduler = options.TaskScheduler;
}