private async Task MessageDispatcher()

in src/DotPulsar/Internal/SubProducer.cs [107:149]


    /// <summary>
    /// Takes send operations from the send queue one at a time and sends them over
    /// <paramref name="channel"/> until cancellation is requested or a send attempt fails.
    /// Completed (non-canceled) responses are forwarded to a response processor started by this method,
    /// which is awaited once the dispatch loop has ended.
    /// </summary>
    /// <param name="channel">The producer channel the messages are sent on.</param>
    /// <param name="cancellationToken">Token that stops the dispatch loop and is handed to the response processor.</param>
    private async Task MessageDispatcher(IProducerChannel channel, CancellationToken cancellationToken)
    {
        // Only non-canceled completions are forwarded, and the forwarding is requested to run synchronously.
        const TaskContinuationOptions forwardingOptions =
            TaskContinuationOptions.NotOnCanceled | TaskContinuationOptions.ExecuteSynchronously;

        using var pendingResponses = new AsyncQueue<Task<BaseCommand>>();
        var processorTask = Task.Run(async () => await ResponseProcessor(pendingResponses, cancellationToken));

        // Hands a finished response task to the response processor; any failure to enqueue is deliberately ignored.
        void ForwardResponse(Task<BaseCommand> completedResponse)
        {
            try
            {
                pendingResponses.Enqueue(completedResponse);
            }
            catch
            {
                // Ignore
            }
        }

        _sendQueue.ResetCursor();

        while (!cancellationToken.IsCancellationRequested)
        {
            var operation = await _sendQueue.NextItem(cancellationToken).ConfigureAwait(false);

            // The operation's own token was canceled: drop it from the queue and move on to the next one.
            if (operation.CancellationToken.IsCancellationRequested)
            {
                _sendQueue.RemoveCurrentItem();
                continue;
            }

            var responseSource = new TaskCompletionSource<BaseCommand>();
            _ = responseSource.Task.ContinueWith(ForwardResponse, forwardingOptions);

            // The executor gets CancellationToken.None because otherwise it will throw exceptions on all fault actions, even retry.
            var sent = await _executor
                .TryExecuteOnce(() => channel.Send(operation.Metadata, operation.Data, responseSource, cancellationToken), CancellationToken.None)
                .ConfigureAwait(false);

            if (sent)
            {
                continue;
            }

            // The send failed: report a disconnect unless we are shutting down anyway, then stop dispatching.
            if (!cancellationToken.IsCancellationRequested)
            {
                _eventRegister.Register(new ChannelDisconnected(_correlationId));
            }

            break;
        }

        await processorTask.ConfigureAwait(false);
    }