in server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputer.java [96:467]
public DesiredBalance compute(
DesiredBalance previousDesiredBalance,
DesiredBalanceInput desiredBalanceInput,
Queue<List<MoveAllocationCommand>> pendingDesiredBalanceMoves,
Predicate<DesiredBalanceInput> isFresh
) {
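// a computation may be resumed several times before it converges (e.g. after yielding to a newer cluster
// state), so count the compute calls and iterations since the last convergence for the progress logging below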
numComputeCallsSinceLastConverged += 1;
if (logger.isTraceEnabled()) {
logger.trace(
"Recomputing desired balance for [{}]: {}, {}, {}, {}",
desiredBalanceInput.index(),
previousDesiredBalance,
desiredBalanceInput.routingAllocation().routingNodes().toString(),
desiredBalanceInput.routingAllocation().clusterInfo().toString(),
desiredBalanceInput.routingAllocation().snapshotShardSizeInfo().toString()
);
} else {
logger.debug("Recomputing desired balance for [{}]", desiredBalanceInput.index());
}
final var routingAllocation = desiredBalanceInput.routingAllocation().mutableCloneForSimulation();
final var routingNodes = routingAllocation.routingNodes();
final var knownNodeIds = routingNodes.getAllNodeIds();
final var changes = routingAllocation.changes();
final var ignoredShards = getIgnoredShardsWithDiscardedAllocationStatus(desiredBalanceInput.ignoredShards());
final var clusterInfoSimulator = new ClusterInfoSimulator(routingAllocation);
DesiredBalance.ComputationFinishReason finishReason = DesiredBalance.ComputationFinishReason.CONVERGED;
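// with no routing nodes there is nothing to allocate, so return a trivially converged empty balance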
if (routingNodes.size() == 0) {
return new DesiredBalance(desiredBalanceInput.index(), Map.of(), Map.of(), finishReason);
}
// we assume that all ongoing recoveries will complete
for (final var routingNode : routingNodes) {
for (final var shardRouting : routingNode) {
if (shardRouting.initializing()) {
clusterInfoSimulator.simulateShardStarted(shardRouting);
routingNodes.startShard(shardRouting, changes, 0L);
}
}
}
// we are not responsible for allocating unassigned primaries of existing shards, and we're only responsible for allocating
// unassigned replicas if the ReplicaShardAllocator gives up, so we must respect these ignored shards
final var unassignedPrimaries = new HashSet<ShardId>();
final var shardRoutings = new HashMap<ShardId, ShardRoutings>();
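// two passes over the unassigned shards, primaries first: shards that are out of allocation scope are discarded,
// and discarded primaries are remembered in [unassignedPrimaries] so that their replicas can be skipped later on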
for (final var primary : new boolean[] { true, false }) {
final RoutingNodes.UnassignedShards unassigned = routingNodes.unassigned();
for (final var iterator = unassigned.iterator(); iterator.hasNext();) {
final var shardRouting = iterator.next();
if (shardRouting.primary() == primary) {
var lastAllocatedNodeId = shardRouting.unassignedInfo().lastAllocatedNodeId();
if (knownNodeIds.contains(lastAllocatedNodeId)
|| ignoredShards.contains(discardAllocationStatus(shardRouting)) == false) {
shardRoutings.computeIfAbsent(shardRouting.shardId(), ShardRoutings::new).unassigned().add(shardRouting);
} else {
iterator.removeAndIgnore(UnassignedInfo.AllocationStatus.NO_ATTEMPT, changes);
if (shardRouting.primary()) {
unassignedPrimaries.add(shardRouting.shardId());
}
}
}
}
}
for (final var assigned : routingNodes.getAssignedShards().entrySet()) {
shardRoutings.computeIfAbsent(assigned.getKey(), ShardRoutings::new).assigned().addAll(assigned.getValue());
}
// we can assume that all possible shards will be allocated/relocated to one of their desired locations
final var unassignedShardsToInitialize = new HashMap<ShardRouting, LinkedList<String>>();
for (final var entry : shardRoutings.entrySet()) {
final var shardId = entry.getKey();
final var routings = entry.getValue();
// TreeMap (keyed by node ID) so that we are consistent about the order of future relocations
final var shardsToRelocate = new TreeMap<String, ShardRouting>();
final var assignment = previousDesiredBalance.getAssignment(shardId);
// TreeSet (ordered by node ID) so that we are consistent about the order of future relocations
final var targetNodes = assignment != null ? new TreeSet<>(assignment.nodeIds()) : new TreeSet<String>();
targetNodes.retainAll(knownNodeIds);
// preserve the last known shard location as a starting point, to avoid unnecessary relocations
for (ShardRouting shardRouting : routings.unassigned()) {
var lastAllocatedNodeId = shardRouting.unassignedInfo().lastAllocatedNodeId();
if (knownNodeIds.contains(lastAllocatedNodeId)) {
targetNodes.add(lastAllocatedNodeId);
}
}
for (final var shardRouting : routings.assigned()) {
assert shardRouting.started();
if (targetNodes.remove(shardRouting.currentNodeId()) == false) {
final var previousShard = shardsToRelocate.put(shardRouting.currentNodeId(), shardRouting);
assert previousShard == null : "duplicate shards to relocate: " + shardRouting + " vs " + previousShard;
}
}
final var targetNodesIterator = targetNodes.iterator();
// Here existing shards are moved to desired locations before initializing unassigned shards because we prefer not to leave
// immovable shards allocated to undesirable locations (e.g. a node that is shutting down or an allocation filter which was
// only recently applied). In contrast, reconciliation prefers to initialize the unassigned shards first.
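// For example (hypothetical scenario): if a shard is STARTED on node A which is shutting down and the previous
// desired balance placed it on node B, we simulate the relocation A->B (and its completion) here, before node B
// can be handed out to an unassigned copy of the same shard below.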
relocateToDesiredLocation: for (final var shardRouting : shardsToRelocate.values()) {
assert shardRouting.started();
while (targetNodesIterator.hasNext()) {
final var targetNodeId = targetNodesIterator.next();
final var targetNode = routingNodes.node(targetNodeId);
if (targetNode != null
&& routingAllocation.deciders()
.canAllocate(shardRouting, targetNode, routingAllocation)
.type() != Decision.Type.NO) {
final var shardToRelocate = routingNodes.relocateShard(shardRouting, targetNodeId, 0L, "computation", changes).v2();
clusterInfoSimulator.simulateShardStarted(shardToRelocate);
routingNodes.startShard(shardToRelocate, changes, 0L);
continue relocateToDesiredLocation;
}
}
}
for (final var shardRouting : routings.unassigned()) {
assert shardRouting.unassigned();
if (targetNodesIterator.hasNext()) {
unassignedShardsToInitialize.computeIfAbsent(shardRouting, ignored -> new LinkedList<>())
.add(targetNodesIterator.next());
} else {
break;
}
}
}
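// initialize unassigned primaries on their target nodes before replicas: a replica cannot start until its
// primary is assigned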
final var unassignedPrimaryIterator = routingNodes.unassigned().iterator();
while (unassignedPrimaryIterator.hasNext()) {
final var shardRouting = unassignedPrimaryIterator.next();
if (shardRouting.primary()) {
final var nodeIds = unassignedShardsToInitialize.get(shardRouting);
if (nodeIds != null && nodeIds.isEmpty() == false) {
final var nodeId = nodeIds.removeFirst();
final var routingNode = routingNodes.node(nodeId);
if (routingNode != null
&& routingAllocation.deciders()
.canAllocate(shardRouting, routingNode, routingAllocation)
.type() != Decision.Type.NO) {
final var shardToInitialize = unassignedPrimaryIterator.initialize(nodeId, null, 0L, changes);
clusterInfoSimulator.simulateShardStarted(shardToInitialize);
routingNodes.startShard(shardToInitialize, changes, 0L);
}
}
}
}
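// then initialize unassigned replicas, skipping shards whose primary was discarded as unassignable above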
final var unassignedReplicaIterator = routingNodes.unassigned().iterator();
while (unassignedReplicaIterator.hasNext()) {
final var shardRouting = unassignedReplicaIterator.next();
if (unassignedPrimaries.contains(shardRouting.shardId()) == false) {
final var nodeIds = unassignedShardsToInitialize.get(shardRouting);
if (nodeIds != null && nodeIds.isEmpty() == false) {
final var nodeId = nodeIds.removeFirst();
final var routingNode = routingNodes.node(nodeId);
if (routingNode != null
&& routingAllocation.deciders()
.canAllocate(shardRouting, routingNode, routingAllocation)
.type() != Decision.Type.NO) {
final var shardToInitialize = unassignedReplicaIterator.initialize(nodeId, null, 0L, changes);
clusterInfoSimulator.simulateShardStarted(shardToInitialize);
routingNodes.startShard(shardToInitialize, changes, 0L);
}
}
}
}
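// replay any pending move commands (e.g. from the cluster reroute API) on top of the simulated state;
// a command that can no longer be applied is logged at DEBUG and skipped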
List<MoveAllocationCommand> commands;
while ((commands = pendingDesiredBalanceMoves.poll()) != null) {
for (MoveAllocationCommand command : commands) {
try {
command.execute(routingAllocation, false);
} catch (RuntimeException e) {
logger.debug(
() -> "move shard ["
+ command.index()
+ ":"
+ command.shardId()
+ "] command failed during applying it to the desired balance",
e
);
}
}
}
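// progress reporting: periodically log while the computation keeps running, either on a time interval or
// after a fixed number of iterations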
final int iterationCountReportInterval = computeIterationCountReportInterval(routingAllocation);
final long timeWarningInterval = progressLogInterval.millis();
final long computationStartedTime = timeProvider.relativeTimeInMillis();
long nextReportTime = Math.max(lastNotConvergedLogMessageTimeMillis, lastConvergedTimeMillis) + timeWarningInterval;
int i = 0;
boolean hasChanges = false;
boolean assignedNewlyCreatedPrimaryShards = false;
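// main simulation loop: run the delegate allocator, immediately start whatever it initializes, and repeat
// until an iteration makes no further changes (converged) or the computation has to be cut short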
while (true) {
if (hasChanges) {
// Not the first iteration, so every remaining unassigned shard has been ignored, perhaps due to throttling. We must bring
// them all back out of the ignored list to give the allocator another go...
routingNodes.unassigned().resetIgnored();
// ... but not if they're ignored because they're out of scope for allocation
for (final var iterator = routingNodes.unassigned().iterator(); iterator.hasNext();) {
final var shardRouting = iterator.next();
if (ignoredShards.contains(discardAllocationStatus(shardRouting))) {
iterator.removeAndIgnore(UnassignedInfo.AllocationStatus.NO_ATTEMPT, changes);
}
}
}
routingAllocation.setSimulatedClusterInfo(clusterInfoSimulator.getClusterInfo());
logger.trace("running delegate allocator");
delegateAllocator.allocate(routingAllocation);
assert routingNodes.unassigned().isEmpty(); // any unassigned shards should now be ignored
hasChanges = false;
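// start every shard the allocator initialized in this round; any such shard means the balance has not
// converged yet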
for (final var routingNode : routingNodes) {
for (final var shardRouting : routingNode) {
if (shardRouting.initializing()) {
hasChanges = true;
if (shardRouting.primary()
&& shardRouting.unassignedInfo() != null
&& shardRouting.unassignedInfo().reason() == UnassignedInfo.Reason.INDEX_CREATED) {
// TODO: we could include more cases that would cause early publishing of the desired balance during a long
// computation, e.g.:
// - unassigned search replicas when the shard has no assigned replicas
// - other reasons for an unassigned shard, such as NEW_INDEX_RESTORED
assignedNewlyCreatedPrimaryShards = true;
}
clusterInfoSimulator.simulateShardStarted(shardRouting);
routingNodes.startShard(shardRouting, changes, 0L);
}
}
}
i += 1;
numIterationsSinceLastConverged += 1;
final int iterations = i;
final long currentTime = timeProvider.relativeTimeInMillis();
final boolean reportByTime = nextReportTime <= currentTime;
final boolean reportByIterationCount = numIterationsSinceLastConverged % iterationCountReportInterval == 0;
if (reportByTime || reportByIterationCount) {
nextReportTime = currentTime + timeWarningInterval;
}
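// no changes in this round (and enough iterations have run): the allocator is satisfied with the current
// assignment, so the computation has converged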
if (hasChanges == false && hasEnoughIterations(i)) {
if (numComputeCallsSinceLastConverged > 1) {
logger.log(
convergenceLogMsgLevel,
() -> Strings.format(
"""
Desired balance computation for [%d] converged after [%s] and [%d] iterations, \
resumed computation [%d] times with [%d] iterations since the last resumption [%s] ago""",
desiredBalanceInput.index(),
TimeValue.timeValueMillis(currentTime - lastConvergedTimeMillis).toString(),
numIterationsSinceLastConverged,
numComputeCallsSinceLastConverged,
iterations,
TimeValue.timeValueMillis(currentTime - computationStartedTime).toString()
)
);
} else {
logger.log(
convergenceLogMsgLevel,
() -> Strings.format(
"Desired balance computation for [%d] converged after [%s] and [%d] iterations",
desiredBalanceInput.index(),
TimeValue.timeValueMillis(currentTime - lastConvergedTimeMillis).toString(),
numIterationsSinceLastConverged
)
);
}
numComputeCallsSinceLastConverged = 0;
numIterationsSinceLastConverged = 0;
lastConvergedTimeMillis = currentTime;
break;
}
if (isFresh.test(desiredBalanceInput) == false) {
// we always run at least one iteration, but if another reroute happened in the meantime
// then publish the interim state and restart the calculation
logger.debug(
"Desired balance computation for [{}] interrupted after [{}] and [{}] iterations as newer cluster state received. "
+ "Publishing intermediate desired balance and restarting computation",
desiredBalanceInput.index(),
TimeValue.timeValueMillis(currentTime - computationStartedTime).toString(),
i
);
finishReason = DesiredBalance.ComputationFinishReason.YIELD_TO_NEW_INPUT;
break;
}
if (assignedNewlyCreatedPrimaryShards
&& currentTime - computationStartedTime >= maxBalanceComputationTimeDuringIndexCreationMillis) {
logger.info(
"Desired balance computation for [{}] interrupted after [{}] and [{}] iterations "
+ "in order to not delay assignment of newly created index shards for more than [{}]. "
+ "Publishing intermediate desired balance and restarting computation",
desiredBalanceInput.index(),
TimeValue.timeValueMillis(currentTime - computationStartedTime).toString(),
i,
TimeValue.timeValueMillis(maxBalanceComputationTimeDuringIndexCreationMillis).toString()
);
finishReason = DesiredBalance.ComputationFinishReason.STOP_EARLY;
break;
}
final var logLevel = reportByIterationCount || reportByTime ? Level.INFO : i % 100 == 0 ? Level.DEBUG : Level.TRACE;
if (numComputeCallsSinceLastConverged > 1) {
logger.log(
logLevel,
() -> Strings.format(
"""
Desired balance computation for [%d] is still not converged after [%s] and [%d] iterations, \
resumed computation [%d] times with [%d] iterations since the last resumption [%s] ago""",
desiredBalanceInput.index(),
TimeValue.timeValueMillis(currentTime - lastConvergedTimeMillis).toString(),
numIterationsSinceLastConverged,
numComputeCallsSinceLastConverged,
iterations,
TimeValue.timeValueMillis(currentTime - computationStartedTime).toString()
)
);
} else {
logger.log(
logLevel,
() -> Strings.format(
"Desired balance computation for [%d] is still not converged after [%s] and [%d] iterations",
desiredBalanceInput.index(),
TimeValue.timeValueMillis(currentTime - lastConvergedTimeMillis).toString(),
numIterationsSinceLastConverged
)
);
}
if (reportByIterationCount || reportByTime) {
lastNotConvergedLogMessageTimeMillis = currentTime;
}
}
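// the loop is done (converged, yielded to newer input, or stopped early): record the iteration count and
// collect the per-shard assignments from the simulated routing nodes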
iterations.inc(i);
final var assignments = collectShardAssignments(routingNodes);
for (var shard : routingNodes.unassigned().ignored()) {
var info = shard.unassignedInfo();
assert info != null
&& (info.lastAllocationStatus() == UnassignedInfo.AllocationStatus.DECIDERS_NO
|| info.lastAllocationStatus() == UnassignedInfo.AllocationStatus.NO_ATTEMPT
|| info.lastAllocationStatus() == UnassignedInfo.AllocationStatus.DECIDERS_THROTTLED) : "Unexpected status in: " + info;
if (hasChanges == false && info.lastAllocationStatus() == UnassignedInfo.AllocationStatus.DECIDERS_THROTTLED) {
// Simulation could not progress due to missing information in one of the deciders.
// Currently, this could happen if `HasFrozenCacheAllocationDecider` is still fetching the data.
// Progress should be possible after the follow-up reroute call.
hasChanges = true;
}
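// shards rejected outright by the deciders (DECIDERS_NO) count as unassigned but not as ignored in the
// resulting assignment; throttled and not-attempted shards count as both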
var ignored = info.lastAllocationStatus() == UnassignedInfo.AllocationStatus.DECIDERS_NO ? 0 : 1;
assignments.compute(
shard.shardId(),
(key, oldValue) -> oldValue == null
? new ShardAssignment(Set.of(), 1, 1, ignored)
: new ShardAssignment(oldValue.nodeIds(), oldValue.total() + 1, oldValue.unassigned() + 1, oldValue.ignored() + ignored)
);
}
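// if this computation did not converge, carry forward the previous converged index so that consumers can
// tell the returned balance is an interim result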
long lastConvergedIndex = hasChanges ? previousDesiredBalance.lastConvergedIndex() : desiredBalanceInput.index();
return new DesiredBalance(lastConvergedIndex, assignments, routingNodes.getBalanceWeightStatsPerNode(), finishReason);
}