in artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java [584:816]
/**
 * Handles a failure reported for the connection identified by {@code connectionID}.
 * <p>
 * While holding the failover lock, the method first tries to reconnect to the current connector pair
 * (bounded by {@code reconnectAttempts}) and then to the connector pairs handed out by the server locator
 * (bounded by {@code failoverAttempts}); {@code -1} means "retry forever" for both counters. When both
 * counters are {@code 0} the current connection is simply destroyed.
 * <p>
 * Nothing is done when the protocol manager is not alive, or when the current connection is {@code null}
 * or has an ID different from {@code connectionID}.
 * <p>
 * If no connection is available once the attempts are over, {@link FailoverEventType#FAILOVER_FAILED} is
 * raised and every session known at that point is cleaned up after the failover lock has been released.
 *
 * @param connectionID          ID of the connection the failure was reported for
 * @param me                    the failure, handed to the session failure listeners and to the session reconnection
 * @param scaleDownTargetNodeID handed through to the session failure listeners
 */
private void failoverOrReconnect(final Object connectionID,
                                 final ActiveMQException me,
                                 String scaleDownTargetNodeID) {
   logger.debug("Failure captured on connectionID={}, performing failover or reconnection now", connectionID, me);

   // Copy under the sessions monitor, like every other copy of this collection in this method, so that a
   // concurrent change of the collection cannot break the iteration below.
   final Set<ClientSessionInternal> sessionsToInspect;
   synchronized (sessions) {
      sessionsToInspect = new HashSet<>(sessions);
   }

   for (ClientSessionInternal session : sessionsToInspect) {
      SessionContext context = session.getSessionContext();
      if (context instanceof ActiveMQSessionContext) {
         ActiveMQSessionContext sessionContext = (ActiveMQSessionContext) context;
         if (sessionContext.isKilled()) {
            // A killed session zeroes the reconnect attempts before the decision below is taken.
            setReconnectAttempts(0);
         }
      }
   }

   Set<ClientSessionInternal> sessionsToClose = null;

   if (!clientProtocolManager.isAlive()) {
      return;
   }

   Lock localFailoverLock = lockFailover();
   try {
      if (connection == null || !connection.getID().equals(connectionID) || !clientProtocolManager.isAlive()) {
         // We already failed over/reconnected - probably the first failure came in, all the connections were
         // failed over, then an async connection exception or disconnect came in for one of the already
         // failed connections, so we just return - we don't want to call the listeners again
         return;
      }

      if (logger.isTraceEnabled()) {
         logger.trace("Client Connection failed, calling failure listeners and trying to reconnect, reconnectAttempts={}", reconnectAttempts);
      }

      callFailoverListeners(FailoverEventType.FAILURE_DETECTED);
      // We call before reconnection occurs to give the user a chance to do cleanup, like cancel messages
      callSessionFailureListeners(me, false, false, scaleDownTargetNodeID);

      // Now get locks on all channel 1s, whilst holding the failoverLock - this makes sure
      // there are either no threads executing in createSession, or one is blocking on a createSession
      // result.
      // Then interrupt the channel 1 that is blocking (could just interrupt them all)
      // Then release all channel 1 locks - this allows the createSession to exit the monitor
      // Then get all channel 1 locks again - this ensures that any createSession thread has executed the
      // section and returned all its connections to the connection manager (the code to return connections
      // to the connection manager must be inside the lock)
      // Then perform failover
      // Then release failoverLock

      // The other side of the bargain - during createSession:
      // The calling thread must get the failoverLock and get its connections when this is locked.
      // While this is still locked it must then get the channel 1 lock
      // It can then release the failoverLock
      // It should catch ActiveMQException.INTERRUPTED in the call to channel.sendBlocking
      // It should then return its connections, with channel 1 lock still held
      // It can then release the channel 1 lock, and retry (which will cause locking on failoverLock
      // until failover is complete)

      if (reconnectAttempts != 0 || failoverAttempts != 0) {
         if (clientProtocolManager.cleanupBeforeFailover(me)) {
            // Now we absolutely know that no threads are executing in or blocked in createSession,
            // and no more will execute it until failover is complete
            // So.. do failover / reconnection

            RemotingConnection oldConnection = connection;
            connection = null;

            Connector localConnector = connector;
            if (localConnector != null) {
               try {
                  localConnector.close();
               } catch (Exception ignore) {
                  // no-op: closing the connector of the failed connection is best effort
               }
            }

            cancelScheduledTasks();

            connector = null;

            HashSet<ClientSessionInternal> sessionsToFailover;
            synchronized (sessions) {
               sessionsToFailover = new HashSet<>(sessions);
            }

            // Notify sessions before failover.
            // NOTE(review): connection has been set to null a few lines above, so the sessions appear to
            // receive null here rather than the failed connection - confirm that this is intended.
            for (ClientSessionInternal session : sessionsToFailover) {
               session.preHandleFailover(connection);
            }

            // Try to reconnect to the current connector pair.
            // Before ARTEMIS-4251 ClientSessionFactoryImpl only tries to reconnect to the current connector pair.
            int reconnectRetries = 0;
            boolean sessionsReconnected = false;
            BiPredicate<Boolean, Integer> reconnectRetryPredicate =
               (reconnected, retries) -> clientProtocolManager.isAlive() &&
                  !reconnected && (reconnectAttempts == -1 || retries < reconnectAttempts);
            while (reconnectRetryPredicate.test(sessionsReconnected, reconnectRetries)) {
               int remainingReconnectRetries = reconnectAttempts == -1 ? -1 : reconnectAttempts - reconnectRetries;
               reconnectRetries += getConnectionWithRetry(remainingReconnectRetries, oldConnection);

               if (connection != null) {
                  sessionsReconnected = reconnectSessions(sessionsToFailover, oldConnection, me);

                  if (!sessionsReconnected) {
                     // The new connection could not take over the sessions: it becomes the "old" one and
                     // the previous old connection is released.
                     if (oldConnection != null) {
                        oldConnection.destroy();
                     }

                     oldConnection = connection;
                     connection = null;
                  }
               }

               reconnectRetries++;
               if (reconnectRetryPredicate.test(sessionsReconnected, reconnectRetries)) {
                  waitForRetry(retryInterval);
               }
            }

            // Try to connect to other connector pairs.
            // After ARTEMIS-4251 ClientSessionFactoryImpl tries to connect to
            // other connector pairs when reconnection to the current connector pair fails.
            int connectorsCount = 0;
            int failoverRetries = 0;
            long failoverRetryInterval = retryInterval;
            Pair<TransportConfiguration, TransportConfiguration> connectorPair;
            BiPredicate<Boolean, Integer> failoverRetryPredicate =
               (reconnected, retries) -> clientProtocolManager.isAlive() &&
                  !reconnected && (failoverAttempts == -1 || retries < failoverAttempts);
            while (failoverRetryPredicate.test(sessionsReconnected, failoverRetries)) {
               connectorsCount++;
               connectorPair = serverLocator.selectNextConnectorPair();

               if (connectorPair != null) {
                  connectorConfig = connectorPair.getA();
                  currentConnectorConfig = connectorPair.getA();
                  if (connectorPair.getB() != null) {
                     backupConnectorConfig = connectorPair.getB();
                  }

                  getConnection();
               }

               if (connection != null) {
                  sessionsReconnected = reconnectSessions(sessionsToFailover, oldConnection, me);

                  if (!sessionsReconnected) {
                     if (oldConnection != null) {
                        oldConnection.destroy();
                     }

                     oldConnection = connection;
                     connection = null;
                  }
               }

               // One failover attempt is a full round over all the connector pairs.
               if (connectorsCount >= serverLocator.getConnectorsSize()) {
                  connectorsCount = 0;
                  failoverRetries++;
                  // Use the real outcome, as the reconnect loop above does: once the sessions are
                  // reconnected there is nothing left to wait for, and sleeping here would only delay the
                  // release of the failover lock.
                  if (failoverRetryPredicate.test(sessionsReconnected, failoverRetries)) {
                     waitForRetry(failoverRetryInterval);
                     failoverRetryInterval = getNextRetryInterval(failoverRetryInterval);
                  }
               }
            }

            // Notify sessions after failover.
            for (ClientSessionInternal session : sessionsToFailover) {
               session.postHandleFailover(connection, sessionsReconnected);
            }

            if (oldConnection != null) {
               oldConnection.destroy();
            }

            if (connection != null) {
               callFailoverListeners(FailoverEventType.FAILOVER_COMPLETED);
            }
         }
      } else {
         // Neither reconnection nor failover is configured: just drop the failed connection.
         RemotingConnection connectionToDestroy = connection;
         if (connectionToDestroy != null) {
            connectionToDestroy.destroy();
         }
         connection = null;
      }

      if (connection == null) {
         synchronized (sessions) {
            sessionsToClose = new HashSet<>(sessions);
         }
         callFailoverListeners(FailoverEventType.FAILOVER_FAILED);
         callSessionFailureListeners(me, true, false, scaleDownTargetNodeID);
      }
   } finally {
      localFailoverLock.unlock();
   }

   // This needs to be outside the failover lock to prevent deadlock
   if (connection != null) {
      callSessionFailureListeners(me, true, true);
   }

   if (sessionsToClose != null) {
      // If connection is null it means we didn't succeed in failing over or reconnecting
      // so we close all the sessions, so they will throw exceptions when attempted to be used
      for (ClientSessionInternal session : sessionsToClose) {
         try {
            session.cleanUp(true);
         } catch (Exception cause) {
            ActiveMQClientLogger.LOGGER.failedToCleanupSession(cause);
         }
      }
   }
}