public DesiredBalance compute()

in server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputer.java [96:467]


    public DesiredBalance compute(
        DesiredBalance previousDesiredBalance,
        DesiredBalanceInput desiredBalanceInput,
        Queue<List<MoveAllocationCommand>> pendingDesiredBalanceMoves,
        Predicate<DesiredBalanceInput> isFresh
    ) {
        numComputeCallsSinceLastConverged += 1;
        if (logger.isTraceEnabled()) {
            logger.trace(
                "Recomputing desired balance for [{}]: {}, {}, {}, {}",
                desiredBalanceInput.index(),
                previousDesiredBalance,
                desiredBalanceInput.routingAllocation().routingNodes().toString(),
                desiredBalanceInput.routingAllocation().clusterInfo().toString(),
                desiredBalanceInput.routingAllocation().snapshotShardSizeInfo().toString()
            );
        } else {
            logger.debug("Recomputing desired balance for [{}]", desiredBalanceInput.index());
        }

        final var routingAllocation = desiredBalanceInput.routingAllocation().mutableCloneForSimulation();
        final var routingNodes = routingAllocation.routingNodes();
        final var knownNodeIds = routingNodes.getAllNodeIds();
        final var changes = routingAllocation.changes();
        final var ignoredShards = getIgnoredShardsWithDiscardedAllocationStatus(desiredBalanceInput.ignoredShards());
        final var clusterInfoSimulator = new ClusterInfoSimulator(routingAllocation);
        DesiredBalance.ComputationFinishReason finishReason = DesiredBalance.ComputationFinishReason.CONVERGED;

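        // with no data nodes in the cluster there is nothing to assign, so return a trivial, already-converged balance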
        if (routingNodes.size() == 0) {
            return new DesiredBalance(desiredBalanceInput.index(), Map.of(), Map.of(), finishReason);
        }

        // we assume that all ongoing recoveries will complete
        for (final var routingNode : routingNodes) {
            for (final var shardRouting : routingNode) {
                if (shardRouting.initializing()) {
                    clusterInfoSimulator.simulateShardStarted(shardRouting);
                    routingNodes.startShard(shardRouting, changes, 0L);
                }
            }
        }

        // we are not responsible for allocating unassigned primaries of existing shards, and we're only responsible for allocating
        // unassigned replicas if the ReplicaShardAllocator gives up, so we must respect these ignored shards
        final var unassignedPrimaries = new HashSet<ShardId>();
        final var shardRoutings = new HashMap<ShardId, ShardRoutings>();
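        // iterate primaries before replicas so that, when an out-of-scope primary is discarded, its replicas can be
        // skipped later via unassignedPrimaries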
        for (final var primary : new boolean[] { true, false }) {
            final RoutingNodes.UnassignedShards unassigned = routingNodes.unassigned();
            for (final var iterator = unassigned.iterator(); iterator.hasNext();) {
                final var shardRouting = iterator.next();
                if (shardRouting.primary() == primary) {
                    var lastAllocatedNodeId = shardRouting.unassignedInfo().lastAllocatedNodeId();
                    if (knownNodeIds.contains(lastAllocatedNodeId)
                        || ignoredShards.contains(discardAllocationStatus(shardRouting)) == false) {
                        shardRoutings.computeIfAbsent(shardRouting.shardId(), ShardRoutings::new).unassigned().add(shardRouting);
                    } else {
                        iterator.removeAndIgnore(UnassignedInfo.AllocationStatus.NO_ATTEMPT, changes);
                        if (shardRouting.primary()) {
                            unassignedPrimaries.add(shardRouting.shardId());
                        }
                    }
                }
            }
        }

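        // group the started copies of each shard together with its remaining unassigned copies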
        for (final var assigned : routingNodes.getAssignedShards().entrySet()) {
            shardRoutings.computeIfAbsent(assigned.getKey(), ShardRoutings::new).assigned().addAll(assigned.getValue());
        }

        // we can assume that all possible shards will be allocated/relocated to one of their desired locations
        final var unassignedShardsToInitialize = new HashMap<ShardRouting, LinkedList<String>>();
        for (final var entry : shardRoutings.entrySet()) {
            final var shardId = entry.getKey();
            final var routings = entry.getValue();

            // treemap (keyed by node ID) so that we are consistent about the order of future relocations
            final var shardsToRelocate = new TreeMap<String, ShardRouting>();
            final var assignment = previousDesiredBalance.getAssignment(shardId);

            // treeset (ordered by node ID) so that we are consistent about the order of future relocations
            final var targetNodes = assignment != null ? new TreeSet<>(assignment.nodeIds()) : new TreeSet<String>();
            targetNodes.retainAll(knownNodeIds);

            // preserving last known shard location as a starting point to avoid unnecessary relocations
            for (ShardRouting shardRouting : routings.unassigned()) {
                var lastAllocatedNodeId = shardRouting.unassignedInfo().lastAllocatedNodeId();
                if (knownNodeIds.contains(lastAllocatedNodeId)) {
                    targetNodes.add(lastAllocatedNodeId);
                }
            }

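            // shards already on one of the desired nodes stay where they are; every other started copy becomes a
            // relocation candidate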
            for (final var shardRouting : routings.assigned()) {
                assert shardRouting.started();
                if (targetNodes.remove(shardRouting.currentNodeId()) == false) {
                    final var previousShard = shardsToRelocate.put(shardRouting.currentNodeId(), shardRouting);
                    assert previousShard == null : "duplicate shards to relocate: " + shardRouting + " vs " + previousShard;
                }
            }

            final var targetNodesIterator = targetNodes.iterator();

            // Here existing shards are moved to desired locations before initializing unassigned shards because we prefer not to leave
            // immovable shards allocated to undesirable locations (e.g. a node that is shutting down or an allocation filter which was
            // only recently applied). In contrast, reconciliation prefers to initialize the unassigned shards first.
            relocateToDesiredLocation: for (final var shardRouting : shardsToRelocate.values()) {
                assert shardRouting.started();

                while (targetNodesIterator.hasNext()) {
                    final var targetNodeId = targetNodesIterator.next();
                    final var targetNode = routingNodes.node(targetNodeId);
                    if (targetNode != null
                        && routingAllocation.deciders()
                            .canAllocate(shardRouting, targetNode, routingAllocation)
                            .type() != Decision.Type.NO) {
                        final var shardToRelocate = routingNodes.relocateShard(shardRouting, targetNodeId, 0L, "computation", changes).v2();
                        clusterInfoSimulator.simulateShardStarted(shardToRelocate);
                        routingNodes.startShard(shardToRelocate, changes, 0L);
                        continue relocateToDesiredLocation;
                    }
                }
            }

            for (final var shardRouting : routings.unassigned()) {
                assert shardRouting.unassigned();
                if (targetNodesIterator.hasNext()) {
                    unassignedShardsToInitialize.computeIfAbsent(shardRouting, ignored -> new LinkedList<>())
                        .add(targetNodesIterator.next());
                } else {
                    break;
                }
            }
        }

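        // initialize unassigned primaries on their chosen target nodes and simulate them as started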
        final var unassignedPrimaryIterator = routingNodes.unassigned().iterator();
        while (unassignedPrimaryIterator.hasNext()) {
            final var shardRouting = unassignedPrimaryIterator.next();
            if (shardRouting.primary()) {
                final var nodeIds = unassignedShardsToInitialize.get(shardRouting);
                if (nodeIds != null && nodeIds.isEmpty() == false) {
                    final var nodeId = nodeIds.removeFirst();
                    final var routingNode = routingNodes.node(nodeId);
                    if (routingNode != null
                        && routingAllocation.deciders()
                            .canAllocate(shardRouting, routingNode, routingAllocation)
                            .type() != Decision.Type.NO) {
                        final var shardToInitialize = unassignedPrimaryIterator.initialize(nodeId, null, 0L, changes);
                        clusterInfoSimulator.simulateShardStarted(shardToInitialize);
                        routingNodes.startShard(shardToInitialize, changes, 0L);
                    }
                }
            }
        }

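        // then do the same for replicas, skipping those whose primary was discarded above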
        final var unassignedReplicaIterator = routingNodes.unassigned().iterator();
        while (unassignedReplicaIterator.hasNext()) {
            final var shardRouting = unassignedReplicaIterator.next();
            if (unassignedPrimaries.contains(shardRouting.shardId()) == false) {
                final var nodeIds = unassignedShardsToInitialize.get(shardRouting);
                if (nodeIds != null && nodeIds.isEmpty() == false) {
                    final var nodeId = nodeIds.removeFirst();
                    final var routingNode = routingNodes.node(nodeId);
                    if (routingNode != null
                        && routingAllocation.deciders()
                            .canAllocate(shardRouting, routingNode, routingAllocation)
                            .type() != Decision.Type.NO) {
                        final var shardToInitialize = unassignedReplicaIterator.initialize(nodeId, null, 0L, changes);
                        clusterInfoSimulator.simulateShardStarted(shardToInitialize);
                        routingNodes.startShard(shardToInitialize, changes, 0L);
                    }
                }
            }
        }

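        // replay any pending explicit move commands against the simulated state; a command that fails is logged and skipped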
        List<MoveAllocationCommand> commands;
        while ((commands = pendingDesiredBalanceMoves.poll()) != null) {
            for (MoveAllocationCommand command : commands) {
                try {
                    command.execute(routingAllocation, false);
                } catch (RuntimeException e) {
                    logger.debug(
                        () -> "move shard ["
                            + command.index()
                            + ":"
                            + command.shardId()
                            + "] command failed during applying it to the desired balance",
                        e
                    );
                }
            }
        }

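        // set up the progress-reporting cadence: log periodically both by elapsed time and by iteration count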
        final int iterationCountReportInterval = computeIterationCountReportInterval(routingAllocation);
        final long timeWarningInterval = progressLogInterval.millis();
        final long computationStartedTime = timeProvider.relativeTimeInMillis();
        long nextReportTime = Math.max(lastNotConvergedLogMessageTimeMillis, lastConvergedTimeMillis) + timeWarningInterval;

        int i = 0;
        boolean hasChanges = false;
        boolean assignedNewlyCreatedPrimaryShards = false;
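        // main simulation loop: run the delegate allocator, simulate the shards it assigns as started, and repeat until a
        // round makes no further changes (converged) or the computation must stop early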
        while (true) {
            if (hasChanges) {
                // Not the first iteration, so every remaining unassigned shard has been ignored, perhaps due to throttling. We must bring
                // them all back out of the ignored list to give the allocator another go...
                routingNodes.unassigned().resetIgnored();

                // ... but not if they're ignored because they're out of scope for allocation
                for (final var iterator = routingNodes.unassigned().iterator(); iterator.hasNext();) {
                    final var shardRouting = iterator.next();
                    if (ignoredShards.contains(discardAllocationStatus(shardRouting))) {
                        iterator.removeAndIgnore(UnassignedInfo.AllocationStatus.NO_ATTEMPT, changes);
                    }
                }
            }

            routingAllocation.setSimulatedClusterInfo(clusterInfoSimulator.getClusterInfo());
            logger.trace("running delegate allocator");
            delegateAllocator.allocate(routingAllocation);
            assert routingNodes.unassigned().isEmpty(); // any unassigned shards should now be ignored

            hasChanges = false;
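            // simulate the completion of every shard the allocator initialized in this round; any such shard means we
            // have not converged yet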
            for (final var routingNode : routingNodes) {
                for (final var shardRouting : routingNode) {
                    if (shardRouting.initializing()) {
                        hasChanges = true;
                        if (shardRouting.primary()
                            && shardRouting.unassignedInfo() != null
                            && shardRouting.unassignedInfo().reason() == UnassignedInfo.Reason.INDEX_CREATED) {
                            // TODO: we could include more cases that would cause early publishing of desired balance in case of a long
                            // computation. e.g.:
                            // - unassigned search replicas in case the shard has no assigned shard replicas
                            // - other reasons for an unassigned shard such as NEW_INDEX_RESTORED
                            assignedNewlyCreatedPrimaryShards = true;
                        }
                        clusterInfoSimulator.simulateShardStarted(shardRouting);
                        routingNodes.startShard(shardRouting, changes, 0L);
                    }
                }
            }

            i += 1;
            numIterationsSinceLastConverged += 1;
            final int iterations = i;
            final long currentTime = timeProvider.relativeTimeInMillis();
            final boolean reportByTime = nextReportTime <= currentTime;
            final boolean reportByIterationCount = numIterationsSinceLastConverged % iterationCountReportInterval == 0;
            if (reportByTime || reportByIterationCount) {
                nextReportTime = currentTime + timeWarningInterval;
            }

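            // convergence: the last round made no changes and the minimum number of iterations has been reached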
            if (hasChanges == false && hasEnoughIterations(i)) {
                if (numComputeCallsSinceLastConverged > 1) {
                    logger.log(
                        convergenceLogMsgLevel,
                        () -> Strings.format(
                            """
                                Desired balance computation for [%d] converged after [%s] and [%d] iterations, \
                                resumed computation [%d] times with [%d] iterations since the last resumption [%s] ago""",
                            desiredBalanceInput.index(),
                            TimeValue.timeValueMillis(currentTime - lastConvergedTimeMillis).toString(),
                            numIterationsSinceLastConverged,
                            numComputeCallsSinceLastConverged,
                            iterations,
                            TimeValue.timeValueMillis(currentTime - computationStartedTime).toString()
                        )
                    );
                } else {
                    logger.log(
                        convergenceLogMsgLevel,
                        () -> Strings.format(
                            "Desired balance computation for [%d] converged after [%s] and [%d] iterations",
                            desiredBalanceInput.index(),
                            TimeValue.timeValueMillis(currentTime - lastConvergedTimeMillis).toString(),
                            numIterationsSinceLastConverged
                        )
                    );
                }
                numComputeCallsSinceLastConverged = 0;
                numIterationsSinceLastConverged = 0;
                lastConvergedTimeMillis = currentTime;
                break;
            }
            if (isFresh.test(desiredBalanceInput) == false) {
                // we always run at least one iteration, but if another reroute happened in the meantime
                // then publish the interim state and restart the calculation
                logger.debug(
                    "Desired balance computation for [{}] interrupted after [{}] and [{}] iterations as newer cluster state received. "
                        + "Publishing intermediate desired balance and restarting computation",
                    desiredBalanceInput.index(),
                    TimeValue.timeValueMillis(currentTime - computationStartedTime).toString(),
                    i
                );
                finishReason = DesiredBalance.ComputationFinishReason.YIELD_TO_NEW_INPUT;
                break;
            }

            if (assignedNewlyCreatedPrimaryShards
                && currentTime - computationStartedTime >= maxBalanceComputationTimeDuringIndexCreationMillis) {
                logger.info(
                    "Desired balance computation for [{}] interrupted after [{}] and [{}] iterations "
                        + "in order to not delay assignment of newly created index shards for more than [{}]. "
                        + "Publishing intermediate desired balance and restarting computation",
                    desiredBalanceInput.index(),
                    TimeValue.timeValueMillis(currentTime - computationStartedTime).toString(),
                    i,
                    TimeValue.timeValueMillis(maxBalanceComputationTimeDuringIndexCreationMillis).toString()
                );
                finishReason = DesiredBalance.ComputationFinishReason.STOP_EARLY;
                break;
            }

            final var logLevel = reportByIterationCount || reportByTime ? Level.INFO : i % 100 == 0 ? Level.DEBUG : Level.TRACE;
            if (numComputeCallsSinceLastConverged > 1) {
                logger.log(
                    logLevel,
                    () -> Strings.format(
                        """
                            Desired balance computation for [%d] is still not converged after [%s] and [%d] iterations, \
                            resumed computation [%d] times with [%d] iterations since the last resumption [%s] ago""",
                        desiredBalanceInput.index(),
                        TimeValue.timeValueMillis(currentTime - lastConvergedTimeMillis).toString(),
                        numIterationsSinceLastConverged,
                        numComputeCallsSinceLastConverged,
                        iterations,
                        TimeValue.timeValueMillis(currentTime - computationStartedTime).toString()
                    )
                );
            } else {
                logger.log(
                    logLevel,
                    () -> Strings.format(
                        "Desired balance computation for [%d] is still not converged after [%s] and [%d] iterations",
                        desiredBalanceInput.index(),
                        TimeValue.timeValueMillis(currentTime - lastConvergedTimeMillis).toString(),
                        numIterationsSinceLastConverged
                    )
                );
            }

            if (reportByIterationCount || reportByTime) {
                lastNotConvergedLogMessageTimeMillis = currentTime;
            }
        }
        iterations.inc(i);

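        // capture the simulated per-node assignments, then account for the shards that remained unassigned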
        final var assignments = collectShardAssignments(routingNodes);

        for (var shard : routingNodes.unassigned().ignored()) {
            var info = shard.unassignedInfo();
            assert info != null
                && (info.lastAllocationStatus() == UnassignedInfo.AllocationStatus.DECIDERS_NO
                    || info.lastAllocationStatus() == UnassignedInfo.AllocationStatus.NO_ATTEMPT
                    || info.lastAllocationStatus() == UnassignedInfo.AllocationStatus.DECIDERS_THROTTLED) : "Unexpected status in: " + info;

            if (hasChanges == false && info.lastAllocationStatus() == UnassignedInfo.AllocationStatus.DECIDERS_THROTTLED) {
                // Simulation could not progress due to missing information in one of the deciders.
                // Currently this can happen if `HasFrozenCacheAllocationDecider` is still fetching the data.
                // Progress will be made after the follow-up reroute call.
                hasChanges = true;
            }

            var ignored = info.lastAllocationStatus() == UnassignedInfo.AllocationStatus.DECIDERS_NO ? 0 : 1;
            assignments.compute(
                shard.shardId(),
                (key, oldValue) -> oldValue == null
                    ? new ShardAssignment(Set.of(), 1, 1, ignored)
                    : new ShardAssignment(oldValue.nodeIds(), oldValue.total() + 1, oldValue.unassigned() + 1, oldValue.ignored() + ignored)
            );
        }

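        // if the simulation still had pending changes then this input has not converged, so carry forward the previous
        // converged index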
        long lastConvergedIndex = hasChanges ? previousDesiredBalance.lastConvergedIndex() : desiredBalanceInput.index();
        return new DesiredBalance(lastConvergedIndex, assignments, routingNodes.getBalanceWeightStatsPerNode(), finishReason);
    }
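
The convergence pattern above is worth isolating: run the delegate repeatedly, treat a round that makes no changes as a fixed point, and yield early when fresher input arrives. The sketch below is a minimal, self-contained illustration of that control flow only; `Simulation`, `stepMakesChanges()`, and `FinishReason` here are hypothetical stand-ins, not Elasticsearch APIs.

    import java.util.function.BooleanSupplier;

    final class ConvergenceLoopSketch {

        // one allocation round; returns true if any shard was moved or started
        interface Simulation {
            boolean stepMakesChanges();
        }

        enum FinishReason {
            CONVERGED,
            YIELD_TO_NEW_INPUT
        }

        static FinishReason runToConvergence(Simulation simulation, BooleanSupplier isFresh) {
            while (true) {
                final boolean hasChanges = simulation.stepMakesChanges();
                if (hasChanges == false) {
                    // a complete round with no changes means the simulation reached a fixed point
                    return FinishReason.CONVERGED;
                }
                if (isFresh.getAsBoolean() == false) {
                    // newer input arrived: publish the interim result and let the caller restart
                    return FinishReason.YIELD_TO_NEW_INPUT;
                }
            }
        }
    }

As in the real method, at least one full round always runs before the freshness check, so an interrupted computation still publishes a usable interim result.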