in src/DotPulsar/Internal/SubProducer.cs [107:149]
private async Task MessageDispatcher(IProducerChannel channel, CancellationToken cancellationToken)
{
using var responseQueue = new AsyncQueue<Task<BaseCommand>>();
var responseProcessorTask = Task.Run(async () => await ResponseProcessor(responseQueue, cancellationToken));
_sendQueue.ResetCursor();
while (!cancellationToken.IsCancellationRequested)
{
var sendOp = await _sendQueue.NextItem(cancellationToken).ConfigureAwait(false);
if (sendOp.CancellationToken.IsCancellationRequested)
{
_sendQueue.RemoveCurrentItem();
continue;
}
var tcs = new TaskCompletionSource<BaseCommand>();
_ = tcs.Task.ContinueWith(task =>
{
try
{
responseQueue.Enqueue(task);
}
catch
{
// Ignore
}
}, TaskContinuationOptions.NotOnCanceled | TaskContinuationOptions.ExecuteSynchronously);
// Use CancellationToken.None here because otherwise it will throw exceptions on all fault actions even retry.
var success = await _executor.TryExecuteOnce(() => channel.Send(sendOp.Metadata, sendOp.Data, tcs, cancellationToken), CancellationToken.None).ConfigureAwait(false);
if (!success)
{
if (!cancellationToken.IsCancellationRequested)
_eventRegister.Register(new ChannelDisconnected(_correlationId));
break;
}
}
await responseProcessorTask.ConfigureAwait(false);
}