in qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java [620:703]
private void initializeNewConnection(final Provider provider) {
serializer.execute(() -> {
// Disallow other processing in the provider while we attempt to establish this
// provider as the new one for recovery, any incoming work stops until we finish
// and either recover or go back into a failed state.
lock.writeLock().lock();
try {
// In case a close is in play as we are reconnecting we close out the connected
// provider instance and return here to allow any pending close operations to
// finish now.
if (closingConnection.get() || closed.get() || failed.get()) {
try {
provider.close();
} catch(Throwable ignore) {
LOG.trace("Ingoring failure to close failed provider: {}", provider, ignore);
}
return;
}
FailoverProvider.this.provider = provider;
provider.setProviderListener(FailoverProvider.this);
connectedURI = provider.getRemoteURI();
if (reconnectControl.isRecoveryRequired()) {
LOG.debug("Signalling connection recovery: {}", provider);
// Stage 1: Allow listener to recover its resources
try {
listener.onConnectionRecovery(provider);
} finally {
// Stage 2: If the provider knows of others lets add them to the URI pool
// even if something failed here we can learn of new hosts so we
// always process the potential Open frame failover URI results.
processAlternates(provider.getAlternateURIs());
}
// Stage 3: Connection state recovered, get newly configured message factory.
FailoverProvider.this.messageFactory.set(provider.getMessageFactory());
// Stage 4: Restart consumers, send pull commands, etc.
listener.onConnectionRecovered(provider);
// Stage 5: Let the client know that connection has restored.
listener.onConnectionRestored(provider.getRemoteURI());
// Last step: Send pending actions.
final List<FailoverRequest> pending = new ArrayList<FailoverRequest>(requests.values());
for (FailoverRequest request : pending) {
if (!request.isComplete()) {
request.run();
}
}
reconnectControl.connectionEstablished();
} else {
processAlternates(provider.getAlternateURIs());
// Last step: Send pending actions.
final List<FailoverRequest> pending = new ArrayList<FailoverRequest>(requests.values());
for (FailoverRequest request : pending) {
if (!request.isComplete()) {
request.run();
}
}
}
// Cancel timeout processing since we are connected again. We waited until
// now for the case where we are continually getting bounced from otherwise
// live servers, we want the timeout to remain scheduled in that case so that
// it doesn't keep getting rescheduled and never actually time anything out.
if (requestTimeoutTask != null) {
requestTimeoutTask.cancel(false);
requestTimeoutTask = null;
}
} catch (Throwable error) {
LOG.trace("Connection attempt:[{}] to: {} failed", reconnectControl.reconnectAttempts, provider.getRemoteURI());
handleProviderFailure(provider, ProviderExceptionSupport.createOrPassthroughFatal(error));
} finally {
lock.writeLock().unlock();
}
});
}