in src/DotPulsar/Internal/SubProducer.cs [193:224]
public async Task EstablishNewChannel(CancellationToken cancellationToken)
{
try
{
if (_dispatcherCts is not null)
{
if (!_dispatcherCts.IsCancellationRequested)
_dispatcherCts.Cancel();
_dispatcherCts.Dispose();
}
}
catch (Exception)
{
// Ignored
}
await _executor.TryExecuteOnce(() => _dispatcherTask ?? Task.CompletedTask, cancellationToken).ConfigureAwait(false);
try
{
var oldChannel = _channel;
await oldChannel.DisposeAsync().ConfigureAwait(false);
}
catch (Exception)
{
// Ignored
}
_channel = await _executor.Execute(() => _factory.Create(cancellationToken), cancellationToken).ConfigureAwait(false);
_dispatcherCts = new CancellationTokenSource();
_dispatcherTask = Task.Run(async () => await MessageDispatcher(_channel, _dispatcherCts.Token));
}