in src/Connection.cs [993:1103]
/// <summary>
/// Ensures that the connection info has been sent to the broker and accepted,
/// performing the handshake if this connection is not yet marked as connected.
/// </summary>
/// <remarks>
/// The handshake is attempted under <c>connectedLock</c>. Attempts are repeated,
/// with a linearly growing delay between them, until the connection is marked
/// connected, the connection is closed or closing, or <c>RequestTimeout</c> has
/// elapsed (unless it equals <c>InfiniteTimeSpan</c>).
/// </remarks>
/// <exception cref="ConnectionClosedException">
/// The connection is already closed, or the loop ended without the connection
/// becoming connected.
/// </exception>
/// <exception cref="NMSException">
/// The broker answered with a security or invalid-client-id error, or an
/// <c>NMSException</c> other than <c>BrokerException</c> was raised during the attempt.
/// </exception>
internal async Task CheckConnectedAsync()
{
    if(closed.Value)
    {
        throw new ConnectionClosedException();
    }

    if(!connected.Value)
    {
        // UTC is used for the deadline so that local clock adjustments
        // (e.g. daylight-saving transitions) cannot distort the timeout.
        DateTime timeoutTime = DateTime.UtcNow + this.RequestTimeout;
        int waitCount = 1;

        while(true)
        {
            // A non-null result means the lock was obtained; null means it was not,
            // in which case this iteration only re-checks the exit conditions below.
            var nmsLock = await connectedLock.TryLockAsync(1).Await();
            if(nmsLock != null)
            {
                using(nmsLock)
                {
                    if(closed.Value || closing.Value)
                    {
                        break;
                    }
                    else if(!connected.Value)
                    {
                        if(!this.userSpecifiedClientID)
                        {
                            // No client id supplied by the user: generate a fresh one for each attempt.
                            this.info.ClientId = this.clientIdGenerator.GenerateId();
                        }

                        try
                        {
                            if(null != transport)
                            {
                                // Make sure the transport is started.
                                if(!this.transport.IsStarted)
                                {
                                    await this.transport.StartAsync().Await();
                                }

                                // Send the connection and see if an ack/nak is returned.
                                Response response = await transport.RequestAsync(this.info, this.RequestTimeout).Await();
                                if(!(response is ExceptionResponse))
                                {
                                    connected.Value = true;
                                    if(this.watchTopicAdviosires)
                                    {
                                        ConsumerId id = new ConsumerId(
                                            new SessionId(info.ConnectionId, -1),
                                            Interlocked.Increment(ref this.consumerIdCounter));
                                        this.advisoryConsumer = await AdvisoryConsumer.CreateAsync(this, id).Await();
                                    }
                                }
                                else
                                {
                                    // The type was verified by the 'is' check above, so a direct cast is safe.
                                    ExceptionResponse error = (ExceptionResponse) response;
                                    NMSException exception = CreateExceptionFromBrokerError(error.Exception);
                                    if(exception is NMSSecurityException)
                                    {
                                        try
                                        {
                                            transport.Dispose();
                                        }
                                        catch
                                        {
                                            // Best effort: a failure while disposing must not mask
                                            // the security exception reported to the caller.
                                        }

                                        throw exception;
                                    }
                                    else if(exception is InvalidClientIDException)
                                    {
                                        // This is non-recoverable.
                                        // Shutdown the transport connection, and re-create it, but don't start it.
                                        // It will be started if the connection is re-attempted.
                                        await this.transport.StopAsync().Await();
                                        ITransport newTransport = TransportFactory.CreateTransport(this.brokerUri);
                                        SetTransport(newTransport);
                                        throw exception;
                                    }

                                    // Any other broker error is not thrown here; the loop below
                                    // decides whether to retry or give up.
                                }
                            }
                        }
                        catch(BrokerException)
                        {
                            // The generic broker failure is swallowed; if the connection never
                            // succeeds, ConnectionClosedException is thrown after the loop.
                        }
                        catch(NMSException)
                        {
                            throw;
                        }
                    }
                }
            }

            if(connected.Value || closed.Value || closing.Value
               || (DateTime.UtcNow > timeoutTime && this.RequestTimeout != InfiniteTimeSpan))
            {
                break;
            }

            // Back off from being overly aggressive. Having too many callers
            // aggressively trying to connect to a down broker pegs the CPU.
            // The delay is awaited rather than slept so that the calling thread
            // is released while waiting.
            await Task.Delay(5 * (waitCount++)).Await();
        }

        if(!connected.Value)
        {
            throw new ConnectionClosedException();
        }
    }
}