in activemq-client/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java [175:278]
private boolean doConnect() {
long closestReconnectDate = 0;
synchronized (reconnectMutex) {
if (disposed || connectionFailure != null) {
reconnectMutex.notifyAll();
}
if (transports.size() == connectedCount || disposed || connectionFailure != null) {
return false;
} else {
if (transports.isEmpty()) {
// connectionFailure = new IOException("No uris available to
// connect to.");
} else {
// Try to connect them up.
Iterator<FanoutTransportHandler> iter = transports.iterator();
while (iter.hasNext() && !disposed) {
long now = System.currentTimeMillis();
FanoutTransportHandler fanoutHandler = iter.next();
if (fanoutHandler.transport != null) {
continue;
}
// Are we waiting a little to try to reconnect this one?
if (fanoutHandler.reconnectDate != 0 && fanoutHandler.reconnectDate > now) {
if (closestReconnectDate == 0 || fanoutHandler.reconnectDate < closestReconnectDate) {
closestReconnectDate = fanoutHandler.reconnectDate;
}
continue;
}
URI uri = fanoutHandler.uri;
try {
LOG.debug("Stopped: " + this);
LOG.debug("Attempting connect to: " + uri);
Transport t = TransportFactory.compositeConnect(uri);
fanoutHandler.transport = t;
t.setTransportListener(fanoutHandler);
if (started) {
restoreTransport(fanoutHandler);
}
LOG.debug("Connection established");
fanoutHandler.reconnectDelay = initialReconnectDelay;
fanoutHandler.connectFailures = 0;
if (primary == null) {
primary = fanoutHandler;
}
connectedCount++;
} catch (Exception e) {
LOG.debug("Connect fail to: " + uri + ", reason: " + e);
if (fanoutHandler.transport != null) {
ServiceSupport.dispose(fanoutHandler.transport);
fanoutHandler.transport = null;
}
if (maxReconnectAttempts > 0 && ++fanoutHandler.connectFailures >= maxReconnectAttempts) {
LOG.error("Failed to connect to transport after: " + fanoutHandler.connectFailures + " attempt(s)");
connectionFailure = e;
reconnectMutex.notifyAll();
return false;
} else {
if (useExponentialBackOff) {
// Exponential increment of reconnect delay.
fanoutHandler.reconnectDelay *= backOffMultiplier;
if (fanoutHandler.reconnectDelay > maxReconnectDelay) {
fanoutHandler.reconnectDelay = maxReconnectDelay;
}
}
fanoutHandler.reconnectDate = now + fanoutHandler.reconnectDelay;
if (closestReconnectDate == 0 || fanoutHandler.reconnectDate < closestReconnectDate) {
closestReconnectDate = fanoutHandler.reconnectDate;
}
}
}
}
if (transports.size() == connectedCount || disposed) {
reconnectMutex.notifyAll();
return false;
}
}
}
}
try {
long reconnectDelay = closestReconnectDate - System.currentTimeMillis();
if (reconnectDelay > 0) {
LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection. ");
Thread.sleep(reconnectDelay);
}
} catch (InterruptedException e1) {
Thread.currentThread().interrupt();
}
return true;
}