in src/DotPulsar/Internal/ChannelManager.cs [46:72]
public Task<ProducerResponse> Outgoing(CommandProducer command, IChannel channel)
{
var producerId = AddProducerChannel(channel);
command.ProducerId = producerId;
var response = _requestResponseHandler.Outgoing(command);
return response.ContinueWith(result =>
{
if (result.Result.CommandType == BaseCommand.Type.Error)
{
_ = RemoveProducerChannel(producerId);
result.Result.Error.Throw();
}
if (response.Result.ProducerSuccess.ProducerReady)
{
channel.Connected();
}
else
{
channel.WaitingForExclusive();
HandleAdditionalProducerSuccess(command, channel.Connected);
}
return new ProducerResponse(producerId, response.Result.ProducerSuccess.ProducerName, response.Result.ProducerSuccess.TopicEpoch);
}, TaskContinuationOptions.OnlyOnRanToCompletion);
}