in src/Connection.cs [1110:1184]
protected async Task OnCommandAsync(ITransport commandTransport, Command command)
{
if(command.IsMessageDispatch)
{
WaitForTransportInterruptionProcessingToComplete();
await DispatchMessageAsync((MessageDispatch) command).Await();
}
else if(command.IsKeepAliveInfo)
{
OnKeepAliveCommand(commandTransport, (KeepAliveInfo) command);
}
else if(command.IsWireFormatInfo)
{
this.brokerWireFormatInfo = (WireFormatInfo) command;
}
else if(command.IsBrokerInfo)
{
this.brokerInfo = (BrokerInfo) command;
this.brokerInfoReceived.countDown();
}
else if(command.IsShutdownInfo)
{
// Only terminate the connection if the transport we use is not fault
// tolerant otherwise we let the transport deal with the broker closing
// our connection and deal with IOException if it is sent to use.
if(!closing.Value && !closed.Value && this.transport != null && !this.transport.IsFaultTolerant)
{
OnException(new NMSException("Broker closed this connection via Shutdown command."));
}
}
else if(command.IsProducerAck)
{
ProducerAck ack = (ProducerAck) command as ProducerAck;
if(ack.ProducerId != null)
{
MessageProducer producer = producers[ack.ProducerId] as MessageProducer;
if(producer != null)
{
if(Tracer.IsDebugEnabled)
{
Tracer.DebugFormat("Connection[{0}]: Received a new ProducerAck -> ",
this.ConnectionId, ack);
}
producer.OnProducerAck(ack);
}
}
}
else if(command.IsConnectionError)
{
if(!closing.Value && !closed.Value)
{
ConnectionError connectionError = (ConnectionError) command;
BrokerError brokerError = connectionError.Exception;
string message = "Broker connection error.";
string cause = "";
if(null != brokerError)
{
message = brokerError.Message;
if(null != brokerError.Cause)
{
cause = brokerError.Cause.Message;
}
}
Tracer.ErrorFormat("Connection[{0}]: ConnectionError: {1} : {2}", this.ConnectionId, message, cause);
OnAsyncException(CreateExceptionFromBrokerError(brokerError));
}
}
else
{
Tracer.ErrorFormat("Connection[{0}]: Unknown command: {1}", this.ConnectionId, command);
}
}