in src/DotPulsar/Internal/MessageProcessor.cs [95:118]
public async ValueTask Process(CancellationToken cancellationToken)
{
using var cts = new CancellationTokenSource();
using var registration = Link(cts, cancellationToken);
for (var i = 1; i < _maxDegreeOfParallelism; ++i)
{
StartNewProcessorTask(cts.Token);
}
while (true)
{
if (!cancellationToken.IsCancellationRequested)
StartNewProcessorTask(cts.Token);
if (_processorTasks.Count == 0)
return;
var completedTask = await Task.WhenAny(_processorTasks).ConfigureAwait(false);
if (completedTask.IsFaulted)
ExceptionDispatchInfo.Capture(completedTask.Exception!.InnerException!).Throw();
_processorTasks.Remove(completedTask);
}
}