in src/main/java/org/apache/sling/discovery/impl/common/heartbeat/HeartbeatHandler.java [365:497]
/**
 * Stores this instance's cluster-local heartbeat in the repository.
 * <p>
 * The method works on the resource at {@code getLocalClusterNodePath()} (created if
 * missing) and proceeds in these steps:
 * <ol>
 * <li>If a heartbeat was already written successfully, run the duplicate-sling.id checks:
 *   <ul>
 *   <li>SLING-2892: once more than {@code 2*config.getHeartbeatInterval()} has elapsed
 *       since the first heartbeat, compare the stored 'lastHeartbeat' with the value this
 *       instance wrote last. On mismatch the established view is invalidated,
 *       {@code handleTopologyChanging()} is triggered and an error is logged; processing
 *       then continues.</li>
 *   <li>SLING-2901 / SLING-3977: compare the stored runtimeId with {@code runtimeId}.
 *       A missing value resets {@code firstHeartbeatWritten} to -1 so that the
 *       identifying properties get rewritten below. A different value is treated as a
 *       duplicate sling.id: the view is invalidated, {@code forcedShutdown()} is called,
 *       {@code activated} is cleared, the bundle is stopped (falling back to disabling
 *       the components) and the method returns without writing anything.</li>
 *   </ul></li>
 * <li>Set 'lastHeartbeat' to the current time; on the first heartbeat also store the
 *     runtimeId, sling home path and endpoints.</li>
 * <li>If a leaderElectionId reset was requested or none is stored yet, store a new
 *     leaderElectionId (pre-set one if available) together with its creation date and
 *     hand it to the voting handler.</li>
 * <li>Commit. Only after a successful commit are {@code lastHeartbeatWritten} and
 *     (if still -1) {@code firstHeartbeatWritten} updated.</li>
 * </ol>
 * {@code LoginException} and {@code PersistenceException} are logged and not rethrown.
 * The resource resolver obtained here is always closed before returning.
 */
protected void issueClusterLocalHeartbeat() {
    logger.debug("issueClusterLocalHeartbeat: storing cluster-local heartbeat to repository for {}", slingId);
    ResourceResolver resourceResolver = null;
    final String myClusterNodePath = getLocalClusterNodePath();
    // captured once so that the value written and the value remembered after commit are identical
    final Calendar currentTime = Calendar.getInstance();
    try {
        resourceResolver = getResourceResolver();
        if (resourceResolver == null) {
            logger.error("issueClusterLocalHeartbeat: no resourceresolver available!");
            return;
        }
        final Resource resource = ResourceHelper.getOrCreateResource(
                resourceResolver, myClusterNodePath);
        final ModifiableValueMap resourceMap = resource.adaptTo(ModifiableValueMap.class);
        if (resourceMap == null) {
            // adaptTo() returns null when the resource cannot be adapted (e.g. it is not
            // writable); fail with a clear message instead of a NullPointerException below
            logger.error("issueClusterLocalHeartbeat: could not adapt {} to a ModifiableValueMap", myClusterNodePath);
            return;
        }

        if (firstHeartbeatWritten != -1 && lastHeartbeatWritten != null) {
            // SLING-2892: additional paranoia check
            // after the first heartbeat, check if there's someone else using
            // the same sling.id in this cluster
            final long timeSinceFirstHeartbeat =
                    System.currentTimeMillis() - firstHeartbeatWritten;
            // NOTE(review): timeSinceFirstHeartbeat is in milliseconds - confirm that
            // config.getHeartbeatInterval() uses the same unit; if it returns seconds the
            // grace period below is far shorter than the comment suggests.
            if (timeSinceFirstHeartbeat > 2 * config.getHeartbeatInterval()) {
                // but wait at least 2 heartbeat intervals to handle the situation
                // where a bundle is refreshed, and startup cases.
                final Calendar lastHeartbeat = resourceMap.get(PROPERTY_ID_LAST_HEARTBEAT, Calendar.class);
                if (lastHeartbeat != null) {
                    // if there is a heartbeat value, check if it is what I've written
                    // the last time
                    if (!lastHeartbeatWritten.getTime().equals(lastHeartbeat.getTime())) {
                        // then we've likely hit the situation where there is another
                        // sling instance accessing the same repository (ie in the same cluster)
                        // using the same sling.id - hence writing to the same
                        // resource
                        invalidateCurrentEstablishedView();
                        discoveryServiceImpl.handleTopologyChanging();
                        logger.error("issueClusterLocalHeartbeat: SLING-2892: Detected unexpected, concurrent update of: "+
                                myClusterNodePath+" 'lastHeartbeat'. If not done manually, " +
                                "this likely indicates that there is more than 1 instance running in this cluster" +
                                " with the same sling.id. My sling.id is "+slingId+"." +
                                " Check for sling.id.file in your installation of all instances in this cluster " +
                                "to verify this! Duplicate sling.ids are not allowed within a cluster!");
                        // no return here: this check only warns, the heartbeat is still written
                    }
                }
            }

            // SLING-2901 : robust paranoia check: on first heartbeat write, the
            // 'runtimeId' is set as a property (ignoring any former value).
            // If in subsequent calls the value of 'runtimeId' changes, then
            // there is someone else around with the same slingId.
            final String readRuntimeId = resourceMap.get(PROPERTY_ID_RUNTIME, String.class);
            if (readRuntimeId == null) { // SLING-3977
                // someone deleted the resource property; resetting to -1 makes the
                // "first heartbeat" branch below store the runtimeId again
                firstHeartbeatWritten = -1;
            } else if (!runtimeId.equals(readRuntimeId)) {
                invalidateCurrentEstablishedView();
                discoveryServiceImpl.handleTopologyChanging();
                final String slingHomePath = slingSettingsService==null ? "n/a" : slingSettingsService.getSlingHomePath();
                final String endpointsAsString = getEndpointsAsString();
                final String readEndpoints = resourceMap.get(PROPERTY_ID_ENDPOINTS, String.class);
                final String readSlingHomePath = resourceMap.get(PROPERTY_ID_SLING_HOME_PATH, String.class);
                logger.error("issueClusterLocalHeartbeat: SLING-2901: Detected more than 1 instance running in this cluster " +
                        " with the same sling.id. " +
                        "My sling.id: "+slingId+", my runtimeId: " + runtimeId+", my endpoints: "+endpointsAsString+", my slingHomePath: "+slingHomePath+
                        ", other runtimeId: "+readRuntimeId+", other endpoints: "+readEndpoints+", other slingHomePath:"+readSlingHomePath+
                        " Check for sling.id.file in your installation of all instances in this cluster " +
                        "to verify this! Duplicate sling.ids are not allowed within a cluster!");
                logger.error("issueClusterLocalHeartbeat: sending TOPOLOGY_CHANGING before self-disabling.");
                discoveryServiceImpl.forcedShutdown();
                logger.error("issueClusterLocalHeartbeat: disabling discovery.impl");
                activated = false;
                if (context != null) {
                    // disable all components
                    try {
                        context.getBundleContext().getBundle().stop();
                    } catch (BundleException e) {
                        logger.warn("issueClusterLocalHeartbeat: could not stop bundle: "+e, e);
                        // then disable all components instead
                        context.disableComponent(null);
                    }
                }
                // nothing is written or committed in the duplicate case
                return;
            }
        }

        resourceMap.put(PROPERTY_ID_LAST_HEARTBEAT, currentTime);
        if (firstHeartbeatWritten == -1) {
            // first heartbeat (or runtimeId property was removed): (re)store identifying properties
            resourceMap.put(PROPERTY_ID_RUNTIME, runtimeId);
            // SLING-4765 : store more infos to be able to be more verbose on duplicate slingId/ghost detection
            final String slingHomePath = slingSettingsService==null ? "n/a" : slingSettingsService.getSlingHomePath();
            resourceMap.put(PROPERTY_ID_SLING_HOME_PATH, slingHomePath);
            final String endpointsAsString = getEndpointsAsString();
            resourceMap.put(PROPERTY_ID_ENDPOINTS, endpointsAsString);
            logger.info("issueClusterLocalHeartbeat: storing my runtimeId: {}, endpoints: {} and sling home path: {}",
                    new Object[]{runtimeId, endpointsAsString, slingHomePath});
        }
        if (resetLeaderElectionId || !resourceMap.containsKey("leaderElectionId")) {
            // the new leaderElectionId might have been 'pre set' in the field 'newLeaderElectionId'
            // if that's the case, use that one, otherwise calculate a new one now
            final String newLeaderElectionId = this.newLeaderElectionId!=null ? this.newLeaderElectionId : newLeaderElectionId(resourceResolver);
            // the pre-set value is consumed exactly once
            this.newLeaderElectionId = null;
            resourceMap.put("leaderElectionId", newLeaderElectionId);
            resourceMap.put("leaderElectionIdCreatedAt", new Date());
            logger.info("issueClusterLocalHeartbeat: set leaderElectionId to "+newLeaderElectionId+" (resetLeaderElectionId: "+resetLeaderElectionId+")");
            if (votingHandler != null) {
                votingHandler.setLeaderElectionId(newLeaderElectionId);
            }
            resetLeaderElectionId = false;
        }
        logger.debug("issueClusterLocalHeartbeat: committing cluster-local heartbeat to repository for {}", slingId);
        resourceResolver.commit();
        logger.debug("issueClusterLocalHeartbeat: committed cluster-local heartbeat to repository for {}", slingId);

        // SLING-2892: only in success case: remember the last heartbeat value written
        lastHeartbeatWritten = currentTime;
        // and set the first heartbeat written value - if it is not already set
        if (firstHeartbeatWritten == -1) {
            firstHeartbeatWritten = System.currentTimeMillis();
        }
    } catch (LoginException e) {
        logger.error("issueHeartbeat: could not log in administratively: "
                + e, e);
    } catch (PersistenceException e) {
        logger.error("issueHeartbeat: Got a PersistenceException: "
                + myClusterNodePath + " " + e, e);
    } finally {
        // always release the resolver, including on the early-return paths above
        if (resourceResolver != null) {
            resourceResolver.close();
        }
    }
}