private async ValueTask Processor()

in src/DotPulsar/Internal/MessageProcessor.cs [127:192]


    /// <summary>
    /// Runs one processing loop: receives a message from the consumer, passes it to the
    /// processor delegate, and acknowledges it. The loop repeats until cancellation is
    /// requested or, when the per-task limit is not unbounded, until
    /// <c>_maxMessagesPerTask</c> messages have been handled by this call.
    /// </summary>
    /// <remarks>
    /// <para>
    /// When ordered acknowledgment is requested and the degree of parallelism is greater than one,
    /// receiving and enqueueing the message's <c>ProcessInfo</c> happen under <c>_receiveLock</c>,
    /// and acknowledgment happens under <c>_acknowledgeLock</c>. Both locks are released in
    /// <c>finally</c> blocks so that an exception or cancellation in one loop cannot leave a
    /// semaphore permanently held and block the remaining loops.
    /// </para>
    /// <para>
    /// Exceptions thrown by the processor delegate are caught and, if an activity is collecting
    /// data, recorded on it; the message is still acknowledged afterwards.
    /// </para>
    /// </remarks>
    /// <param name="cancellationToken">
    /// Checked at the top of every iteration and passed to every awaited operation.
    /// </param>
    /// <returns>A <see cref="ValueTask"/> that completes when the loop exits.</returns>
    private async ValueTask Processor(CancellationToken cancellationToken)
    {
        var messagesProcessed = 0;

        var processInfo = new ProcessInfo();

        var needToEnsureOrderedAcknowledgement = _ensureOrderedAcknowledgment && _maxDegreeOfParallelism > 1;
        var isUnbounded = _maxMessagesPerTask == ProcessingOptions.Unbounded;

        while (!cancellationToken.IsCancellationRequested)
        {
            // Tracks whether this iteration currently owns _receiveLock, so the finally block
            // below releases it exactly once and only if the normal early release did not run.
            var receiveLockHeld = false;

            try
            {
                if (needToEnsureOrderedAcknowledgement)
                {
                    processInfo = _processInfoPool.Get();
                    await _receiveLock.WaitAsync(cancellationToken).ConfigureAwait(false);
                    receiveLockHeld = true;
                }

                var message = await _consumer.Receive(cancellationToken).ConfigureAwait(false);

                if (needToEnsureOrderedAcknowledgement)
                {
                    // Enqueue while still holding the lock so queue order matches receive order.
                    processInfo.MessageId = message.MessageId;
                    processInfo.IsProcessed = false;
                    _processingQueue.Enqueue(processInfo);

                    // Release before processing so other loops can receive concurrently.
                    // The flag is cleared first so a failing Release is not retried in finally.
                    receiveLockHeld = false;
                    _receiveLock.Release();
                }

                var activity = DotPulsarActivitySource.StartConsumerActivity(message, _operationName, _activityTags, _linkTraces);
                if (activity is not null && activity.IsAllDataRequested)
                {
                    activity.SetMessageId(message.MessageId);
                    activity.SetPayloadSize(message.Data.Length);
                    activity.SetStatus(ActivityStatusCode.Ok);
                }

                // A zero timestamp doubles as the "metric disabled" marker checked below.
                var startTimestamp = DotPulsarMeter.MessageProcessedEnabled ? Stopwatch.GetTimestamp() : 0;

                try
                {
                    await _processor(message, cancellationToken).ConfigureAwait(false);
                }
                catch (Exception exception)
                {
                    // Processing failures do not stop the loop; the message is acknowledged below regardless.
                    if (activity is not null && activity.IsAllDataRequested)
                        activity.AddException(exception);
                }

                if (startTimestamp != 0)
                    DotPulsarMeter.MessageProcessed(startTimestamp, _meterTags);

                activity?.Dispose();

                if (needToEnsureOrderedAcknowledgement)
                {
                    await _acknowledgeLock.WaitAsync(cancellationToken).ConfigureAwait(false);

                    try
                    {
                        processInfo.IsProcessed = true;
                        await AcknowledgeProcessedMessages(cancellationToken).ConfigureAwait(false);
                    }
                    finally
                    {
                        // Always hand the lock back, even if acknowledging throws or is cancelled.
                        _acknowledgeLock.Release();
                    }
                }
                else
                    await _consumer.Acknowledge(message.MessageId, cancellationToken).ConfigureAwait(false);

                if (!isUnbounded && ++messagesProcessed == _maxMessagesPerTask)
                    return;
            }
            finally
            {
                // Reached with the flag set only when Receive or the enqueue step threw while the lock was held.
                if (receiveLockHeld)
                    _receiveLock.Release();
            }
        }
    }