private async Task MessageDispatcher()

in src/DotPulsar/Internal/SubProducer.cs [118:164]


    /// <summary>
    /// Takes send operations from <c>_sendQueue</c> one at a time and sends them through
    /// <paramref name="channel"/>, until cancellation is requested or a send attempt is
    /// reported as unsuccessful. The response processor started at the top of the method
    /// is awaited once the loop ends, and the local response queue is always disposed.
    /// </summary>
    /// <param name="channel">The producer channel each operation is sent on.</param>
    /// <param name="cancellationToken">
    /// Checked by the loop and passed to the send-queue wait, the channel send and the response processor.
    /// </param>
    /// <returns>A task that completes when dispatching and response processing have finished.</returns>
    private async Task MessageDispatcher(IProducerChannel channel, CancellationToken cancellationToken)
    {
        var pendingResponses = new AsyncQueue<Task<BaseCommand>>();
        var responseProcessing = ResponseProcessor(pendingResponses, cancellationToken);

        try
        {
            while (!cancellationToken.IsCancellationRequested)
            {
                var operation = await _sendQueue.NextItem(cancellationToken).ConfigureAwait(false);

                // The caller has already given up on this operation: drop it and move on.
                if (operation.CancellationToken.IsCancellationRequested)
                {
                    _sendQueue.RemoveCurrentItem();
                    continue;
                }

                // Once the response task finishes (completed or faulted, but not canceled)
                // it is handed to the response processor through the queue.
                var responseSource = new TaskCompletionSource<BaseCommand>();
                _ = responseSource.Task.ContinueWith(
                    EnqueueResponse,
                    TaskContinuationOptions.NotOnCanceled | TaskContinuationOptions.ExecuteSynchronously);

                // The executor is deliberately given CancellationToken.None: with a real token
                // it would throw exceptions on every fault action, retry included.
                var sent = await _executor
                    .TryExecuteOnce(
                        () => channel.Send(operation.Metadata, operation.Data, responseSource, cancellationToken),
                        CancellationToken.None)
                    .ConfigureAwait(false);

                if (!sent)
                {
                    // An unsuccessful attempt ends dispatching and is reported as a disconnect.
                    _eventRegister.Register(new ChannelDisconnected(_correlationId));
                    break;
                }
            }

            await responseProcessing.ConfigureAwait(false);
        }
        finally
        {
            pendingResponses.Dispose();
        }

        // Continuation body: forwards a finished response task to the response queue.
        void EnqueueResponse(Task<BaseCommand> finished)
        {
            try
            {
                pendingResponses.Enqueue(finished);
            }
            catch
            {
                // Intentionally ignored (best effort).
                // NOTE(review): presumably guards against the queue already being disposed - confirm.
            }
        }
    }