public MessageProcessor()

in src/DotPulsar/Internal/MessageProcessor.cs [44:87]


    /// <summary>
    /// Initializes a new message processor for the given consumer, processing callback and options.
    /// </summary>
    /// <param name="consumer">
    /// The consumer to store. Its topic, service URL and subscription name are read here
    /// to build the activity and meter tags.
    /// </param>
    /// <param name="processor">The callback stored as the processing delegate.</param>
    /// <param name="options">The processing options whose values are copied into this instance.</param>
    /// <exception cref="ProcessingException">
    /// Thrown when <see cref="ProcessingOptions.EnsureOrderedAcknowledgment"/> is set and the consumer's
    /// subscription type is <see cref="SubscriptionType.Shared"/> or <see cref="SubscriptionType.KeyShared"/>.
    /// </exception>
    public MessageProcessor(
        IConsumer<TMessage> consumer,
        Func<IMessage<TMessage>, CancellationToken, ValueTask> processor,
        ProcessingOptions options)
    {
        // Reject the unsupported combination up front, before any state is initialized.
        if (options.EnsureOrderedAcknowledgment)
        {
            var subscriptionType = consumer.SubscriptionType;

            if (subscriptionType == SubscriptionType.Shared || subscriptionType == SubscriptionType.KeyShared)
            {
                throw new ProcessingException("Ordered acknowledgment can not be ensured with shared subscription types");
            }
        }

        const string operation = "process";

        // Read once; both values are reused for the operation name and the tag sets below.
        var topic = consumer.Topic;
        var subscriptionName = consumer.SubscriptionName;

        _operationName = $"{topic} {operation}";

        // Tags attached to tracing activities.
        _activityTags =
        [
            new("messaging.destination", topic),
            new("messaging.destination_kind", "topic"),
            new("messaging.operation", operation),
            new("messaging.system", "pulsar"),
            new("messaging.url", consumer.ServiceUrl),
            new("messaging.pulsar.subscription", subscriptionName)
        ];

        // Tags attached to metrics.
        _meterTags =
        [
            new("topic", topic),
            new("subscription", subscriptionName)
        ];

        _consumer = consumer;
        _processor = processor;
        _processorTasks = new LinkedList<Task>();
        _processingQueue = new ConcurrentQueue<ProcessInfo>();

        // Both semaphores start with a count of 1 and a maximum of 1, so one holder at a time.
        _receiveLock = new SemaphoreSlim(1, 1);
        _acknowledgeLock = new SemaphoreSlim(1, 1);
        _processInfoPool = new DefaultObjectPool<ProcessInfo>(new DefaultPooledObjectPolicy<ProcessInfo>());

        // Copy the option values so later reads come from this instance's own fields.
        _linkTraces = options.LinkTraces;
        _ensureOrderedAcknowledgment = options.EnsureOrderedAcknowledgment;
        _maxDegreeOfParallelism = options.MaxDegreeOfParallelism;
        _maxMessagesPerTask = options.MaxMessagesPerTask;
        _shutdownGracePeriod = options.ShutdownGracePeriod;
        _taskScheduler = options.TaskScheduler;
    }