private LocalClusterView asClusterView()

in src/main/java/org/apache/sling/discovery/oak/cluster/OakClusterViewService.java [175:354]


    /**
     * Builds a {@link LocalClusterView} from the given discovery-lite descriptor.
     * <p>
     * The cluster view id is taken from the descriptor or, if the descriptor
     * carries none, obtained via {@code readOrDefineClusterId}. The descriptor's
     * seqNum (as a string) is used as the local cluster sync token id. Every
     * active id of the descriptor is read through a {@code ClusterReader};
     * instances that cannot be read while suppression is enabled are recorded
     * as partially started and left out of the view, the remaining ones are
     * ordered via {@code leaderElectionSort} (first entry becomes leader) and
     * added to the view.
     * <p>
     * On success this method also updates {@code lowestSeqNum},
     * {@code seenLocalInstances} and {@code partialStartupSuppressingTimeout},
     * and may clear the {@code idMapService} cache along the way.
     *
     * @param descriptor the discovery-lite descriptor, must not be null
     * @param resourceResolver the resolver used for reading from the repository, must not be null
     * @return the cluster view, including the local instance
     * @throws IllegalArgumentException if descriptor or resourceResolver is null
     * @throws UndefinedClusterViewException if the descriptor is not final, contains
     *         no active ids, the local instance cannot be read, another instance cannot
     *         be read while suppression is off, or the resulting view does not
     *         contain the local instance
     * @throws Exception if a clusterNodeId cannot be resolved to a slingId
     */
    private LocalClusterView asClusterView(DiscoveryLiteDescriptor descriptor, ResourceResolver resourceResolver) throws Exception {
        if (descriptor == null) {
            throw new IllegalArgumentException("descriptor must not be null");
        }
        if (resourceResolver == null) {
            throw new IllegalArgumentException("resourceResolver must not be null");
        }
        logger.trace("asClusterView: start");
        String clusterViewId = descriptor.getViewId();
        if (clusterViewId == null || clusterViewId.length() == 0) {
            logger.trace("asClusterView: no clusterId provided by discovery-lite descriptor - reading from repo.");
            clusterViewId = readOrDefineClusterId(resourceResolver);
        }
        final long seqNum = descriptor.getSeqNum();
        // the sync token id is the seqNum only (the view id is intentionally not part of it)
        final String localClusterSyncTokenId = String.valueOf(seqNum);
        if (!descriptor.isFinal()) {
            throw new UndefinedClusterViewException(Reason.NO_ESTABLISHED_VIEW, "descriptor is not yet final: "+descriptor);
        }
        final LocalClusterView cluster = new LocalClusterView(clusterViewId, localClusterSyncTokenId);
        final int me = descriptor.getMyId();
        final int[] activeIds = descriptor.getActiveIds();
        if (activeIds == null || activeIds.length == 0) {
            throw new UndefinedClusterViewException(Reason.NO_ESTABLISHED_VIEW, "Descriptor contained no active ids: "+descriptor.getDescriptorStr());
        }

        // Collect into an explicitly mutable list: partially started ids are removed
        // from it further down, and Collectors.toList() gives no mutability guarantee.
        final List<Integer> activeIdsList = Arrays.stream(activeIds).boxed()
                .collect(Collectors.toCollection(java.util.ArrayList::new));

        final ClusterReader reader = new ClusterReader(resourceResolver, config, idMapService, seenLocalInstances);
        final Map<Integer,InstanceInfo> regularInstances = new HashMap<>();
        final Set<Integer> partiallyStartedClusterNodeIds = new HashSet<>();
        boolean suppressionEnabled = isSyncTokenEnabled() && isPartialSuppressionEnabled();

        // the local instance is always read without suppression
        final InstanceReadResult myInstanceResult = reader.readInstance(me, false, -1);
        final InstanceInfo myInstance = myInstanceResult.getInstanceInfo();
        if (myInstance == null) {
            throw new UndefinedClusterViewException(Reason.NO_ESTABLISHED_VIEW, myInstanceResult.getErrorMsg());
        }

        if (partialStartupSuppressingTimeout > 0
                && partialStartupSuppressingTimeout < System.currentTimeMillis()) {
            // if partial suppression timeout is set and it has passed, then don't suppress
            suppressionEnabled = false;
        }

        // Suppression may only start once the local instance did store a synctoken,
        // ie it did successfully go through the syncTokenService once.
        // Otherwise even the local instance hasn't done a full join ever,
        // so we shouldn't do any suppression just yet.
        if (suppressionEnabled && !myInstance.isSyncTokenNewerOrEqual(lowestSeqNum)) {
            suppressionEnabled = false;
        }

        // categorize the activeIds into
        // - partially started : added to partiallyStartedClusterNodeIds
        // - regular           : added to regularInstances
        for (Integer id : activeIdsList) {
            if (id == me) {
                regularInstances.put(me, myInstance);
                continue;
            }
            InstanceReadResult readResult = reader.readInstance(id, suppressionEnabled, seqNum);
            InstanceInfo instanceInfo = readResult.getInstanceInfo();
            if (instanceInfo == null && !suppressionEnabled) {
                // retry with a fresh idmap
                idMapService.clearCache();
                readResult = reader.readInstance(id, suppressionEnabled, seqNum);
                instanceInfo = readResult.getInstanceInfo();
            }
            if (instanceInfo != null) {
                regularInstances.put(id, instanceInfo);
            } else if (suppressionEnabled) {
                // then suppress this instance by not adding it to the regularInstances map
                partiallyStartedClusterNodeIds.add(id);
            } else {
                throw new UndefinedClusterViewException(Reason.NO_ESTABLISHED_VIEW, readResult.getErrorMsg());
            }
        }

        if (!partiallyStartedClusterNodeIds.isEmpty()) {
            logSilencer.infoOrDebug("asClusterView : partial instances : " + partiallyStartedClusterNodeIds);
            activeIdsList.removeAll(partiallyStartedClusterNodeIds);
        }

        // sort the regular instances by their leaderElectionId
        //   serves two purposes: pos[0] is then leader
        //   and the rest are properly sorted within the cluster
        final List<Integer> sortedIds = leaderElectionSort(regularInstances);

        if (sortedIds.size() != activeIdsList.size()) {
            logger.error("asClusterView : list size mismatch : sorted = " + sortedIds.size()
                + ", active = " + activeIdsList.size() + " (partial = " + partiallyStartedClusterNodeIds.size() + ")");
        }

        boolean seenAllSyncTokens = true;
        for (int i = 0; i < sortedIds.size(); i++) {
            final int id = sortedIds.get(i);
            final boolean isLeader = i == 0; // thx to sorting above [0] is leader indeed
            final boolean isOwn = id == me;
            final InstanceInfo in = regularInstances.get(id);
            final String slingId = in == null ? null : in.getSlingId();
            if (slingId == null) {
                idMapService.clearCache();
                logger.info("asClusterView: cannot resolve oak-clusterNodeId {} to a slingId", id);
                throw new Exception("Cannot resolve oak-clusterNodeId "+id+" to a slingId");
            }
            if (!in.isSyncTokenNewerOrEqual(seqNum)) {
                logSilencer.infoOrDebug("Not seen syncToken (" + seqNum + ") of this instance yet : " + in);
                seenAllSyncTokens = false;
            }
            final Map<String, String> properties = readProperties(slingId, resourceResolver);
            // create a new instance (adds itself to the cluster in the constructor)
            new DefaultInstanceDescription(cluster, isLeader, isOwn, slingId, properties);
        }

        if (!partiallyStartedClusterNodeIds.isEmpty()) {
            logSilencer.infoOrDebug("asClusterView: partially started instance nearby - clearing idmap cache");
            idMapService.clearCache();
            logSilencer.infoOrDebug("asClusterView : adding as partially started slingIds: clusterNodeIds = " +
                        partiallyStartedClusterNodeIds);
            cluster.setPartiallyStartedClusterNodeIds(partiallyStartedClusterNodeIds);
        } else {
            if (!seenAllSyncTokens) {
                logSilencer.infoOrDebug("asClusterView: not seen all syncTokens yet - clearing idmap cache");
                idMapService.clearCache();
            }
            logSilencer.reset();
        }

        logger.trace("asClusterView: returning {}", cluster);
        final InstanceDescription local = cluster.getLocalInstance();
        if (local == null) {
            logger.info("getClusterView: the local instance ("+getSlingId()+") is currently not included in the existing established view! "
                    + "This is normal at startup. At other times is pseudo-network-partitioning is an indicator for repository/network-delays or clocks-out-of-sync (SLING-3432). "
                    + "(increasing the heartbeatTimeout can help as a workaround too) "
                    + "The local instance will stay in TOPOLOGY_CHANGING or pre _INIT mode until a new vote was successful.");
            throw new UndefinedClusterViewException(Reason.ISOLATED_FROM_TOPOLOGY,
                    "established view does not include local instance - isolated");
        }
        if (lowestSeqNum == -1) {
            // this starts partialStartup suppression (if all other conditions met)
            lowestSeqNum = seqNum;
        }
        // now remember those regularInstances in the seenLocalInstances map
        // but before we do that, lets do some paranoia checks (useful for tests to fail)
        for (InstanceInfo aSeenInstance : seenLocalInstances.values()) {
            final int clusterNodeId = aSeenInstance.getClusterNodeId();
            if (regularInstances.get(clusterNodeId) != null) {
                // still part of the view
                continue;
            }
            // note: activeIdsList no longer contains the partially started ids at this point
            if (!activeIdsList.contains(clusterNodeId)) {
                // ok, then this one is no longer active, perfect.
                continue;
            }
            logger.error("asClusterView : an instance is unexpectedly no longer part of the view : " + aSeenInstance);
        }
        this.seenLocalInstances = regularInstances;
        if (partiallyStartedClusterNodeIds.isEmpty()) {
            // success without suppressing -> reset the timeout
            partialStartupSuppressingTimeout = 0;
        } else if (partialStartupSuppressingTimeout == 0) {
            // success with suppressing -> set the timeout (if not already set)
            final long suppressionTimeoutSeconds = config.getSuppressionTimeoutSeconds();
            if (suppressionTimeoutSeconds <= 0) {
                // a non-positive configured timeout leaves the timeout unset
                partialStartupSuppressingTimeout = 0;
            } else {
                partialStartupSuppressingTimeout = System.currentTimeMillis()
                        + (suppressionTimeoutSeconds * 1000);
            }
        }
        return cluster;
    }