in src/DotPulsar/Internal/MessageProcessor.cs [127:192]
private async ValueTask Processor(CancellationToken cancellationToken)
{
var messagesProcessed = 0;
var processInfo = new ProcessInfo();
var needToEnsureOrderedAcknowledgement = _ensureOrderedAcknowledgment && _maxDegreeOfParallelism > 1;
var isUnbounded = _maxMessagesPerTask == ProcessingOptions.Unbounded;
while (!cancellationToken.IsCancellationRequested)
{
if (needToEnsureOrderedAcknowledgement)
{
processInfo = _processInfoPool.Get();
await _receiveLock.WaitAsync(cancellationToken).ConfigureAwait(false);
}
var message = await _consumer.Receive(cancellationToken).ConfigureAwait(false);
if (needToEnsureOrderedAcknowledgement)
{
processInfo.MessageId = message.MessageId;
processInfo.IsProcessed = false;
_processingQueue.Enqueue(processInfo);
_receiveLock.Release();
}
var activity = DotPulsarActivitySource.StartConsumerActivity(message, _operationName, _activityTags, _linkTraces);
if (activity is not null && activity.IsAllDataRequested)
{
activity.SetMessageId(message.MessageId);
activity.SetPayloadSize(message.Data.Length);
activity.SetStatus(ActivityStatusCode.Ok);
}
var startTimestamp = DotPulsarMeter.MessageProcessedEnabled ? Stopwatch.GetTimestamp() : 0;
try
{
await _processor(message, cancellationToken).ConfigureAwait(false);
}
catch (Exception exception)
{
if (activity is not null && activity.IsAllDataRequested)
activity.AddException(exception);
}
if (startTimestamp != 0)
DotPulsarMeter.MessageProcessed(startTimestamp, _meterTags);
activity?.Dispose();
if (needToEnsureOrderedAcknowledgement)
{
await _acknowledgeLock.WaitAsync(cancellationToken).ConfigureAwait(false);
processInfo.IsProcessed = true;
await AcknowledgeProcessedMessages(cancellationToken).ConfigureAwait(false);
_acknowledgeLock.Release();
}
else
await _consumer.Acknowledge(message.MessageId, cancellationToken).ConfigureAwait(false);
if (!isUnbounded && ++messagesProcessed == _maxMessagesPerTask)
return;
}
}