private async ValueTask AcknowledgeProcessedMessages()

in src/DotPulsar/Internal/MessageProcessor.cs [194:217]


    private async ValueTask AcknowledgeProcessedMessages(CancellationToken cancellationToken)
    {
        var messagesToAcknowledge = 0;
        var messageId = MessageId.Earliest;

        while (_processingQueue.TryPeek(out var processInfo))
        {
            if (!processInfo.IsProcessed)
                break;

            ++messagesToAcknowledge;

            if (_processingQueue.TryDequeue(out processInfo))
            {
                messageId = processInfo.MessageId;
                _processInfoPool.Return(processInfo);
            }
        }

        if (messagesToAcknowledge == 1)
            await _consumer.Acknowledge(messageId, cancellationToken).ConfigureAwait(false);
        else if (messagesToAcknowledge > 1)
            await _consumer.AcknowledgeCumulative(messageId, cancellationToken).ConfigureAwait(false);
    }