bool FailoverTransport::iterate()

in activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp [877:1055]


/**
 * Runs one pass of the reconnect logic.
 *
 * While holding the reconnect mutex, the method first checks whether there is
 * anything to do: if the connection state is already valid, or the transport is
 * closed or failed, it returns immediately.  Otherwise it attempts to connect,
 * first with a backup transport (when backups are enabled and one is available)
 * and then with each URI taken from the connect list, until a transport starts
 * successfully, the list is exhausted, or the transport is closed.
 *
 * On success the connected transport and its URI are recorded, waiters on the
 * reconnect mutex are notified, and the transport listener (if any) is told the
 * transport has resumed.  On failure the last exception is remembered; once the
 * number of connect failures reaches the configured attempt limit the failure
 * is propagated to the exception listener.
 *
 * @return false when this pass ended in a terminal outcome (nothing to do,
 *         rebalance not needed, connected, or the attempt limit was reached);
 *         otherwise the negation of the closed flag after doDelay() has run.
 *         NOTE(review): presumably the task runner treats true as "call again";
 *         confirm against the caller.
 */
bool FailoverTransport::iterate() {

    // Last connect error seen during this pass; stays NULL if no attempt threw.
    Pointer<Exception> failure;

    synchronized(&this->impl->reconnectMutex) {

        // Wake anything blocked on the reconnect mutex so it can observe the
        // closed/failed state.
        if (this->impl->isClosedOrFailed()) {
            this->impl->reconnectMutex.notifyAll();
        }

        if (this->impl->isConnectionStateValid() || this->impl->isClosedOrFailed()) {
            return false;
        } else {

            Pointer<URIPool> connectList = this->impl->getConnectList();

            if (connectList->isEmpty() && !this->impl->backups->isEnabled()) {
                failure.reset(new IOException(__FILE__, __LINE__, "No URIs available for reconnect."));
            } else {

                if (this->impl->doRebalance) {
                    // connectedTransportURI may be unset when no transport is
                    // currently connected; guard the dereference so that case is
                    // treated as "not on the priority URI" and falls through to
                    // a normal reconnect instead of raising from this method.
                    if (this->impl->connectedToPrioirty ||
                        (this->impl->connectedTransportURI != NULL &&
                         connectList->getPriorityURI().equals(*this->impl->connectedTransportURI))) {
                        // already connected to first in the list, no need to rebalance
                        this->impl->doRebalance = false;
                        return false;
                    } else {
                        // break any existing connect for rebalance.
                        this->impl->disconnect();
                    }

                    this->impl->doRebalance = false;
                }

                this->impl->resetReconnectDelay();

                // URIs that failed during this pass; handed back to the pool
                // before leaving so they can be retried on a later pass.
                LinkedList<URI> failures;
                Pointer<Transport> transport;
                URI uri;

                if (this->impl->backups->isEnabled()) {
                    Pointer<BackupTransport> backupTransport = this->impl->backups->getBackup();
                    if (backupTransport != NULL) {
                        transport = backupTransport->getTransport();
                        uri = backupTransport->getUri();
                        if (this->impl->priorityBackup && this->impl->backups->isPriorityBackupAvailable()) {
                            // A priority connection is available and we aren't connected to
                            // any other priority transports so disconnect and use the backup.
                            this->impl->disconnect();
                        }
                    }
                }

                // Sleep for the reconnectDelay if there's no backup and we aren't trying
                // for the first time, or we were disposed for some reason.
                if (transport == NULL && !this->impl->firstConnection &&
                    (this->impl->reconnectDelay > 0) && !this->impl->closed) {
                    synchronized (&this->impl->sleepMutex) {
                        try {
                            this->impl->sleepMutex.wait(this->impl->reconnectDelay);
                        } catch (InterruptedException&) {
                            // Preserve the interrupt for whoever owns this thread.
                            Thread::currentThread()->interrupt();
                        }
                    }
                }

                while ((transport != NULL || !connectList->isEmpty()) && this->impl->connectedTransport == NULL && !this->impl->closed) {
                    try {
                        // We could be starting the loop with a backup already.
                        if (transport == NULL) {
                            try {
                                uri = connectList->getURI();
                            } catch (NoSuchElementException&) {
                                // Pool drained between the loop test and the fetch.
                                break;
                            }

                            transport = createTransport(uri);
                        }

                        transport->setTransportListener(this->impl->myTransportListener.get());
                        transport->start();

                        if (this->impl->started && !this->impl->firstConnection) {
                            restoreTransport(transport);
                        }

                        // Connection established: record it and wake waiters.
                        this->impl->reconnectDelay = this->impl->initialReconnectDelay;
                        this->impl->connectedTransportURI.reset(new URI(uri));
                        this->impl->connectedTransport = transport;
                        this->impl->reconnectMutex.notifyAll();
                        this->impl->connectFailures = 0;
                        this->impl->connected = true;

                        if (isPriorityBackup()) {
                            this->impl->connectedToPrioirty = connectList->getPriorityURI().equals(uri) ||
                                                              this->impl->priorityUris->contains(uri);
                        } else {
                            this->impl->connectedToPrioirty = false;
                        }

                        // Make sure on initial startup, that the transportListener
                        // has been initialized for this instance.
                        synchronized(&this->impl->listenerMutex) {
                            if (this->impl->transportListener == NULL) {
                                // if it isn't set after 2secs - it probably never will be
                                this->impl->listenerMutex.wait(2000);
                            }
                        }

                        if (this->impl->transportListener != NULL) {
                            this->impl->transportListener->transportResumed();
                        }

                        if (this->impl->firstConnection) {
                            this->impl->firstConnection = false;
                        }

                        // Return the failures to the pool, we will try again on the next iteration.
                        connectList->addURIs(failures);

                        // Re-asserted after the listener callback above has run.
                        this->impl->connected = true;
                        return false;

                    } catch (Exception& e) {
                        e.setMark(__FILE__, __LINE__);

                        if (transport != NULL) {
                            if (this->impl->disposedListener != NULL) {
                                transport->setTransportListener(this->impl->disposedListener.get());
                            }

                            // Best-effort stop; the transport is being discarded anyway.
                            try {
                                transport->stop();
                            } catch (...) {
                            }

                            // Hand off to the close task so it gets done in a different thread
                            // this prevents a deadlock from occurring if the Transport happens
                            // to call back through our onException method or locks in some other
                            // way.
                            this->impl->connected = false;
                            this->impl->closeTask->add(transport);
                            this->impl->taskRunner->wakeup();
                            transport.reset(NULL);
                        }

                        failures.add(uri);
                        failure.reset(e.clone());
                    }
                }

                // Return the failures to the pool, we will try again on the next iteration.
                connectList->addURIs(failures);
            }
        }

        int reconnectAttempts = this->impl->calculateReconnectAttemptLimit();

        // A negative limit skips this check entirely; note the failure counter
        // is only advanced when a non-negative limit is in effect.
        if (reconnectAttempts >= 0 && ++this->impl->connectFailures >= reconnectAttempts) {
            this->impl->connectionFailure = failure;

            // Make sure on initial startup, that the transportListener has been initialized
            // for this instance.
            synchronized(&this->impl->listenerMutex) {
                if (this->impl->transportListener == NULL) {
                    this->impl->listenerMutex.wait(2000);
                }
            }

            this->impl->propagateFailureToExceptionListener();
            return false;
        }
    }

    // Outside the reconnect lock: invoke doDelay() unless we were closed meanwhile.
    if (!this->impl->closed) {
        this->impl->doDelay();
    }

    return !this->impl->closed;
}