in src/Transport/Failover/FailoverTransport.cs [1051:1289]
/// <summary>
/// Makes one pass at establishing (or rebalancing) the underlying transport connection.
/// </summary>
/// <remarks>
/// While holding <c>reconnectMutex</c> the method optionally tears down the current
/// transport for a rebalance, prefers an already-created backup transport when one is
/// queued in <c>backups</c>, and otherwise walks <c>ConnectList</c> until a transport
/// starts successfully. When the pass does not produce a connection, the failure counter
/// is incremented and, once the limit returned by <c>CalculateReconnectAttemptLimit()</c>
/// is reached, the last failure is stored in <c>connectionFailure</c> and handed to
/// <c>PropagateFailureToExceptionListener</c>.
/// </remarks>
/// <returns>
/// <c>false</c> when this pass ends without requesting a retry: a transport is already
/// connected and neither a rebalance nor a priority backup is pending, the transport is
/// disposed, a connection failure is already recorded, the rebalance turned out to be
/// unnecessary, a connection was established, or the reconnect attempt limit was reached.
/// Otherwise <c>DoDelay()</c> is invoked (unless disposed) and the result is
/// <c>!disposed</c> - presumably telling the caller to run another pass; confirm against
/// the caller.
/// </returns>
private bool DoConnect()
{
    // Re-apply the SSL protocol remembered after an earlier successful connect (see the
    // end of the connect loop below) to the current SslContext.
    if (this.sslProtocol != null)
    {
        Tcp.SslContext.GetCurrent().SslProtocol = this.sslProtocol;
    }

    lock(reconnectMutex)
    {
        if (disposed || connectionFailure != null)
        {
            // Wake every thread waiting on reconnectMutex; this pass will not connect.
            Monitor.PulseAll(reconnectMutex);
        }

        if ((connectedTransport.Value != null && !doRebalance && !priorityBackupAvailable) || disposed || connectionFailure != null)
        {
            return false;
        }
        else
        {
            List<Uri> connectList = ConnectList;
            if(connectList.Count == 0)
            {
                // Nothing to try; falls through to the attempt-limit accounting below.
                Failure = new NMSConnectionException("No URIs available for connection.");
            }
            else
            {
                if (doRebalance)
                {
                    if (connectedToPriority || CompareUris(connectList[0], connectedTransportURI))
                    {
                        // already connected to first in the list, no need to rebalance
                        doRebalance = false;
                        return false;
                    }
                    else
                    {
                        if (Tracer.IsDebugEnabled)
                        {
                            Tracer.DebugFormat("Doing rebalance from: {0} to {1}",
                                               connectedTransportURI, PrintableUriList(connectList));
                        }

                        // Best effort: detach and dispose the current transport; a failure
                        // here is only logged so the rebalance can continue.
                        try
                        {
                            ITransport current = this.connectedTransport.GetAndSet(null);
                            if (current != null)
                            {
                                DisposeTransport(current);
                            }
                        }
                        catch (Exception e)
                        {
                            if (Tracer.IsDebugEnabled)
                            {
                                Tracer.DebugFormat("Caught an exception stopping existing " +
                                                   "transport for rebalance {0}", e.Message);
                            }
                        }
                    }

                    doRebalance = false;
                }

                ResetReconnectDelay();

                ITransport transport = null;
                Uri uri = null;

                // If we have a backup already waiting lets try it.
                lock(backupMutex)
                {
                    if ((priorityBackup || backup) && backups.Count > 0)
                    {
                        // Work on a copy so shuffling does not reorder the shared list.
                        List<BackupTransport> l = new List<BackupTransport>(backups);
                        if (randomize)
                        {
                            Shuffle(l);
                        }

                        BackupTransport bt = l[0];
                        l.RemoveAt(0);
                        backups.Remove(bt);
                        transport = bt.Transport;
                        uri = bt.Uri;

                        if (priorityBackup && priorityBackupAvailable)
                        {
                            // Switching over to the priority backup: drop the transport
                            // that is currently connected.
                            ITransport old = this.connectedTransport.GetAndSet(null);
                            if (old != null)
                            {
                                DisposeTransport(old);
                            }

                            priorityBackupAvailable = false;
                        }
                    }
                }

                // Sleep for the reconnectDelay if there's no backup and we aren't trying
                // for the first time, or we were disposed for some reason.
                if (transport == null && !firstConnection && (reconnectDelay > 0) && !disposed)
                {
                    lock(sleepMutex)
                    {
                        if (Tracer.IsDebugEnabled)
                        {
                            Tracer.DebugFormat("Waiting {0} ms before attempting connection.", reconnectDelay);
                        }

                        try
                        {
                            Monitor.Wait(sleepMutex, reconnectDelay);
                        }
                        catch (ThreadInterruptedException)
                        {
                            // Deliberately ignored: an interrupt only shortens the wait.
                        }
                    }
                }

                IEnumerator<Uri> iter = connectList.GetEnumerator();

                // The enumerator is only advanced when no transport is pending, so a
                // backup transport is tried before any URI is consumed from the list.
                while ((transport != null || iter.MoveNext()) && (connectedTransport.Value == null && !disposed))
                {
                    try
                    {
                        // We could be starting with a backup and if so we wait to grab a
                        // URI from the pool until next time around.
                        bool needsNewTransport = (transport == null);
                        if (needsNewTransport)
                        {
                            uri = AddExtraQueryOptions(iter.Current);
                        }

                        // Logged after the URI is resolved so the message names the
                        // target actually being attempted rather than a stale value.
                        if (Tracer.IsDebugEnabled)
                        {
                            Tracer.DebugFormat("Attempting {0}th connect to: {1}",
                                               connectFailures, uri);
                        }

                        if (needsNewTransport)
                        {
                            transport = TransportFactory.CompositeConnect(uri);
                        }

                        transport.CommandAsync = OnCommand;
                        transport.Exception = OnException;
                        transport.Start();

                        if (started && !firstConnection)
                        {
                            RestoreTransport(transport);
                        }

                        if (Tracer.IsDebugEnabled)
                        {
                            Tracer.Debug("Connection established");
                        }

                        reconnectDelay = initialReconnectDelay;
                        connectedTransportURI = uri;
                        connectedTransport.Value = transport;
                        connectedToPriority = IsPriority(connectedTransportURI);
                        Monitor.PulseAll(reconnectMutex);
                        connectFailures = 0;

                        // Try to wait long enough for client to init the event callbacks.
                        listenerLatch.await(TimeSpan.FromSeconds(2));

                        if (Resumed != null)
                        {
                            Resumed(transport);
                        }
                        else
                        {
                            if (Tracer.IsDebugEnabled)
                            {
                                Tracer.Debug("transport resumed by transport listener not set");
                            }
                        }

                        if (firstConnection)
                        {
                            firstConnection = false;
                            Tracer.Info("Successfully connected to " + uri);
                        }
                        else
                        {
                            Tracer.Info("Successfully reconnected to " + uri);
                        }

                        connected = true;

                        // Remember the protocol in effect for this connection so later
                        // passes can re-apply it (see the top of this method).
                        if (this.sslProtocol == null)
                        {
                            this.sslProtocol = Tcp.SslContext.GetCurrent().SslProtocol;
                        }

                        return false;
                    }
                    catch (Exception e)
                    {
                        failure = e;

                        if (Tracer.IsDebugEnabled)
                        {
                            Tracer.Debug("Connect fail to: " + uri + ", reason: " + e.Message);
                        }

                        if (transport != null)
                        {
                            try
                            {
                                transport.Stop();
                            }
                            catch (Exception ee)
                            {
                                if (Tracer.IsDebugEnabled)
                                {
                                    Tracer.Debug("Stop of failed transport: " + transport +
                                                 " failed with reason: " + ee.Message);
                                }
                            }
                            finally
                            {
                                // Always forget the failed transport, even when Stop()
                                // throws; otherwise the loop condition would keep
                                // retrying the same broken transport instead of moving
                                // on to the next URI.
                                transport = null;
                            }
                        }
                    }
                }
            }
        }

        // Reaching this point means the pass did not return from a successful connect.
        int reconnectLimit = CalculateReconnectAttemptLimit();

        connectFailures++;

        if (reconnectLimit != INFINITE && connectFailures >= reconnectLimit)
        {
            Tracer.ErrorFormat("Failed to connect to {0} after: {1} attempt(s)",
                               PrintableUriList(uris), connectFailures);

            connectionFailure = failure;

            // Make sure on initial startup, that the transportListener has been
            // initialized for this instance.
            listenerLatch.await(TimeSpan.FromSeconds(2));
            PropagateFailureToExceptionListener(connectionFailure);
            return false;
        }
    }

    // Outside reconnectMutex: pause before the next pass unless we have been disposed.
    if(!disposed)
    {
        DoDelay();
    }

    return !disposed;
}