in activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java [934:1119]
/**
 * Performs one reconnect pass: tries a waiting backup transport first (if any) and then
 * each URI from {@link #getConnectList()} until one transport starts successfully.
 *
 * <p>The whole connection attempt runs while holding {@code reconnectMutex}. On success the
 * new transport is published through {@code connectedTransport}, waiters on
 * {@code reconnectMutex} are notified and the transport listener (if set) is told that the
 * transport resumed. On failure {@code connectFailures} is incremented; once the limit from
 * {@code calculateReconnectAttemptLimit()} is reached the last failure is stored in
 * {@code connectionFailure} and propagated to the exception listener.
 *
 * @return {@code false} when no further pass is needed from this call (already connected,
 *         nothing to rebalance, a connection was established, the transport is disposed, or
 *         the attempt limit was reached); otherwise {@code !disposed} after the reconnect
 *         delay has been applied. Presumably the caller re-runs the task while this is
 *         {@code true} - confirm against the reconnect task.
 */
final boolean doReconnect() {
    Exception failure = null;
    synchronized (reconnectMutex) {
        List<URI> connectList = null;
        // First ensure we are up to date.
        doUpdateURIsFromDisk();
        if (disposed || connectionFailure != null) {
            // Wake up anything blocked on the mutex so it can observe the terminal state.
            reconnectMutex.notifyAll();
        }
        if ((connectedTransport.get() != null && !doRebalance && !priorityBackupAvailable) || disposed || connectionFailure != null) {
            return false;
        } else {
            connectList = getConnectList();
            if (connectList.isEmpty()) {
                failure = new IOException("No uris available to connect to.");
            } else {
                if (doRebalance) {
                    if (connectedToPriority || (!doReconnect && compareURIs(connectList.get(0), connectedTransportURI))) {
                        // already connected to first in the list, no need to rebalance
                        doRebalance = false;
                        return false;
                    } else {
                        LOG.debug("Doing rebalance from: {} to {}", connectedTransportURI, connectList);
                        try {
                            Transport transport = this.connectedTransport.getAndSet(null);
                            if (transport != null) {
                                disposeTransport(transport);
                            }
                        } catch (Exception e) {
                            LOG.debug("Caught an exception stopping existing transport for rebalance", e);
                        }
                        doReconnect = false;
                    }
                    doRebalance = false;
                }
                resetReconnectDelay();
                Transport transport = null;
                URI uri = null;
                // If we have a backup already waiting lets try it.
                synchronized (backupMutex) {
                    if ((priorityBackup || backup) && !backups.isEmpty()) {
                        ArrayList<BackupTransport> l = new ArrayList<BackupTransport>(backups);
                        if (randomize) {
                            Collections.shuffle(l);
                        }
                        BackupTransport bt = l.remove(0);
                        backups.remove(bt);
                        transport = bt.getTransport();
                        uri = bt.getUri();
                        processCommand(bt.getBrokerInfo());
                        if (priorityBackup && priorityBackupAvailable) {
                            // Switching over to the priority backup: drop the current connection.
                            Transport old = this.connectedTransport.getAndSet(null);
                            if (old != null) {
                                disposeTransport(old);
                            }
                            priorityBackupAvailable = false;
                        }
                    }
                }
                // When there was no backup and we are reconnecting for the first time
                // we honor the initialReconnectDelay before trying a new connection, after
                // this normal reconnect delay happens following a failed attempt.
                if (transport == null && !firstConnection && connectFailures == 0 && initialReconnectDelay > 0 && !disposed) {
                    // reconnectDelay will be equal to initialReconnectDelay since we are on
                    // the first connect attempt after we had a working connection, doDelay
                    // will apply updates to move to the next reconnectDelay value based on
                    // configuration.
                    doDelay();
                }
                Iterator<URI> iter = connectList.iterator();
                while ((transport != null || iter.hasNext()) && (connectedTransport.get() == null && !disposed)) {
                    try {
                        SslContext.setCurrentSslContext(brokerSslContext);
                        // We could be starting with a backup and if so we wait to grab a
                        // URI from the pool until next time around.
                        if (transport == null) {
                            uri = addExtraQueryOptions(iter.next());
                            transport = TransportFactory.compositeConnect(uri);
                        }
                        LOG.debug("Attempting {}th connect to: {}", connectFailures, uri);
                        transport.setTransportListener(createTransportListener(transport));
                        transport.start();
                        if (started && !firstConnection) {
                            restoreTransport(transport);
                        }
                        LOG.debug("Connection established");
                        reconnectDelay = initialReconnectDelay;
                        connectedTransportURI = uri;
                        connectedTransport.set(transport);
                        connectedToPriority = isPriority(connectedTransportURI);
                        reconnectMutex.notifyAll();
                        connectFailures = 0;
                        // Make sure on initial startup, that the transportListener
                        // has been initialized for this instance.
                        synchronized (listenerMutex) {
                            if (transportListener == null) {
                                try {
                                    // if it isn't set after 2secs - it probably never will be
                                    listenerMutex.wait(2000);
                                } catch (InterruptedException ex) {
                                    // Stop waiting, but keep the interrupt request visible
                                    // to whoever runs this thread instead of swallowing it.
                                    Thread.currentThread().interrupt();
                                }
                            }
                        }
                        if (transportListener != null) {
                            transportListener.transportResumed();
                        } else {
                            LOG.debug("transport resumed by transport listener not set");
                        }
                        if (firstConnection) {
                            firstConnection = false;
                            LOG.info("Successfully connected to {}", uri);
                        } else {
                            LOG.info("Successfully reconnected to {}", uri);
                        }
                        return false;
                    } catch (Exception e) {
                        failure = e;
                        LOG.debug("Connect fail to: {}, reason: {}", uri, e);
                        if (transport != null) {
                            // Clear the loop variable before stopping: if stop() throws, a
                            // still-set reference would make the loop retry this same broken
                            // transport instead of moving on to the next URI.
                            Transport failedTransport = transport;
                            transport = null;
                            try {
                                failedTransport.stop();
                            } catch (Exception ee) {
                                LOG.debug("Stop of failed transport: {} failed with reason: {}", failedTransport, ee);
                            }
                        }
                    } finally {
                        SslContext.setCurrentSslContext(null);
                    }
                }
            }
        }
        // Reaching this point means no transport could be established in this pass.
        int reconnectLimit = calculateReconnectAttemptLimit();
        connectFailures++;
        if (reconnectLimit != INFINITE && connectFailures >= reconnectLimit) {
            LOG.error("Failed to connect to {} after: {} attempt(s)", connectList, connectFailures);
            connectionFailure = failure;
            // Make sure on initial startup, that the transportListener has been
            // initialized for this instance.
            synchronized (listenerMutex) {
                if (transportListener == null) {
                    try {
                        listenerMutex.wait(2000);
                    } catch (InterruptedException ex) {
                        // Preserve the interrupt status; the failure is still propagated below.
                        Thread.currentThread().interrupt();
                    }
                }
            }
            propagateFailureToExceptionListener(connectionFailure);
            return false;
        }
        int warnInterval = getWarnAfterReconnectAttempts();
        if (warnInterval > 0 && (connectFailures == 1 || (connectFailures % warnInterval) == 0)) {
            LOG.warn("Failed to connect to {} after: {} attempt(s) with {}, continuing to retry.",
                     connectList, connectFailures, (failure == null ? "?" : failure.getLocalizedMessage()));
        }
    }
    // Back off outside reconnectMutex before the next pass.
    if (!disposed) {
        doDelay();
    }
    return !disposed;
}