in src/DotPulsar/Internal/SubProducer.cs [118:164]
/// <summary>
/// Takes send operations from <c>_sendQueue</c> one at a time and sends them on <paramref name="channel"/>
/// until cancellation is requested or a send attempt reports failure, then awaits the response processor.
/// </summary>
/// <param name="channel">The producer channel every message is sent on.</param>
/// <param name="cancellationToken">
/// Ends the dispatch loop; it is also handed to the send queue, the channel send call and the response processor.
/// </param>
/// <returns>A task that finishes once the loop has ended and the response processor task has been awaited.</returns>
private async Task MessageDispatcher(IProducerChannel channel, CancellationToken cancellationToken)
{
    // Cancelled response tasks are never forwarded; the forwarding runs inline with the completion.
    // NOTE(review): inline execution presumably keeps responses queued in completion order - confirm.
    const TaskContinuationOptions forwardingOptions =
        TaskContinuationOptions.NotOnCanceled | TaskContinuationOptions.ExecuteSynchronously;

    var pendingResponses = new AsyncQueue<Task<BaseCommand>>();
    var responseProcessing = ResponseProcessor(pendingResponses, cancellationToken);

    // Hands a finished response task over to the queue read by the response processor.
    void ForwardResponse(Task<BaseCommand> finishedResponse)
    {
        try
        {
            pendingResponses.Enqueue(finishedResponse);
        }
        catch
        {
            // Ignore
            // NOTE(review): presumably the queue can already be disposed at this point - confirm.
        }
    }

    try
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            var operation = await _sendQueue.NextItem(cancellationToken).ConfigureAwait(false);

            // The operation's own token was cancelled before dispatch: drop it and fetch the next one.
            if (operation.CancellationToken.IsCancellationRequested)
            {
                _sendQueue.RemoveCurrentItem();
                continue;
            }

            var responseSource = new TaskCompletionSource<BaseCommand>();
            _ = responseSource.Task.ContinueWith(finished => ForwardResponse(finished), forwardingOptions);

            // Use CancellationToken.None here because otherwise it will throw exceptions on all fault actions even retry.
            var sent = await _executor
                .TryExecuteOnce(
                    () => channel.Send(operation.Metadata, operation.Data, responseSource, cancellationToken),
                    CancellationToken.None)
                .ConfigureAwait(false);

            if (!sent)
            {
                // A failed attempt is reported as a channel disconnect and ends the loop.
                _eventRegister.Register(new ChannelDisconnected(_correlationId));
                break;
            }
        }

        await responseProcessing.ConfigureAwait(false);
    }
    finally
    {
        pendingResponses.Dispose();
    }
}