final boolean doReconnect()

in activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java [934:1119]


    /**
     * Performs one reconnect (or rebalance) pass over the candidate broker URIs.
     * <p>
     * While holding {@code reconnectMutex} the method:
     * <ol>
     *   <li>calls {@code doUpdateURIsFromDisk()} and bails out early when a connection already
     *       exists (and no rebalance / priority-backup switch is pending), when the transport is
     *       disposed, or when a {@code connectionFailure} has already been recorded;</li>
     *   <li>handles a pending rebalance by disposing the current transport, unless the current
     *       connection is already the preferred one;</li>
     *   <li>takes a waiting backup transport if one is available, otherwise walks the connect list
     *       creating transports until one starts successfully;</li>
     *   <li>on total failure increments {@code connectFailures} and, once the limit returned by
     *       {@code calculateReconnectAttemptLimit()} is reached, stores the last failure in
     *       {@code connectionFailure} and propagates it to the exception listener.</li>
     * </ol>
     * If the pass failed without reaching the limit, {@code doDelay()} is invoked after the mutex
     * has been released.
     *
     * @return {@code false} when the pass ended with an established connection, an early bail-out,
     *         or the reconnect limit being reached; otherwise {@code !disposed} (i.e. {@code true}
     *         when the pass failed and the transport has not been disposed)
     */
    final boolean doReconnect() {
        Exception failure = null;
        synchronized (reconnectMutex) {
            List<URI> connectList = null;
            // First ensure we are up to date.
            doUpdateURIsFromDisk();

            // Wake up anything waiting on the mutex when no connection will ever be made.
            if (disposed || connectionFailure != null) {
                reconnectMutex.notifyAll();
            }
            if ((connectedTransport.get() != null && !doRebalance && !priorityBackupAvailable) || disposed || connectionFailure != null) {
                return false;
            } else {
                connectList = getConnectList();
                if (connectList.isEmpty()) {
                    failure = new IOException("No uris available to connect to.");
                } else {
                    if (doRebalance) {
                        if (connectedToPriority || (!doReconnect && compareURIs(connectList.get(0), connectedTransportURI))) {
                            // already connected to first in the list, no need to rebalance
                            doRebalance = false;
                            return false;
                        } else {
                            LOG.debug("Doing rebalance from: {} to {}", connectedTransportURI, connectList);

                            try {
                                Transport transport = this.connectedTransport.getAndSet(null);
                                if (transport != null) {
                                    disposeTransport(transport);
                                }
                            } catch (Exception e) {
                                LOG.debug("Caught an exception stopping existing transport for rebalance", e);
                            }
                            doReconnect = false;
                        }
                        doRebalance = false;
                    }

                    resetReconnectDelay();

                    Transport transport = null;
                    URI uri = null;

                    // If we have a backup already waiting lets try it.
                    synchronized (backupMutex) {
                        if ((priorityBackup || backup) && !backups.isEmpty()) {
                            List<BackupTransport> candidates = new ArrayList<BackupTransport>(backups);
                            if (randomize) {
                                Collections.shuffle(candidates);
                            }
                            BackupTransport bt = candidates.remove(0);
                            backups.remove(bt);
                            transport = bt.getTransport();
                            uri = bt.getUri();
                            processCommand(bt.getBrokerInfo());
                            if (priorityBackup && priorityBackupAvailable) {
                                // Drop the current connection so the loop below installs the backup.
                                Transport old = this.connectedTransport.getAndSet(null);
                                if (old != null) {
                                    disposeTransport(old);
                                }
                                priorityBackupAvailable = false;
                            }
                        }
                    }

                    // When there was no backup and we are reconnecting for the first time
                    // we honor the initialReconnectDelay before trying a new connection, after
                    // this normal reconnect delay happens following a failed attempt.
                    if (transport == null && !firstConnection && connectFailures == 0 && initialReconnectDelay > 0 && !disposed) {
                        // reconnectDelay will be equal to initialReconnectDelay since we are on
                        // the first connect attempt after we had a working connection, doDelay
                        // will apply updates to move to the next reconnectDelay value based on
                        // configuration.
                        doDelay();
                    }

                    Iterator<URI> iter = connectList.iterator();
                    while ((transport != null || iter.hasNext()) && (connectedTransport.get() == null && !disposed)) {

                        try {
                            SslContext.setCurrentSslContext(brokerSslContext);

                            // We could be starting with a backup and if so we wait to grab a
                            // URI from the pool until next time around.
                            if (transport == null) {
                                uri = addExtraQueryOptions(iter.next());
                                transport = TransportFactory.compositeConnect(uri);
                            }

                            LOG.debug("Attempting {}th connect to: {}", connectFailures, uri);

                            transport.setTransportListener(createTransportListener(transport));
                            transport.start();

                            if (started && !firstConnection) {
                                restoreTransport(transport);
                            }

                            LOG.debug("Connection established");

                            reconnectDelay = initialReconnectDelay;
                            connectedTransportURI = uri;
                            connectedTransport.set(transport);
                            connectedToPriority = isPriority(connectedTransportURI);
                            reconnectMutex.notifyAll();
                            connectFailures = 0;

                            // Make sure on initial startup, that the transportListener
                            // has been initialized for this instance.
                            waitForTransportListener();

                            if (transportListener != null) {
                                transportListener.transportResumed();
                            } else {
                                LOG.debug("transport resumed by transport listener not set");
                            }

                            if (firstConnection) {
                                firstConnection = false;
                                LOG.info("Successfully connected to {}", uri);
                            } else {
                                LOG.info("Successfully reconnected to {}", uri);
                            }

                            return false;
                        } catch (Exception e) {
                            failure = e;
                            LOG.debug("Connect fail to: {}, reason: {}", uri, e);
                            if (transport != null) {
                                try {
                                    transport.stop();
                                } catch (Exception ee) {
                                    LOG.debug("Stop of failed transport: {} failed with reason: {}", transport, ee);
                                } finally {
                                    // Always discard the failed transport, even when stop() itself
                                    // throws; otherwise the loop would keep retrying this same
                                    // broken instance instead of advancing to the next URI.
                                    transport = null;
                                }
                            }
                        } finally {
                            SslContext.setCurrentSslContext(null);
                        }
                    }
                }
            }

            int reconnectLimit = calculateReconnectAttemptLimit();

            connectFailures++;
            if (reconnectLimit != INFINITE && connectFailures >= reconnectLimit) {
                LOG.error("Failed to connect to {} after: {} attempt(s)", connectList, connectFailures);
                connectionFailure = failure;

                // Make sure on initial startup, that the transportListener has been
                // initialized for this instance.
                waitForTransportListener();

                propagateFailureToExceptionListener(connectionFailure);
                return false;
            }

            int warnInterval = getWarnAfterReconnectAttempts();
            if (warnInterval > 0 && (connectFailures == 1 || (connectFailures % warnInterval) == 0)) {
                LOG.warn("Failed to connect to {} after: {} attempt(s) with {}, continuing to retry.",
                         connectList, connectFailures, (failure == null ? "?" : failure.getLocalizedMessage()));
            }
        }

        // Back off outside the mutex before the next attempt.
        if (!disposed) {
            doDelay();
        }

        return !disposed;
    }

    /**
     * Waits up to two seconds on {@code listenerMutex} if no {@code transportListener} has been
     * set yet. If the wait is interrupted, the thread's interrupt status is restored rather than
     * discarded, and the method returns immediately.
     */
    private void waitForTransportListener() {
        synchronized (listenerMutex) {
            if (transportListener == null) {
                try {
                    // if it isn't set after 2secs - it probably never will be
                    listenerMutex.wait(2000);
                } catch (InterruptedException ex) {
                    // Preserve the interrupt for whoever owns this thread.
                    Thread.currentThread().interrupt();
                }
            }
        }
    }