in activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp [877:1055]
// Runs a single reconnect pass for the failover transport.
//
// Inside the reconnectMutex synchronized block this method:
//   1. returns immediately if the connection state is already valid or the
//      transport is closed/failed;
//   2. otherwise picks a transport (a backup if one is handed out, else one
//      created from a URI taken from the connect list), starts it and records
//      it as the connected transport;
//   3. collects the URIs whose attempt threw and returns them to the connect list;
//   4. if no connection was made, compares the failure counter against the
//      reconnect attempt limit and, once the limit is reached, stores the last
//      failure and propagates it to the exception listener.
//
// @return false when this pass is finished (already connected, closed or failed,
//         rebalance not needed, connect succeeded, or attempt limit reached);
//         otherwise doDelay() is called (unless closed) and the negation of the
//         closed flag is returned.
//         NOTE(review): presumably the caller re-invokes iterate() while it
//         returns true - confirm against the task runner contract.
bool FailoverTransport::iterate() {
// Most recent failure seen in this pass; set at the "no URIs" branch and in the
// connect loop's catch handler, and handed to connectionFailure when the limit is hit.
Pointer<Exception> failure;
synchronized(&this->impl->reconnectMutex) {
// Wake any threads waiting on reconnectMutex when the transport is closed or failed.
if (this->impl->isClosedOrFailed()) {
this->impl->reconnectMutex.notifyAll();
}
// Nothing to do if the connection is usable, or if we are closed/failed.
if (this->impl->isConnectionStateValid() || this->impl->isClosedOrFailed()) {
return false;
} else {
Pointer<URIPool> connectList = this->impl->getConnectList();
// With no candidate URIs and backups disabled there is nothing to try; record
// that as the failure and fall through to the attempt-limit check below.
if (connectList->isEmpty() && !impl->backups->isEnabled()) {
failure.reset(new IOException(__FILE__, __LINE__, "No URIs available for reconnect."));
} else {
if (this->impl->doRebalance) {
// NOTE(review): connectedTransportURI is dereferenced here without a visible
// null check - confirm it is always set whenever doRebalance is true and
// connectedToPrioirty is false.
if (this->impl->connectedToPrioirty || connectList->getPriorityURI().equals(*this->impl->connectedTransportURI)) {
// already connected to first in the list, no need to rebalance
this->impl->doRebalance = false;
return false;
} else {
// break any existing connect for rebalance.
this->impl->disconnect();
}
this->impl->doRebalance = false;
}
this->impl->resetReconnectDelay();
// URIs whose connect attempt threw during this pass; returned to connectList
// on every exit path that follows the connect loop.
LinkedList<URI> failures;
Pointer<Transport> transport;
URI uri;
// Ask the backup pool for a transport first. If one is returned, the connect
// loop below starts with it instead of creating a new transport from a URI.
if (this->impl->backups->isEnabled()) {
Pointer<BackupTransport> backupTransport = this->impl->backups->getBackup();
if (backupTransport != NULL) {
transport = backupTransport->getTransport();
uri = backupTransport->getUri();
if (this->impl->priorityBackup && this->impl->backups->isPriorityBackupAvailable()) {
// A priority connection is available and we aren't connected to
// any other priority transports so disconnect and use the backup.
this->impl->disconnect();
}
}
}
// Sleep for the reconnectDelay if there's no backup and we aren't trying
// for the first time, or we were disposed for some reason.
// Note: this wait runs while still inside the synchronized(reconnectMutex) block.
if (transport == NULL && !this->impl->firstConnection &&
(this->impl->reconnectDelay > 0) && !this->impl->closed) {
synchronized (&this->impl->sleepMutex) {
try {
this->impl->sleepMutex.wait(this->impl->reconnectDelay);
} catch (InterruptedException& e) {
// Re-assert the interrupt on the current thread rather than dropping it.
Thread::currentThread()->interrupt();
}
}
}
// Keep trying while there is something to try (a pending transport or a URI in
// the list), nothing is connected yet, and we have not been closed.
while ((transport != NULL || !connectList->isEmpty()) && this->impl->connectedTransport == NULL && !this->impl->closed) {
try {
// We could be starting the loop with a backup already.
if (transport == NULL) {
try {
uri = connectList->getURI();
} catch (NoSuchElementException& ex) {
// No URI could be taken from the list; leave the connect loop.
break;
}
transport = createTransport(uri);
}
transport->setTransportListener(this->impl->myTransportListener.get());
transport->start();
// Only invoked when already started and this is not the first connection.
// NOTE(review): presumably re-applies prior state to the new transport -
// confirm against restoreTransport().
if (this->impl->started && !this->impl->firstConnection) {
restoreTransport(transport);
}
// Record the successful connection: reset the delay to its initial value,
// remember the URI and transport, wake reconnectMutex waiters and clear
// the failure counter.
this->impl->reconnectDelay = this->impl->initialReconnectDelay;
this->impl->connectedTransportURI.reset(new URI(uri));
this->impl->connectedTransport = transport;
this->impl->reconnectMutex.notifyAll();
this->impl->connectFailures = 0;
this->impl->connected = true;
// Track whether the URI we connected to is the list's priority URI or is
// contained in priorityUris; always false when priority backup is off.
if (isPriorityBackup()) {
this->impl->connectedToPrioirty = connectList->getPriorityURI().equals(uri) ||
this->impl->priorityUris->contains(uri);
} else {
this->impl->connectedToPrioirty = false;
}
// Make sure on initial startup, that the transportListener
// has been initialized for this instance.
// Note: this wait also runs inside the synchronized(reconnectMutex) block.
synchronized(&this->impl->listenerMutex) {
if (this->impl->transportListener == NULL) {
// if it isn't set after 2secs - it probably never will be
this->impl->listenerMutex.wait(2000);
}
}
if (this->impl->transportListener != NULL) {
this->impl->transportListener->transportResumed();
}
if (this->impl->firstConnection) {
this->impl->firstConnection = false;
}
// Return the failures to the pool, we will try again on the next iteration.
connectList->addURIs(failures);
// NOTE(review): connected was already set to true above; this second
// assignment looks redundant.
this->impl->connected = true;
return false;
} catch (Exception& e) {
e.setMark(__FILE__, __LINE__);
if (transport != NULL) {
// Redirect callbacks from the failed transport to disposedListener (when
// one is set) before stopping it.
if (this->impl->disposedListener != NULL) {
transport->setTransportListener(this->impl->disposedListener.get());
}
try {
transport->stop();
} catch (...) {
// Any error from stop() is swallowed here.
}
// Hand off to the close task so it gets done in a different thread
// this prevents a deadlock from occurring if the Transport happens
// to call back through our onException method or locks in some other
// way.
this->impl->connected = false;
this->impl->closeTask->add(transport);
this->impl->taskRunner->wakeup();
// Drop our reference so the next loop pass takes a fresh URI from the list.
transport.reset(NULL);
}
// Remember the URI that failed and keep a copy of the exception as the
// most recent failure.
failures.add(uri);
failure.reset(e.clone());
}
}
// Return the failures to the pool, we will try again on the next iteration.
connectList->addURIs(failures);
}
}
// Reached only when no connection was established in this pass. The failure
// counter is incremented only when the limit is non-negative (short-circuit &&).
// NOTE(review): presumably a negative limit means "retry forever" - confirm
// against calculateReconnectAttemptLimit().
int reconnectAttempts = this->impl->calculateReconnectAttemptLimit();
if (reconnectAttempts >= 0 && ++this->impl->connectFailures >= reconnectAttempts) {
// NOTE(review): failure is only assigned in the "no URIs" branch and in the
// connect loop's catch handler, so it may still be unset here (e.g. the loop
// never ran) - confirm consumers of connectionFailure tolerate that.
this->impl->connectionFailure = failure;
// Make sure on initial startup, that the transportListener has been initialized
// for this instance.
synchronized(&this->impl->listenerMutex) {
if (this->impl->transportListener == NULL) {
this->impl->listenerMutex.wait(2000);
}
}
this->impl->propagateFailureToExceptionListener();
return false;
}
}
// Outside the reconnectMutex block: delay before the next attempt unless closed.
if (!this->impl->closed) {
this->impl->doDelay();
}
return !this->impl->closed;
}