public MessageProcessor()

in src/DotPulsar/Internal/MessageProcessor.cs [46:80]


    /// <summary>
    /// Creates a message processor from a consumer, a per-message callback and a set of processing options.
    /// </summary>
    /// <param name="consumer">
    /// Consumer that is stored on this instance; its topic, service URL and subscription name are
    /// captured into the activity tags, and its topic and subscription name into the meter tags.
    /// </param>
    /// <param name="processor">Callback accepting a message and a cancellation token; stored as-is.</param>
    /// <param name="options">
    /// Options whose trace-linking, ordered-acknowledgment, degree-of-parallelism,
    /// messages-per-task and task-scheduler values are copied into this instance.
    /// </param>
    public MessageProcessor(IConsumer<TMessage> consumer, Func<IMessage<TMessage>, CancellationToken, ValueTask> processor, ProcessingOptions options)
    {
        const string operationKind = "process";

        // Read the consumer's descriptive properties once and reuse them for every tag below.
        var topic = consumer.Topic;
        var serviceUrl = consumer.ServiceUrl;
        var subscription = consumer.SubscriptionName;

        // Operation name has the form "<topic> process".
        _operationName = $"{topic} {operationKind}";

        // Tags attached to tracing activities.
        _activityTags = new[]
        {
            Tag("messaging.destination", topic),
            Tag("messaging.destination_kind", "topic"),
            Tag("messaging.operation", operationKind),
            Tag("messaging.system", "pulsar"),
            Tag("messaging.url", serviceUrl),
            Tag("messaging.pulsar.subscription", subscription)
        };

        // Tags attached to metrics.
        _meterTags = new[]
        {
            Tag("topic", topic),
            Tag("subscription", subscription)
        };

        // Collaborators supplied by the caller.
        _consumer = consumer;
        _processor = processor;

        // Internal bookkeeping: task list, pending-work queue, pooled ProcessInfo instances,
        // and two semaphores that each admit a single holder at a time.
        _processorTasks = new LinkedList<Task>();
        _processingQueue = new ConcurrentQueue<ProcessInfo>();
        _receiveLock = new SemaphoreSlim(1, 1);
        _acknowledgeLock = new SemaphoreSlim(1, 1);
        _processInfoPool = new DefaultObjectPool<ProcessInfo>(new DefaultPooledObjectPolicy<ProcessInfo>());

        // Snapshot of the processing options.
        _linkTraces = options.LinkTraces;
        _ensureOrderedAcknowledgment = options.EnsureOrderedAcknowledgment;
        _maxDegreeOfParallelism = options.MaxDegreeOfParallelism;
        _maxMessagesPerTask = options.MaxMessagesPerTask;
        _taskScheduler = options.TaskScheduler;

        // Shorthand for building a single tag entry.
        static KeyValuePair<string, object?> Tag(string key, object? value)
        {
            return new KeyValuePair<string, object?>(key, value);
        }
    }