in src/main/java/org/apache/sling/discovery/oak/cluster/OakClusterViewService.java [175:354]
/**
 * Builds the local cluster view from an Oak discovery-lite descriptor.
 * <p>
 * Every active clusterNodeId from the descriptor is read via a {@link ClusterReader}. Instances
 * that cannot be read are either suppressed as "partially started" or cause the view to be
 * rejected. Suppression only happens when all of these hold:
 * <ul>
 * <li>sync tokens and partial suppression are both enabled;</li>
 * <li>the partial-startup suppression timeout has not passed;</li>
 * <li>the local instance has stored a sync token at least as new as {@code lowestSeqNum}.</li>
 * </ul>
 * The remaining instances are ordered by {@code leaderElectionSort}, whose first entry becomes
 * the leader.
 * <p>
 * Side effects on this service:
 * <ul>
 * <li>may clear the {@code idMapService} cache;</li>
 * <li>initialises {@code lowestSeqNum} on the first successful call;</li>
 * <li>replaces {@code seenLocalInstances} with the instances of the returned view;</li>
 * <li>starts or resets {@code partialStartupSuppressingTimeout}.</li>
 * </ul>
 *
 * @param descriptor the discovery-lite descriptor to convert, must not be null
 * @param resourceResolver resolver used to read instance and cluster-id data, must not be null
 * @return the established local cluster view
 * @throws IllegalArgumentException if {@code descriptor} or {@code resourceResolver} is null
 * @throws UndefinedClusterViewException with {@code NO_ESTABLISHED_VIEW} if the descriptor is
 *         not final, has no active ids, or an instance cannot be read while suppression is
 *         disabled. With {@code ISOLATED_FROM_TOPOLOGY} if the resulting view does not contain
 *         the local instance.
 * @throws Exception if an instance of the view cannot be resolved to a slingId
 */
private LocalClusterView asClusterView(DiscoveryLiteDescriptor descriptor, ResourceResolver resourceResolver) throws Exception {
    if (descriptor == null) {
        throw new IllegalArgumentException("descriptor must not be null");
    }
    if (resourceResolver == null) {
        throw new IllegalArgumentException("resourceResolver must not be null");
    }
    logger.trace("asClusterView: start");
    String clusterViewId = descriptor.getViewId();
    if (clusterViewId == null || clusterViewId.length() == 0) {
        logger.trace("asClusterView: no clusterId provided by discovery-lite descriptor - reading from repo.");
        clusterViewId = readOrDefineClusterId(resourceResolver);
    }
    final long seqNum = descriptor.getSeqNum();
    // the local cluster sync token is the discovery-lite sequence number
    final String localClusterSyncTokenId = String.valueOf(seqNum);
    if (!descriptor.isFinal()) {
        throw new UndefinedClusterViewException(Reason.NO_ESTABLISHED_VIEW, "descriptor is not yet final: " + descriptor);
    }
    final LocalClusterView cluster = new LocalClusterView(clusterViewId, localClusterSyncTokenId);
    final int me = descriptor.getMyId();
    final int[] activeIds = descriptor.getActiveIds();
    if (activeIds == null || activeIds.length == 0) {
        throw new UndefinedClusterViewException(Reason.NO_ESTABLISHED_VIEW,
                "Descriptor contained no active ids: " + descriptor.getDescriptorStr());
    }
    // every clusterNodeId Oak reports as active - intentionally never modified below,
    // so the paranoia check at the end still sees suppressed-but-active instances
    final List<Integer> activeIdsList = Arrays.stream(activeIds).boxed().collect(Collectors.toList());

    final ClusterReader reader = new ClusterReader(resourceResolver, config, idMapService, seenLocalInstances);
    final Map<Integer, InstanceInfo> regularInstances = new HashMap<>();
    final Set<Integer> partiallyStartedClusterNodeIds = new HashSet<>();
    final boolean suppressionConfigured = isSyncTokenEnabled() && isPartialSuppressionEnabled();
    final InstanceReadResult myInstanceResult = reader.readInstance(me, false, -1);
    final InstanceInfo myInstance = myInstanceResult.getInstanceInfo();
    if (myInstance == null) {
        throw new UndefinedClusterViewException(Reason.NO_ESTABLISHED_VIEW, myInstanceResult.getErrorMsg());
    }
    // once a set suppression timeout has passed, stop suppressing
    final boolean suppressionTimedOut = partialStartupSuppressingTimeout > 0
            && partialStartupSuppressingTimeout < System.currentTimeMillis();
    // only suppress once the local instance itself has stored a sync token, i.e. it has
    // gone through the syncTokenService successfully at least once
    final boolean suppressionEnabled = suppressionConfigured
            && !suppressionTimedOut
            && myInstance.isSyncTokenNewerOrEqual(lowestSeqNum);

    // categorize the active ids into
    // - partially started : added to partiallyStartedClusterNodeIds (only when suppressing)
    // - fully started     : added to regularInstances
    for (final Integer id : activeIdsList) {
        if (id.intValue() == me) {
            regularInstances.put(me, myInstance);
            continue;
        }
        InstanceReadResult readResult = reader.readInstance(id, suppressionEnabled, seqNum);
        InstanceInfo instanceInfo = readResult.getInstanceInfo();
        if (instanceInfo == null && !suppressionEnabled) {
            // retry once with a fresh idmap
            idMapService.clearCache();
            readResult = reader.readInstance(id, suppressionEnabled, seqNum);
            instanceInfo = readResult.getInstanceInfo();
        }
        if (instanceInfo != null) {
            regularInstances.put(id, instanceInfo);
        } else if (suppressionEnabled) {
            // suppress this instance by not adding it to regularInstances
            partiallyStartedClusterNodeIds.add(id);
        } else {
            throw new UndefinedClusterViewException(Reason.NO_ESTABLISHED_VIEW, readResult.getErrorMsg());
        }
    }

    // the active ids expected to end up in the view: a mutable copy, so activeIdsList stays intact
    final Set<Integer> fullyStartedIds = new HashSet<>(activeIdsList);
    if (!partiallyStartedClusterNodeIds.isEmpty()) {
        logSilencer.infoOrDebug("asClusterView : partial instances : " + partiallyStartedClusterNodeIds);
        fullyStartedIds.removeAll(partiallyStartedClusterNodeIds);
    }
    // sorting by leaderElectionId: position 0 is the leader, the rest is properly ordered
    final List<Integer> sortedIds = leaderElectionSort(regularInstances);
    if (sortedIds.size() != fullyStartedIds.size()) {
        logger.error("asClusterView : list size mismatch : sorted = {}, active = {} (partial = {})",
                sortedIds.size(), fullyStartedIds.size(), partiallyStartedClusterNodeIds.size());
    }

    boolean seenAllSyncTokens = true;
    for (int i = 0; i < sortedIds.size(); i++) {
        final int id = sortedIds.get(i);
        final boolean isLeader = i == 0;
        final boolean isOwn = id == me;
        final InstanceInfo in = regularInstances.get(id);
        final String slingId = in == null ? null : in.getSlingId();
        if (slingId == null) {
            idMapService.clearCache();
            logger.info("asClusterView: cannot resolve oak-clusterNodeId {} to a slingId", id);
            throw new Exception("Cannot resolve oak-clusterNodeId " + id + " to a slingId");
        }
        if (!in.isSyncTokenNewerOrEqual(seqNum)) {
            logSilencer.infoOrDebug("Not seen syncToken (" + seqNum + ") of this instance yet : " + in);
            seenAllSyncTokens = false;
        }
        final Map<String, String> properties = readProperties(slingId, resourceResolver);
        // create a new instance (adds itself to the cluster in the constructor)
        new DefaultInstanceDescription(cluster, isLeader, isOwn, slingId, properties);
    }

    if (!partiallyStartedClusterNodeIds.isEmpty()) {
        logSilencer.infoOrDebug("asClusterView: partially started instance nearby - clearing idmap cache");
        idMapService.clearCache();
        logSilencer.infoOrDebug("asClusterView : adding as partially started slingIds: clusterNodeIds = "
                + partiallyStartedClusterNodeIds);
        cluster.setPartiallyStartedClusterNodeIds(partiallyStartedClusterNodeIds);
    } else {
        if (!seenAllSyncTokens) {
            logSilencer.infoOrDebug("asClusterView: not seen all syncTokens yet - clearing idmap cache");
            idMapService.clearCache();
        }
        logSilencer.reset();
    }
    logger.trace("asClusterView: returning {}", cluster);

    final InstanceDescription local = cluster.getLocalInstance();
    if (local == null) {
        logger.info("getClusterView: the local instance ({}) is currently not included in the existing established view! "
                + "This is normal at startup. At other times is pseudo-network-partitioning is an indicator for repository/network-delays or clocks-out-of-sync (SLING-3432). "
                + "(increasing the heartbeatTimeout can help as a workaround too) "
                + "The local instance will stay in TOPOLOGY_CHANGING or pre _INIT mode until a new vote was successful.",
                getSlingId());
        throw new UndefinedClusterViewException(Reason.ISOLATED_FROM_TOPOLOGY,
                "established view does not include local instance - isolated");
    }
    if (lowestSeqNum == -1) {
        // this starts partialStartup suppression (if all other conditions met)
        lowestSeqNum = seqNum;
    }

    // paranoia check before remembering regularInstances (useful for tests to fail):
    // a previously seen instance may only disappear from the view if it is no longer active.
    // This compares against the full, unmodified activeIdsList, so a seen instance that is
    // still active but was suppressed as partially started is reported.
    for (final InstanceInfo aSeenInstance : seenLocalInstances.values()) {
        final int clusterNodeId = aSeenInstance.getClusterNodeId();
        if (regularInstances.get(clusterNodeId) != null) {
            continue;
        }
        if (!activeIdsList.contains(clusterNodeId)) {
            // ok, then this one is no longer active, perfect.
            continue;
        }
        logger.error("asClusterView : an instance is unexpectedly no longer part of the view : {}", aSeenInstance);
    }
    this.seenLocalInstances = regularInstances;

    if (partiallyStartedClusterNodeIds.isEmpty()) {
        // success without suppressing -> reset the timeout
        partialStartupSuppressingTimeout = 0;
    } else if (partialStartupSuppressingTimeout == 0) {
        // success with suppressing -> set the timeout (if not already set);
        // a non-positive configured timeout means suppression never times out
        final long suppressionTimeoutSeconds = config.getSuppressionTimeoutSeconds();
        if (suppressionTimeoutSeconds <= 0) {
            partialStartupSuppressingTimeout = 0;
        } else {
            partialStartupSuppressingTimeout = System.currentTimeMillis()
                    + (suppressionTimeoutSeconds * 1000);
        }
    }
    return cluster;
}