in src/DotPulsar/Internal/ChannelManager.cs [43:72]
public bool HasChannels()
=> !_consumerChannels.IsEmpty() || !_producerChannels.IsEmpty();
public Task<ProducerResponse> Outgoing(CommandProducer command, IChannel channel)
{
var producerId = _producerChannels.Add(channel);
command.ProducerId = producerId;
var response = _requestResponseHandler.Outgoing(command);
return response.ContinueWith(result =>
{
if (result.Result.CommandType == BaseCommand.Type.Error)
{
_ = _producerChannels.Remove(producerId);
result.Result.Error.Throw();
}
if (response.Result.ProducerSuccess.ProducerReady)
{
channel.ProducerConnected(response.Result.ProducerSuccess.TopicEpoch);
}
else
{
channel.WaitingForExclusive();
HandleAdditionalProducerSuccess(command, channel.ProducerConnected);
}
return new ProducerResponse(producerId, response.Result.ProducerSuccess.ProducerName);
}, TaskContinuationOptions.OnlyOnRanToCompletion);
}