public async ValueTask Process()

in src/DotPulsar/Internal/MessageProcessor.cs [95:118]


    /// <summary>
    /// Starts processor tasks and keeps replacing completed ones until cancellation is requested
    /// and no tracked task remains, or until one of the tasks faults.
    /// </summary>
    /// <param name="cancellationToken">
    /// Once cancellation is requested no further processor tasks are started; the method returns
    /// when <c>_processorTasks</c> is empty.
    /// </param>
    /// <returns>A <see cref="ValueTask"/> that completes when no processor tasks remain.</returns>
    /// <remarks>
    /// When a processor task faults, the task is removed from the tracked set, the internal
    /// cancellation source is cancelled, and the inner exception of the faulted task is rethrown
    /// with its original stack trace.
    /// </remarks>
    public async ValueTask Process(CancellationToken cancellationToken)
    {
        // Processor tasks observe this locally owned source instead of the caller's token directly.
        using var cts = new CancellationTokenSource();
        // NOTE(review): Link is defined elsewhere; presumably it cancels cts when cancellationToken fires - confirm.
        using var registration = Link(cts, cancellationToken);

        // Starts at 1 on purpose: the first pass of the loop below issues one more start call.
        // NOTE(review): assumes each StartNewProcessorTask call adds one task to _processorTasks - confirm.
        for (var i = 1; i < _maxDegreeOfParallelism; ++i)
        {
            StartNewProcessorTask(cts.Token);
        }

        while (true)
        {
            // Stop starting new work once the caller has requested cancellation; only drain from then on.
            if (!cancellationToken.IsCancellationRequested)
                StartNewProcessorTask(cts.Token);

            if (_processorTasks.Count == 0)
                return;

            var completedTask = await Task.WhenAny(_processorTasks).ConfigureAwait(false);

            // Untrack the task before inspecting it, so a faulted task is not left behind in
            // _processorTasks when the exception propagates out of this method.
            _processorTasks.Remove(completedTask);

            if (completedTask.IsFaulted)
            {
                // Leaving this method disposes cts and registration, and disposing does not cancel.
                // Signal the tasks still running on cts.Token now; afterwards nothing could stop them.
                cts.Cancel();
                ExceptionDispatchInfo.Capture(completedTask.Exception!.InnerException!).Throw();
            }
        }
    }