public void run()

in artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java [106:341]


   public void run() {
      try {
         logger.trace("SharedNothingBackupActivation..start");
         synchronized (activeMQServer) {
            activeMQServer.setState(ActiveMQServerImpl.SERVER_STATE.STARTED);
         }
         // move all data away:
         activeMQServer.getNodeManager().stop();
         activeMQServer.moveServerData(replicaPolicy.getMaxSavedReplicatedJournalsSize());
         activeMQServer.getNodeManager().start();
         synchronized (this) {
            if (closed) {
               logger.trace("SharedNothingBackupActivation is closed, ignoring activation!");
               return;
            }
         }

         boolean scalingDown = replicaPolicy.getScaleDownPolicy() != null && replicaPolicy.getScaleDownPolicy().isEnabled();

         if (!activeMQServer.initialisePart1(scalingDown)) {
            if (logger.isTraceEnabled()) {
               logger.trace("could not initialize part1 {}", scalingDown);
            }
            return;
         }

         synchronized (this) {
            if (closed) {
               return;
            }
            backupQuorum = new SharedNothingBackupQuorum(activeMQServer.getNodeManager(), activeMQServer.getScheduledPool(), networkHealthCheck, replicaPolicy.getQuorumSize(), replicaPolicy.getVoteRetries(), replicaPolicy.getVoteRetryWait(), replicaPolicy.getQuorumVoteWait(), attemptFailBack);
            activeMQServer.getClusterManager().getQuorumManager().registerQuorum(backupQuorum);
            activeMQServer.getClusterManager().getQuorumManager().registerQuorumHandler(new ServerConnectVoteHandler(activeMQServer));
         }

         //use a Node Locator to connect to the cluster
         LiveNodeLocator nodeLocator;
         if (activationParams.get(ActivationParams.REPLICATION_ENDPOINT) != null) {
            TopologyMember member = (TopologyMember) activationParams.get(ActivationParams.REPLICATION_ENDPOINT);
            nodeLocator = new NamedNodeIdNodeLocator(member.getNodeId(), new Pair<>(member.getLive(), member.getBackup()));
         } else {
            nodeLocator = replicaPolicy.getGroupName() == null ? new AnyLiveNodeLocatorForReplication(backupQuorum, activeMQServer, replicaPolicy.getRetryReplicationWait()) : new NamedLiveNodeLocatorForReplication(replicaPolicy.getGroupName(), backupQuorum, replicaPolicy.getRetryReplicationWait());
         }
         ClusterController clusterController = activeMQServer.getClusterManager().getClusterController();
         clusterController.addClusterTopologyListenerForReplication(nodeLocator);

         logger.trace("Waiting on cluster connection");
         clusterController.awaitConnectionToReplicationCluster();

         logger.trace("Cluster Connected");

         clusterController.addIncomingInterceptorForReplication(new ReplicationError(nodeLocator));

         logger.debug("Starting backup manager");
         activeMQServer.getBackupManager().start();

         replicationEndpoint.setExecutor(activeMQServer.getExecutorFactory().getExecutor());
         EndpointConnector endpointConnector = new EndpointConnector();

         logger.debug("Starting Backup Server");
         ActiveMQServerLogger.LOGGER.backupServerStarted(activeMQServer.getVersion().getFullVersion(), activeMQServer.getNodeManager().getNodeId());
         activeMQServer.setState(ActiveMQServerImpl.SERVER_STATE.STARTED);

         logger.trace("Setting server state as started");

         SharedNothingBackupQuorum.BACKUP_ACTIVATION signal;
         do {
            if (closed) {
               logger.debug("Activation is closed, so giving up");
               return;
            }

            logger.trace("looking up the node through nodeLocator.locateNode()");

            //locate the first live server to try to replicate
            nodeLocator.locateNode();
            Pair<TransportConfiguration, TransportConfiguration> possibleLive = nodeLocator.getLiveConfiguration();
            nodeID = nodeLocator.getNodeID();
            logger.debug("Connecting towards a possible live, connection information={}, nodeID={}", possibleLive, nodeID);

            //in a normal (non failback) scenario if we couldn't find our live server we should fail
            if (!attemptFailBack) {
               logger.debug("attemptFailback=false, nodeID={}", nodeID);

               //this shouldn't happen
               if (nodeID == null) {
                  logger.debug("Throwing a RuntimeException as nodeID==null ant attemptFailback=false");
                  throw new RuntimeException("Could not establish the connection");
               }
               activeMQServer.getNodeManager().setNodeID(nodeID);
            }

            if (possibleLive != null) {
               clusterControl = tryConnectToNodeInReplicatedCluster(clusterController, possibleLive.getA());
               if (clusterControl == null) {
                  clusterControl = tryConnectToNodeInReplicatedCluster(clusterController, possibleLive.getB());
               }
            } else {
               clusterControl = null;
            }

            if (clusterControl == null) {
               final long retryIntervalForReplicatedCluster = clusterController.getRetryIntervalForReplicatedCluster();
               logger.trace("sleeping {} it should retry", retryIntervalForReplicatedCluster);

               //its ok to retry here since we haven't started replication yet
               //it may just be the server has gone since discovery
               Thread.sleep(retryIntervalForReplicatedCluster);
               signal = SharedNothingBackupQuorum.BACKUP_ACTIVATION.ALREADY_REPLICATING;
               continue;
            }

            activeMQServer.getThreadPool().execute(endpointConnector);
            /**
             * Wait for a signal from the the quorum manager, at this point if replication has been successful we can
             * fail over or if there is an error trying to replicate (such as already replicating) we try the
             * process again on the next live server.  All the action happens inside {@link BackupQuorum}
             */
            signal = backupQuorum.waitForStatusChange();

            logger.trace("Got a signal {} through backupQuorum.waitForStatusChange()", signal);

            /**
             * replicationEndpoint will be holding lots of open files. Make sure they get
             * closed/sync'ed.
             */
            ActiveMQServerImpl.stopComponent(replicationEndpoint);
            // time to give up
            if (!activeMQServer.isStarted() || signal == STOP) {
               if (logger.isTraceEnabled()) {
                  logger.trace("giving up on the activation:: activemqServer.isStarted={} while signal = {}", activeMQServer.isStarted(), signal);
               }
               return;
            } else if (signal == FAIL_OVER) {
               // time to fail over
               logger.trace("signal == FAIL_OVER, breaking the loop");
               break;
            } else if (signal == SharedNothingBackupQuorum.BACKUP_ACTIVATION.FAILURE_REPLICATING || signal == SharedNothingBackupQuorum.BACKUP_ACTIVATION.FAILURE_RETRY) {
               // something has gone badly run restart from scratch
               logger.trace("Starting a new thread to stop the server!");

               final SharedNothingBackupQuorum.BACKUP_ACTIVATION signalToStop = signal;

               Thread startThread = new Thread(new Runnable() {
                  @Override
                  public void run() {
                     try {
                        logger.trace("Calling activeMQServer.stop() as initialization failed");

                        if (activeMQServer.getState() != ActiveMQServer.SERVER_STATE.STOPPED &&
                            activeMQServer.getState() != ActiveMQServer.SERVER_STATE.STOPPING) {

                           if (signalToStop == SharedNothingBackupQuorum.BACKUP_ACTIVATION.FAILURE_RETRY) {
                              activeMQServer.stop(false);
                              logger.trace("The server was shutdown for a network isolation, we keep retrying");
                              activeMQServer.start();
                           } else {
                              activeMQServer.stop();
                           }
                        }
                     } catch (Exception e) {
                        ActiveMQServerLogger.LOGGER.errorRestartingBackupServer(activeMQServer, e);
                     }
                  }
               });
               startThread.start();
               return;
            }
            //ok, this live is no good, let's reset and try again
            //close this session factory, we're done with it
            clusterControl.close();
            backupQuorum.reset();
            if (replicationEndpoint.getChannel() != null) {
               replicationEndpoint.getChannel().close();
               replicationEndpoint.setChannel(null);
            }
         }
         while (signal == SharedNothingBackupQuorum.BACKUP_ACTIVATION.ALREADY_REPLICATING);

         logger.trace("Activation loop finished, current signal = {}", signal);

         activeMQServer.getClusterManager().getQuorumManager().unRegisterQuorum(backupQuorum);

         if (!isRemoteBackupUpToDate()) {
            logger.debug("throwing exception for !isRemoteBackupUptoDate");
            throw ActiveMQMessageBundle.BUNDLE.backupServerNotInSync();
         }

         logger.trace("@@@ setReplicaPolicy::{}", replicaPolicy);

         replicaPolicy.getReplicatedPolicy().setReplicaPolicy(replicaPolicy);
         activeMQServer.setHAPolicy(replicaPolicy.getReplicatedPolicy());

         synchronized (activeMQServer) {
            if (!activeMQServer.isStarted()) {
               logger.trace("Server is stopped, giving up right before becomingLive");
               return;
            }
            ActiveMQServerLogger.LOGGER.becomingLive(activeMQServer);
            logger.trace("stop backup");
            activeMQServer.getNodeManager().stopBackup();
            logger.trace("start store manager");
            activeMQServer.getStorageManager().start();
            logger.trace("activated");
            activeMQServer.getBackupManager().activated();
            if (scalingDown) {
               logger.trace("Scalling down...");
               activeMQServer.initialisePart2(true);
            } else {
               logger.trace("Setting up new activation");
               activeMQServer.setActivation(new SharedNothingLiveActivation(activeMQServer, replicaPolicy.getReplicatedPolicy()));
               logger.trace("initialize part 2");
               activeMQServer.initialisePart2(false);

               if (activeMQServer.getIdentity() != null) {
                  ActiveMQServerLogger.LOGGER.serverIsLive(activeMQServer.getIdentity());
               } else {
                  ActiveMQServerLogger.LOGGER.serverIsLive();
               }

            }

            logger.trace("completeActivation at the end");

            activeMQServer.completeActivation(true);
         }
      } catch (Exception e) {
         if (logger.isTraceEnabled()) {
            logger.trace("{}, serverStarted={}", e.getMessage(), activeMQServer.isStarted(), e);
         }
         if ((e instanceof InterruptedException || e instanceof IllegalStateException) && !activeMQServer.isStarted())
            // do not log these errors if the server is being stopped.
            return;
         ActiveMQServerLogger.LOGGER.initializationError(e);
      }
   }