private void failoverOrReconnect()

in artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java [584:816]


   /**
    * Handles a failure reported for {@code connectionID}: notifies the failover and session failure listeners and,
    * when {@code reconnectAttempts} or {@code failoverAttempts} is non-zero, tries to obtain a new connection and
    * re-attach the existing sessions to it. When both are zero the current connection is simply destroyed.
    * <p>
    * The reconnect/failover sequence runs while holding the lock returned by {@code lockFailover()}. If no
    * connection is available at the end of that sequence, {@code FAILOVER_FAILED} is signalled and a snapshot of the
    * sessions is cleaned up after the lock has been released.
    *
    * @param connectionID          ID of the connection the failure was detected on; the call does nothing beyond the
    *                              initial killed-session check unless it equals the ID of the current connection
    * @param me                    the exception describing the failure; passed on to listeners, to
    *                              {@code cleanupBeforeFailover} and to {@code reconnectSessions}
    * @param scaleDownTargetNodeID passed through unchanged to the session failure listeners
    */
   private void failoverOrReconnect(final Object connectionID,
                                    final ActiveMQException me,
                                    String scaleDownTargetNodeID) {
      logger.debug("Failure captured on connectionID={}, performing failover or reconnection now", connectionID, me);

      // If any session was killed, disable reconnection to the current connector pair.
      for (ClientSessionInternal session : sessions) {
         SessionContext context = session.getSessionContext();
         if (context instanceof ActiveMQSessionContext) {
            ActiveMQSessionContext sessionContext = (ActiveMQSessionContext) context;
            if (sessionContext.isKilled()) {
               setReconnectAttempts(0);
            }
         }
      }

      Set<ClientSessionInternal> sessionsToClose = null;
      if (!clientProtocolManager.isAlive()) {
         return;
      }
      Lock localFailoverLock = lockFailover();
      try {
         if (connection == null || !connection.getID().equals(connectionID) || !clientProtocolManager.isAlive()) {
            // We already failed over/reconnected - probably the first failure came in, all the connections were failed
            // over, then an async connection exception or disconnect came in for a connection that has already been
            // replaced or closed, so we just return - we don't want to call the listeners again

            return;
         }

         if (logger.isTraceEnabled()) {
            logger.trace("Client Connection failed, calling failure listeners and trying to reconnect, reconnectAttempts={}", reconnectAttempts);
         }

         callFailoverListeners(FailoverEventType.FAILURE_DETECTED);
         // We call before reconnection occurs to give the user a chance to do cleanup, like cancel messages
         callSessionFailureListeners(me, false, false, scaleDownTargetNodeID);

         // Now get locks on all channel 1s, whilst holding the failoverLock - this makes sure
         // There are either no threads executing in createSession, or one is blocking on a createSession
         // result.

         // Then interrupt the channel 1 that is blocking (could just interrupt them all)

         // Then release all channel 1 locks - this allows the createSession to exit the monitor

         // Then get all channel 1 locks again - this ensures that any createSession thread has executed the section and
         // returned all its connections to the connection manager (the code to return connections to connection manager
         // must be inside the lock)

         // Then perform failover

         // Then release failoverLock

         // The other side of the bargain - during createSession:
         // The calling thread must get the failoverLock and get its connections when this is
         // locked.
         // While this is still locked it must then get the channel1 lock
         // It can then release the failoverLock
         // It should catch ActiveMQException.INTERRUPTED in the call to channel.sendBlocking
         // It should then return its connections, with channel 1 lock still held
         // It can then release the channel 1 lock, and retry (which will cause locking on
         // failoverLock until failover is complete)

         if (reconnectAttempts != 0 || failoverAttempts != 0) {

            if (clientProtocolManager.cleanupBeforeFailover(me)) {

               // Now we absolutely know that no threads are executing in or blocked in
               // createSession,
               // and no
               // more will execute it until failover is complete

               // So.. do failover / reconnection

               RemotingConnection oldConnection = connection;

               connection = null;

               Connector localConnector = connector;
               if (localConnector != null) {
                  try {
                     localConnector.close();
                  } catch (Exception ignore) {
                     // best effort: the connector is being discarded anyway
                  }
               }

               cancelScheduledTasks();

               connector = null;

               // Work on a snapshot so sessions added or removed meanwhile do not affect this failover pass.
               HashSet<ClientSessionInternal> sessionsToFailover;
               synchronized (sessions) {
                  sessionsToFailover = new HashSet<>(sessions);
               }

               // Notify sessions before failover.
               // NOTE(review): connection was set to null above, so sessions appear to receive null here - confirm
               // this is intended.
               for (ClientSessionInternal session : sessionsToFailover) {
                  session.preHandleFailover(connection);
               }


               // Try to reconnect to the current connector pair.
               // Before ARTEMIS-4251 ClientSessionFactoryImpl only tries to reconnect to the current connector pair.
               int reconnectRetries = 0;
               boolean sessionsReconnected = false;
               // Keep going while the client is alive, sessions are not re-attached and attempts remain
               // (-1 means unlimited attempts).
               BiPredicate<Boolean, Integer> reconnectRetryPredicate =
                  (reconnected, retries) -> clientProtocolManager.isAlive() &&
                     !reconnected && (reconnectAttempts == -1 || retries < reconnectAttempts);
               while (reconnectRetryPredicate.test(sessionsReconnected, reconnectRetries)) {

                  int remainingReconnectRetries = reconnectAttempts == -1 ? -1 : reconnectAttempts - reconnectRetries;
                  reconnectRetries += getConnectionWithRetry(remainingReconnectRetries, oldConnection);

                  if (connection != null) {
                     sessionsReconnected = reconnectSessions(sessionsToFailover, oldConnection, me);

                     if (!sessionsReconnected) {
                        // The new connection could not take over the sessions: drop the previous one and treat the
                        // new one as the "old" connection for the next attempt.
                        if (oldConnection != null) {
                           oldConnection.destroy();
                        }

                        oldConnection = connection;
                        connection = null;
                     }
                  }

                  reconnectRetries++;
                  if (reconnectRetryPredicate.test(sessionsReconnected, reconnectRetries)) {
                     waitForRetry(retryInterval);
                  }
               }


               // Try to connect to other connector pairs.
               // After ARTEMIS-4251 ClientSessionFactoryImpl tries to connect to
               // other connector pairs when reconnection to the current connector pair fails.
               int connectorsCount = 0;
               int failoverRetries = 0;
               long failoverRetryInterval = retryInterval;
               Pair<TransportConfiguration, TransportConfiguration> connectorPair;
               BiPredicate<Boolean, Integer> failoverRetryPredicate =
                  (reconnected, retries) -> clientProtocolManager.isAlive() &&
                     !reconnected && (failoverAttempts == -1 || retries < failoverAttempts);
               while (failoverRetryPredicate.test(sessionsReconnected, failoverRetries)) {

                  connectorsCount++;
                  connectorPair = serverLocator.selectNextConnectorPair();

                  if (connectorPair != null) {
                     connectorConfig = connectorPair.getA();
                     currentConnectorConfig = connectorPair.getA();
                     if (connectorPair.getB() != null) {
                        backupConnectorConfig = connectorPair.getB();
                     }

                     getConnection();
                  }

                  if (connection != null) {
                     sessionsReconnected = reconnectSessions(sessionsToFailover, oldConnection, me);

                     if (!sessionsReconnected) {
                        if (oldConnection != null) {
                           oldConnection.destroy();
                        }

                        oldConnection = connection;
                        connection = null;
                     }
                  }

                  // One failover attempt is counted per full pass over all known connector pairs.
                  if (connectorsCount >= serverLocator.getConnectorsSize()) {
                     connectorsCount = 0;
                     failoverRetries++;
                     // Only back off when another pass will actually happen; once the sessions are re-attached the
                     // loop is about to exit and waiting would just hold the failover lock for no reason.
                     if (failoverRetryPredicate.test(sessionsReconnected, failoverRetries)) {
                        waitForRetry(failoverRetryInterval);
                        failoverRetryInterval = getNextRetryInterval(failoverRetryInterval);
                     }
                  }
               }


               // Notify sessions after failover.
               for (ClientSessionInternal session : sessionsToFailover) {
                  session.postHandleFailover(connection, sessionsReconnected);
               }

               if (oldConnection != null) {
                  oldConnection.destroy();
               }

               if (connection != null) {
                  callFailoverListeners(FailoverEventType.FAILOVER_COMPLETED);
               }
            }
         } else {
            // Neither reconnection nor failover is configured: just tear down the failed connection.
            RemotingConnection connectionToDestroy = connection;
            if (connectionToDestroy != null) {
               connectionToDestroy.destroy();
            }
            connection = null;
         }

         if (connection == null) {
            synchronized (sessions) {
               sessionsToClose = new HashSet<>(sessions);
            }
            callFailoverListeners(FailoverEventType.FAILOVER_FAILED);
            callSessionFailureListeners(me, true, false, scaleDownTargetNodeID);
         }
      } finally {
         localFailoverLock.unlock();
      }

      // This needs to be outside the failover lock to prevent deadlock
      if (connection != null) {
         callSessionFailureListeners(me, true, true);
      }
      if (sessionsToClose != null) {
         // If connection is null it means we didn't succeed in failing over or reconnecting
         // so we close all the sessions, so they will throw exceptions when attempted to be used

         for (ClientSessionInternal session : sessionsToClose) {
            try {
               session.cleanUp(true);
            } catch (Exception cause) {
               ActiveMQClientLogger.LOGGER.failedToCleanupSession(cause);
            }
         }
      }
   }