in src/DotPulsar/Internal/MessageProcessor.cs [194:217]
/// <summary>
/// Removes the leading run of processed entries from <c>_processingQueue</c>, returns each removed
/// entry to <c>_processInfoPool</c>, and acknowledges the removed messages on <c>_consumer</c>.
/// </summary>
/// <remarks>
/// When exactly one entry was removed it is acknowledged individually; when more than one was removed,
/// a single cumulative acknowledgement is sent for the id of the last removed entry. Nothing is sent
/// when no entry was removed.
/// NOTE(review): the peek-then-dequeue sequence assumes no other caller drains the queue concurrently
/// (otherwise the dequeued entry may differ from the one that was peeked) - confirm callers serialize access.
/// </remarks>
/// <param name="cancellationToken">Token forwarded to the consumer's acknowledge call.</param>
private async ValueTask AcknowledgeProcessedMessages(CancellationToken cancellationToken)
{
    var messagesToAcknowledge = 0;
    var messageId = MessageId.Earliest;

    while (_processingQueue.TryPeek(out var processInfo))
    {
        // Stop at the first entry that is not yet processed; it and everything after it stay queued.
        if (!processInfo.IsProcessed)
            break;

        // Count an entry only once it has actually been removed. Counting before a failed dequeue
        // would leave 'messageId' stale (or at its MessageId.Earliest default) while still
        // triggering an acknowledgement below.
        if (!_processingQueue.TryDequeue(out processInfo))
            break;

        ++messagesToAcknowledge;

        // Read the id before handing the entry back to the pool.
        messageId = processInfo.MessageId;
        _processInfoPool.Return(processInfo);
    }

    if (messagesToAcknowledge == 1)
        await _consumer.Acknowledge(messageId, cancellationToken).ConfigureAwait(false);
    else if (messagesToAcknowledge > 1)
        await _consumer.AcknowledgeCumulative(messageId, cancellationToken).ConfigureAwait(false);
}