in artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java [106:335]
/**
 * Drives the backup side of a shared-nothing (replicated) HA pair until this server either becomes the active
 * server or gives up.
 * <p>
 * Sequence, as implemented below:
 * <ol>
 * <li>Mark the server {@code STARTED}, stop the node manager, call {@code moveServerData} with the configured
 * {@code maxSavedReplicatedJournalsSize}, and start the node manager again.</li>
 * <li>Run {@code initialisePart1}, then create a {@code SharedNothingBackupQuorum} and register it, together with a
 * {@code ServerConnectVoteHandler}, with the quorum manager.</li>
 * <li>Pick a {@code NodeLocator}: a {@code NamedNodeIdNodeLocator} when a {@code REPLICATION_ENDPOINT} activation
 * parameter is present, otherwise an "any node" or group-name based locator for replication.</li>
 * <li>Wait for the replication cluster connection, start the backup manager, then loop: locate a primary, try to
 * connect to its first and then its second transport configuration, run the {@code EndpointConnector} on the server
 * thread pool and block on {@code backupQuorum.waitForStatusChange()}.</li>
 * <li>On {@code FAIL_OVER}, verify the replica is up to date and turn this server into the active one.</li>
 * </ol>
 * The method returns early when this activation is closed, when the server is no longer started, or on a
 * {@code STOP} signal. {@code FAILURE_REPLICATING} and {@code FAILURE_RETRY} stop the server from a separate thread
 * ({@code FAILURE_RETRY} also starts it again). {@code ALREADY_REPLICATING} retries with the next located primary.
 * <p>
 * Exceptions are reported through {@code ActiveMQServerLogger.LOGGER.initializationError}, except
 * {@link InterruptedException} and {@link IllegalStateException} raised while the server is not started, which are
 * only trace-logged. The interrupt status is restored when an {@link InterruptedException} is caught.
 */
public void run() {
   try {
      logger.trace("SharedNothingBackupActivation..start");
      synchronized (activeMQServer) {
         activeMQServer.setState(ActiveMQServerImpl.SERVER_STATE.STARTED);
      }
      // move all data away:
      activeMQServer.getNodeManager().stop();
      activeMQServer.moveServerData(replicaPolicy.getMaxSavedReplicatedJournalsSize());
      activeMQServer.getNodeManager().start();
      synchronized (this) {
         if (closed) {
            logger.trace("SharedNothingBackupActivation is closed, ignoring activation!");
            return;
         }
      }
      boolean scalingDown = replicaPolicy.getScaleDownPolicy() != null && replicaPolicy.getScaleDownPolicy().isEnabled();
      if (!activeMQServer.initialisePart1(scalingDown)) {
         if (logger.isTraceEnabled()) {
            logger.trace("could not initialize part1 {}", scalingDown);
         }
         return;
      }
      // Check 'closed' and publish/register the quorum under the same monitor, so the check and the
      // assignment of backupQuorum are not interleaved with another holder of this lock.
      synchronized (this) {
         if (closed) {
            return;
         }
         backupQuorum = new SharedNothingBackupQuorum(activeMQServer.getNodeManager(), activeMQServer.getScheduledPool(), networkHealthCheck, replicaPolicy.getQuorumSize(), replicaPolicy.getVoteRetries(), replicaPolicy.getVoteRetryWait(), replicaPolicy.getQuorumVoteWait(), attemptFailBack);
         activeMQServer.getClusterManager().getQuorumManager().registerQuorum(backupQuorum);
         activeMQServer.getClusterManager().getQuorumManager().registerQuorumHandler(new ServerConnectVoteHandler(activeMQServer));
      }
      //use a Node Locator to connect to the cluster
      NodeLocator nodeLocator;
      if (activationParams.get(ActivationParams.REPLICATION_ENDPOINT) != null) {
         // An explicit replication endpoint was supplied: lock onto that node id and its primary/backup transports.
         TopologyMember member = (TopologyMember) activationParams.get(ActivationParams.REPLICATION_ENDPOINT);
         nodeLocator = new NamedNodeIdNodeLocator(member.getNodeId(), new Pair<>(member.getPrimary(), member.getBackup()));
      } else {
         nodeLocator = replicaPolicy.getGroupName() == null ? new AnyNodeLocatorForReplication(backupQuorum, activeMQServer, replicaPolicy.getRetryReplicationWait()) : new NamedNodeLocatorForReplication(replicaPolicy.getGroupName(), backupQuorum, replicaPolicy.getRetryReplicationWait());
      }
      ClusterController clusterController = activeMQServer.getClusterManager().getClusterController();
      clusterController.addClusterTopologyListenerForReplication(nodeLocator);
      logger.trace("Waiting on cluster connection");
      clusterController.awaitConnectionToReplicationCluster();
      logger.trace("Cluster Connected");
      clusterController.addIncomingInterceptorForReplication(new ReplicationError(nodeLocator));
      logger.debug("Starting backup manager");
      activeMQServer.getBackupManager().start();
      replicationEndpoint.setExecutor(activeMQServer.getExecutorFactory().getExecutor());
      EndpointConnector endpointConnector = new EndpointConnector();
      logger.debug("Starting Backup Server");
      ActiveMQServerLogger.LOGGER.backupServerStarted(activeMQServer.getVersion().getFullVersion(), activeMQServer.getNodeManager().getNodeId());
      activeMQServer.setState(ActiveMQServerImpl.SERVER_STATE.STARTED);
      logger.trace("Setting server state as started");
      SharedNothingBackupQuorum.BACKUP_ACTIVATION signal;
      do {
         // Read 'closed' under the same monitor used by the earlier checks in this method; an unsynchronized
         // read here could race with the code that sets it.
         synchronized (this) {
            if (closed) {
               logger.debug("Activation is closed, so giving up");
               return;
            }
         }
         logger.trace("looking up the node through nodeLocator.locateNode()");
         //locate the first primary server to try to replicate
         nodeLocator.locateNode();
         Pair<TransportConfiguration, TransportConfiguration> possiblePrimary = nodeLocator.getPrimaryConfiguration();
         nodeID = nodeLocator.getNodeID();
         logger.debug("Connecting towards a possible primary, connection information={}, nodeID={}", possiblePrimary, nodeID);
         //in a normal (non failback) scenario if we couldn't find our primary server we should fail
         if (!attemptFailBack) {
            logger.debug("attemptFailback=false, nodeID={}", nodeID);
            //this shouldn't happen
            if (nodeID == null) {
               logger.debug("Throwing a RuntimeException as nodeID==null and attemptFailback=false");
               throw new RuntimeException("Could not establish the connection");
            }
            activeMQServer.getNodeManager().setNodeID(nodeID);
         }
         // Try the first transport of the located pair, then fall back to the second one.
         if (possiblePrimary != null) {
            clusterControl = tryConnectToNodeInReplicatedCluster(clusterController, possiblePrimary.getA());
            if (clusterControl == null) {
               clusterControl = tryConnectToNodeInReplicatedCluster(clusterController, possiblePrimary.getB());
            }
         } else {
            clusterControl = null;
         }
         if (clusterControl == null) {
            final long retryIntervalForReplicatedCluster = clusterController.getRetryIntervalForReplicatedCluster();
            logger.trace("sleeping {} it should retry", retryIntervalForReplicatedCluster);
            //its ok to retry here since we haven't started replication yet
            //it may just be the server has gone since discovery
            Thread.sleep(retryIntervalForReplicatedCluster);
            // In a do/while, 'continue' jumps to the loop condition; ALREADY_REPLICATING makes it loop again.
            signal = SharedNothingBackupQuorum.BACKUP_ACTIVATION.ALREADY_REPLICATING;
            continue;
         }
         activeMQServer.getThreadPool().execute(endpointConnector);
         /*
          * Wait for a signal from the quorum manager. At this point if replication has been successful we can fail
          * over or if there is an error trying to replicate (such as already replicating) we try the process again
          * on the next primary server. All the action happens inside {@link BackupQuorum}
          */
         signal = backupQuorum.waitForStatusChange();
         logger.trace("Got a signal {} through backupQuorum.waitForStatusChange()", signal);
         // replicationEndpoint will be holding lots of open files. Make sure they get closed/sync'ed.
         ActiveMQServerImpl.stopComponent(replicationEndpoint);
         // time to give up
         if (!activeMQServer.isStarted() || signal == STOP) {
            if (logger.isTraceEnabled()) {
               logger.trace("giving up on the activation:: activemqServer.isStarted={} while signal = {}", activeMQServer.isStarted(), signal);
            }
            return;
         } else if (signal == FAIL_OVER) {
            // time to fail over
            logger.trace("signal == FAIL_OVER, breaking the loop");
            break;
         } else if (signal == SharedNothingBackupQuorum.BACKUP_ACTIVATION.FAILURE_REPLICATING || signal == SharedNothingBackupQuorum.BACKUP_ACTIVATION.FAILURE_RETRY) {
            // something has gone badly wrong; restart from scratch.
            // The stop (and, for FAILURE_RETRY, the restart) runs on a separate thread and this method returns.
            logger.trace("Starting a new thread to stop the server!");
            final SharedNothingBackupQuorum.BACKUP_ACTIVATION signalToStop = signal;
            Thread startThread = new Thread(() -> {
               try {
                  logger.trace("Calling activeMQServer.stop() as initialization failed");
                  if (activeMQServer.getState() != ActiveMQServer.SERVER_STATE.STOPPED &&
                     activeMQServer.getState() != ActiveMQServer.SERVER_STATE.STOPPING) {
                     if (signalToStop == SharedNothingBackupQuorum.BACKUP_ACTIVATION.FAILURE_RETRY) {
                        activeMQServer.stop(false);
                        logger.trace("The server was shutdown for a network isolation, we keep retrying");
                        activeMQServer.start();
                     } else {
                        activeMQServer.stop();
                     }
                  }
               } catch (Exception e) {
                  ActiveMQServerLogger.LOGGER.errorRestartingBackupServer(activeMQServer, e);
               }
            });
            startThread.start();
            return;
         }
         //ok, this primary is no good, let's reset and try again
         //close this session factory, we're done with it
         clusterControl.close();
         backupQuorum.reset();
         if (replicationEndpoint.getChannel() != null) {
            replicationEndpoint.getChannel().close();
            replicationEndpoint.setChannel(null);
         }
      }
      while (signal == SharedNothingBackupQuorum.BACKUP_ACTIVATION.ALREADY_REPLICATING);
      logger.trace("Activation loop finished, current signal = {}", signal);
      activeMQServer.getClusterManager().getQuorumManager().unRegisterQuorum(backupQuorum);
      if (!isRemoteBackupUpToDate()) {
         logger.debug("throwing exception for !isRemoteBackupUptoDate");
         throw ActiveMQMessageBundle.BUNDLE.backupServerNotInSync();
      }
      logger.trace("@@@ setReplicaPolicy::{}", replicaPolicy);
      replicaPolicy.getReplicatedPolicy().setReplicaPolicy(replicaPolicy);
      activeMQServer.setHAPolicy(replicaPolicy.getReplicatedPolicy());
      synchronized (activeMQServer) {
         if (!activeMQServer.isStarted()) {
            logger.trace("Server is stopped, giving up right before becoming active");
            return;
         }
         ActiveMQServerLogger.LOGGER.becomingActive(activeMQServer);
         logger.trace("stop backup");
         activeMQServer.getNodeManager().stopBackup();
         logger.trace("start store manager");
         activeMQServer.getStorageManager().start();
         logger.trace("activated");
         activeMQServer.getBackupManager().activated();
         if (scalingDown) {
            logger.trace("Scaling down...");
            activeMQServer.initialisePart2(true);
         } else {
            logger.trace("Setting up new activation");
            activeMQServer.setActivation(new SharedNothingPrimaryActivation(activeMQServer, replicaPolicy.getReplicatedPolicy()));
            logger.trace("initialize part 2");
            activeMQServer.initialisePart2(false);
            if (activeMQServer.getIdentity() != null) {
               ActiveMQServerLogger.LOGGER.serverIsActive(activeMQServer.getIdentity());
            } else {
               ActiveMQServerLogger.LOGGER.serverIsActive();
            }
         }
         logger.trace("completeActivation at the end");
         activeMQServer.completeActivation(true);
      }
   } catch (Exception e) {
      if (logger.isTraceEnabled()) {
         logger.trace("{}, serverStarted={}", e.getMessage(), activeMQServer.isStarted(), e);
      }
      // do not log these errors if the server is being stopped.
      final boolean expectedDuringStop = (e instanceof InterruptedException || e instanceof IllegalStateException) && !activeMQServer.isStarted();
      if (!expectedDuringStop) {
         ActiveMQServerLogger.LOGGER.initializationError(e);
      }
      if (e instanceof InterruptedException) {
         // Catching InterruptedException clears the thread's interrupt status; restore it (after logging, so the
         // flag cannot interfere with the logging call) for whoever owns this thread.
         Thread.currentThread().interrupt();
      }
   }
}