in server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java [139:438]
/**
 * Processes a fresh {@link ClusterInfo} snapshot and reacts to the disk watermarks on each reporting node.
 * <p>
 * In order, this method:
 * <ol>
 *   <li>returns early if the cluster state is not recovered, if another check is already in progress, if the disk
 *       threshold settings are disabled, or if {@code info} carries no per-node disk usages;</li>
 *   <li>updates the tracked node sets ({@code nodesOverLowThreshold}, {@code nodesOverHighThreshold},
 *       {@code nodesOverHighThresholdAndRelocating}) from the free space reported for each node;</li>
 *   <li>requests a reroute when usage information arrives from new nodes, or when a node is over the high watermark /
 *       has dropped back under the low watermark and the reroute interval has elapsed;</li>
 *   <li>collects the indices that should get the read-only-allow-delete block and those whose block may be released,
 *       and hands both sets to {@code updateIndicesReadOnly}.</li>
 * </ol>
 * {@code checkFinished} is run once every asynchronous action started here has released its reference.
 *
 * @param info the latest cluster info; its least-available disk usages and reserved-space figures are read
 */
public void onNewInfo(ClusterInfo info) {
    final ClusterState state = clusterStateSupplier.get();
    if (state.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
        logger.debug("skipping monitor as the cluster state is not recovered yet");
        return;
    }

    // TODO find a better way to limit concurrent updates (and potential associated reroutes) while allowing tests to ensure that
    // all ClusterInfo updates are processed and never ignored
    if (checkInProgress.compareAndSet(false, true) == false) {
        logger.info("skipping monitor as a check is already in progress");
        return;
    }

    if (diskThresholdSettings.isEnabled() == false) {
        // NOTE(review): this path returns without calling checkFinished() here; presumably removeExistingIndexBlocks()
        // is responsible for completing the check - confirm against its implementation.
        removeExistingIndexBlocks();
        return;
    } else {
        // reset this for the next disable call.
        cleanupUponDisableCalled.set(false);
    }

    final Map<String, DiskUsage> usages = info.getNodeLeastAvailableDiskUsages();
    if (usages == null) {
        logger.trace("skipping monitor as no disk usage information is available");
        lastNodes = Collections.emptySet();
        checkFinished();
        return;
    }

    logger.trace("processing new cluster info");

    boolean reroute = false;
    String explanation = "";
    final long currentTimeMillis = currentTimeMillisSupplier.getAsLong();

    // Clean up nodes that have been removed from the cluster
    final Set<String> nodes = new HashSet<>(usages.keySet());
    cleanUpRemovedNodes(nodes, nodesOverLowThreshold);
    cleanUpRemovedNodes(nodes, nodesOverHighThreshold);
    cleanUpRemovedNodes(nodes, nodesOverHighThresholdAndRelocating);

    if (lastNodes.equals(nodes) == false) {
        // Only the arrival of usage info from a previously unseen node triggers a reroute; a shrinking set does not.
        if (lastNodes.containsAll(nodes) == false) {
            logger.debug("rerouting because disk usage info received from new nodes");
            reroute = true;
        }
        lastNodes = Collections.unmodifiableSet(nodes);
    }

    final Set<Index> indicesToMarkReadOnly = new HashSet<>();
    final Set<Index> indicesNotToAutoRelease = new HashSet<>();
    final RoutingNodes routingNodes = state.getRoutingNodes();
    markNodesMissingUsageIneligibleForRelease(routingNodes, usages, indicesNotToAutoRelease);

    // Usages of nodes over the high watermark for which a reroute is requested; they are logged once the reroute completes.
    final List<DiskUsage> usagesOverHighThreshold = new ArrayList<>();

    for (final Map.Entry<String, DiskUsage> entry : usages.entrySet()) {
        final String node = entry.getKey();
        final DiskUsage usage = entry.getValue();
        final RoutingNode routingNode = routingNodes.node(node);
        final ByteSizeValue total = ByteSizeValue.ofBytes(usage.totalBytes());

        if (isDedicatedFrozenNode(routingNode)) {
            if (usage.freeBytes() < diskThresholdSettings.getFreeBytesThresholdFrozenFloodStage(total).getBytes()) {
                logger.warn(
                    "flood stage disk watermark [{}] exceeded on {}",
                    diskThresholdSettings.describeFrozenFloodStageThreshold(total, false),
                    usage
                );
            }
            // skip checking high/low watermarks for frozen nodes, since frozen shards have only insignificant local storage footprint
            // and this allows us to use more of the local storage for cache.
            continue;
        }

        // Flood stage: every index with a shard on this node is marked read-only and excluded from auto-release.
        if (usage.freeBytes() < diskThresholdSettings.getFreeBytesThresholdFloodStage(total).getBytes()) {
            nodesOverLowThreshold.add(node);
            nodesOverHighThreshold.add(node);
            nodesOverHighThresholdAndRelocating.remove(node);

            if (routingNode != null) { // might be temporarily null if the ClusterInfoService and the ClusterService are out of step
                for (ShardRouting routing : routingNode) {
                    indicesToMarkReadOnly.add(routing.index());
                    indicesNotToAutoRelease.add(routing.index());
                }
            }

            logger.warn(
                "flood stage disk watermark [{}] exceeded on {}, all indices on this node will be marked read-only",
                diskThresholdSettings.describeFloodStageThreshold(total, false),
                usage
            );

            continue;
        }

        // Over the high watermark on raw free space (reserved space not yet subtracted): keep existing blocks in place.
        if (usage.freeBytes() < diskThresholdSettings.getFreeBytesThresholdHighStage(total).getBytes()) {
            if (routingNode != null) { // might be temporarily null if the ClusterInfoService and the ClusterService are out of step
                for (ShardRouting routing : routingNode) {
                    indicesNotToAutoRelease.add(routing.index());
                }
            }
        }

        // The remaining watermark checks use free space net of reserved space, floored at zero.
        final long reservedSpace = info.getReservedSpace(usage.nodeId(), usage.path()).total();
        final DiskUsage usageWithReservedSpace = new DiskUsage(
            usage.nodeId(),
            usage.nodeName(),
            usage.path(),
            usage.totalBytes(),
            Math.max(0L, usage.freeBytes() - reservedSpace)
        );

        if (usageWithReservedSpace.freeBytes() < diskThresholdSettings.getFreeBytesThresholdHighStage(total).getBytes()) {
            nodesOverLowThreshold.add(node);
            nodesOverHighThreshold.add(node);

            if (lastRunTimeMillis.get() <= currentTimeMillis - diskThresholdSettings.getRerouteInterval().millis()) {
                reroute = true;
                explanation = "high disk watermark exceeded on one or more nodes";
                usagesOverHighThreshold.add(usage);
                // will log about this node when the reroute completes
            } else {
                logger.debug(
                    "high disk watermark exceeded on {} but an automatic reroute has occurred " + "in the last [{}], skipping reroute",
                    node,
                    diskThresholdSettings.getRerouteInterval()
                );
            }

        } else if (usageWithReservedSpace.freeBytes() < diskThresholdSettings.getFreeBytesThresholdLowStage(total).getBytes()) {
            nodesOverHighThresholdAndRelocating.remove(node);

            // add() returns true if the node was not yet tracked as over the low watermark;
            // remove() returns true if it was tracked as over the high watermark.
            final boolean wasUnderLowThreshold = nodesOverLowThreshold.add(node);
            final boolean wasOverHighThreshold = nodesOverHighThreshold.remove(node);
            assert (wasUnderLowThreshold && wasOverHighThreshold) == false;

            if (wasUnderLowThreshold) {
                logger.info(
                    "low disk watermark [{}] exceeded on {}, replicas will not be assigned to this node",
                    diskThresholdSettings.describeLowThreshold(total, false),
                    usage
                );
            } else if (wasOverHighThreshold) {
                logger.info(
                    "high disk watermark [{}] no longer exceeded on {}, but low disk watermark [{}] is still exceeded",
                    diskThresholdSettings.describeHighThreshold(total, false),
                    usage,
                    diskThresholdSettings.describeLowThreshold(total, false)
                );
            }

        } else {
            nodesOverHighThresholdAndRelocating.remove(node);

            if (nodesOverLowThreshold.contains(node)) {
                // The node has previously been over the low watermark, but is no longer, so it may be possible to allocate more
                // shards if we reroute now.
                if (lastRunTimeMillis.get() <= currentTimeMillis - diskThresholdSettings.getRerouteInterval().millis()) {
                    reroute = true;
                    explanation = "one or more nodes has gone under the high or low watermark";
                    nodesOverLowThreshold.remove(node);
                    nodesOverHighThreshold.remove(node);

                    logger.info(
                        "low disk watermark [{}] no longer exceeded on {}",
                        diskThresholdSettings.describeLowThreshold(total, false),
                        usage
                    );

                } else {
                    // The node stays in the tracked sets, so this branch is evaluated again on the next ClusterInfo update.
                    logger.debug(
                        "{} has gone below a disk threshold, but an automatic reroute has occurred "
                            + "in the last [{}], skipping reroute",
                        node,
                        diskThresholdSettings.getRerouteInterval()
                    );
                }
            }
        }
    }

    // checkFinished runs once this try block has exited AND every reference acquired below has been released.
    try (var asyncRefs = new RefCountingRunnable(this::checkFinished)) {

        if (reroute) {
            logger.debug("rerouting shards: [{}]", explanation);
            rerouteService.reroute(
                "disk threshold monitor",
                Priority.HIGH,
                ActionListener.releaseAfter(ActionListener.runAfter(ActionListener.wrap(ignored -> {
                    // Re-read the cluster state so the relocation figures reflect the outcome of the reroute.
                    final var reroutedClusterState = clusterStateSupplier.get();
                    for (DiskUsage diskUsage : usagesOverHighThreshold) {
                        final RoutingNode routingNode = reroutedClusterState.getRoutingNodes().node(diskUsage.nodeId());
                        final DiskUsage usageIncludingRelocations;
                        final long relocatingShardsSize;
                        if (routingNode != null) { // might be temporarily null if ClusterInfoService and ClusterService are out of step
                            relocatingShardsSize = sizeOfRelocatingShards(routingNode, diskUsage, info, reroutedClusterState);
                            usageIncludingRelocations = new DiskUsage(
                                diskUsage.nodeId(),
                                diskUsage.nodeName(),
                                diskUsage.path(),
                                diskUsage.totalBytes(),
                                diskUsage.freeBytes() - relocatingShardsSize
                            );
                        } else {
                            usageIncludingRelocations = diskUsage;
                            relocatingShardsSize = 0L;
                        }
                        final ByteSizeValue total = ByteSizeValue.ofBytes(usageIncludingRelocations.totalBytes());

                        if (usageIncludingRelocations.freeBytes() < diskThresholdSettings.getFreeBytesThresholdHighStage(total)
                            .getBytes()) {
                            nodesOverHighThresholdAndRelocating.remove(diskUsage.nodeId());
                            logger.warn("""
                                high disk watermark [{}] exceeded on {}, shards will be relocated away from this node; currently \
                                relocating away shards totalling [{}] bytes; the node is expected to continue to exceed the high disk \
                                watermark when these relocations are complete\
                                """, diskThresholdSettings.describeHighThreshold(total, false), diskUsage, -relocatingShardsSize);
                        } else if (nodesOverHighThresholdAndRelocating.add(diskUsage.nodeId())) {
                            // Logged at INFO only the first time the node enters the "relocating" set.
                            logger.info("""
                                high disk watermark [{}] exceeded on {}, shards will be relocated away from this node; currently \
                                relocating away shards totalling [{}] bytes; the node is expected to be below the high disk watermark \
                                when these relocations are complete\
                                """, diskThresholdSettings.describeHighThreshold(total, false), diskUsage, -relocatingShardsSize);
                        } else {
                            logger.debug("""
                                high disk watermark [{}] exceeded on {}, shards will be relocated away from this node; currently \
                                relocating away shards totalling [{}] bytes\
                                """, diskThresholdSettings.describeHighThreshold(total, false), diskUsage, -relocatingShardsSize);
                        }
                    }
                }, e -> logger.debug("reroute failed", e)), this::setLastRunTimeMillis), asyncRefs.acquire())
            );
        } else {
            logger.trace("no reroute required");
        }

        // Generate a map of node name to ID so we can use it to look up node replacement targets
        final Map<String, String> nodeNameToId = routingNodes.stream()
            .collect(Collectors.toMap(rn -> rn.node().getName(), RoutingNode::nodeId, (s1, s2) -> s2));

        // Calculate both the source node id and the target node id of a "replace" type shutdown
        final Set<String> nodesIdsPartOfReplacement = state.metadata()
            .nodeShutdowns()
            .getAll()
            .values()
            .stream()
            .filter(meta -> meta.getType() == SingleNodeShutdownMetadata.Type.REPLACE)
            .flatMap(meta -> Stream.of(meta.getNodeId(), nodeNameToId.get(meta.getTargetNodeName())))
            .filter(Objects::nonNull) // The REPLACE target node might not still be in RoutingNodes
            .collect(Collectors.toSet());

        // Generate a set of all the indices that exist on either the target or source of a node replacement
        final Set<Index> indicesOnReplaceSourceOrTarget = new HashSet<>();
        for (String nodeId : nodesIdsPartOfReplacement) {
            final RoutingNode replacementNode = routingNodes.node(nodeId);
            if (replacementNode == null) {
                // The source node id comes straight from the shutdown metadata and is not checked against RoutingNodes
                // above, so the lookup can miss; skip such a node instead of failing with a NullPointerException.
                continue;
            }
            for (ShardRouting shardRouting : replacementNode) {
                indicesOnReplaceSourceOrTarget.add(shardRouting.index());
            }
        }

        Set<Index> indicesToAutoRelease = new HashSet<>();
        for (IndexRoutingTable indexRouting : state.globalRoutingTable().indexRouting()) {
            Index index = indexRouting.getIndex();
            if (indicesNotToAutoRelease.contains(index)) {
                continue;
            }

            // Do not auto release indices that are on either the source or the target of a node replacement
            if (indicesOnReplaceSourceOrTarget.contains(index)) {
                continue;
            }

            var projectId = state.metadata().projectFor(index).id();
            if (state.getBlocks().hasIndexBlock(projectId, index.getName(), IndexMetadata.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK)) {
                indicesToAutoRelease.add(index);
            }
        }

        if (indicesToAutoRelease.isEmpty() == false) {
            logger.info(
                "releasing read-only block on indices "
                    + indicesToAutoRelease
                    + " since they are now allocated to nodes with sufficient disk space"
            );
            updateIndicesReadOnly(state, indicesToAutoRelease, asyncRefs.acquire(), false);
        } else {
            logger.trace("no auto-release required");
        }

        // Indices that are already write-blocked do not need the block applied again.
        indicesToMarkReadOnly.removeIf(index -> {
            final String indexName = index.getName();
            return state.getBlocks().indexBlocked(state.metadata().projectFor(index).id(), ClusterBlockLevel.WRITE, indexName);
        });
        logger.trace("marking indices as read-only: [{}]", indicesToMarkReadOnly);
        if (indicesToMarkReadOnly.isEmpty() == false) {
            updateIndicesReadOnly(state, indicesToMarkReadOnly, asyncRefs.acquire(), true);
        }
    }
}