in server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ClusterBalanceStats.java [35:306]
public record ClusterBalanceStats(
int shards,
int undesiredShardAllocations,
Map<String, TierBalanceStats> tiers,
Map<String, NodeBalanceStats> nodes
) implements Writeable, ToXContentObject {
public static final ClusterBalanceStats EMPTY = new ClusterBalanceStats(0, 0, Map.of(), Map.of());
public static ClusterBalanceStats createFrom(
ClusterState clusterState,
DesiredBalance desiredBalance,
ClusterInfo clusterInfo,
WriteLoadForecaster writeLoadForecaster
) {
var tierToNodeStats = new HashMap<String, List<NodeBalanceStats>>();
var nodes = new HashMap<String, NodeBalanceStats>();
for (RoutingNode routingNode : clusterState.getRoutingNodes()) {
var dataRoles = routingNode.node().getRoles().stream().filter(DiscoveryNodeRole::canContainData).toList();
if (dataRoles.isEmpty()) {
continue;
}
var nodeStats = NodeBalanceStats.createFrom(
routingNode,
clusterState.metadata(),
desiredBalance,
clusterInfo,
writeLoadForecaster
);
nodes.put(routingNode.node().getName(), nodeStats);
for (DiscoveryNodeRole role : dataRoles) {
tierToNodeStats.computeIfAbsent(role.roleName(), ignored -> new ArrayList<>()).add(nodeStats);
}
}
return new ClusterBalanceStats(
nodes.values().stream().mapToInt(NodeBalanceStats::shards).sum(),
nodes.values().stream().mapToInt(NodeBalanceStats::undesiredShardAllocations).sum(),
Maps.transformValues(tierToNodeStats, TierBalanceStats::createFrom),
nodes
);
}
public static ClusterBalanceStats readFrom(StreamInput in) throws IOException {
return new ClusterBalanceStats(
in.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0) ? in.readVInt() : -1,
in.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0) ? in.readVInt() : -1,
in.readImmutableMap(TierBalanceStats::readFrom),
in.readImmutableMap(NodeBalanceStats::readFrom)
);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) {
out.writeVInt(shards);
out.writeVInt(undesiredShardAllocations);
}
out.writeMap(tiers, StreamOutput::writeWriteable);
out.writeMap(nodes, StreamOutput::writeWriteable);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.startObject()
.field("shard_count", shards)
.field("undesired_shard_allocation_count", undesiredShardAllocations)
.field("tiers", tiers)
.field("nodes", nodes)
.endObject();
}
public record TierBalanceStats(
MetricStats shardCount,
MetricStats undesiredShardAllocations,
MetricStats forecastWriteLoad,
MetricStats forecastShardSize,
MetricStats actualShardSize
) implements Writeable, ToXContentObject {
private static TierBalanceStats createFrom(List<NodeBalanceStats> nodes) {
return new TierBalanceStats(
MetricStats.createFrom(nodes, it -> it.shards),
MetricStats.createFrom(nodes, it -> it.undesiredShardAllocations),
MetricStats.createFrom(nodes, it -> it.forecastWriteLoad),
MetricStats.createFrom(nodes, it -> it.forecastShardSize),
MetricStats.createFrom(nodes, it -> it.actualShardSize)
);
}
public static TierBalanceStats readFrom(StreamInput in) throws IOException {
return new TierBalanceStats(
MetricStats.readFrom(in),
in.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)
? MetricStats.readFrom(in)
: new MetricStats(0.0, 0.0, 0.0, 0.0, 0.0),
MetricStats.readFrom(in),
MetricStats.readFrom(in),
MetricStats.readFrom(in)
);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
shardCount.writeTo(out);
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) {
undesiredShardAllocations.writeTo(out);
}
forecastWriteLoad.writeTo(out);
forecastShardSize.writeTo(out);
actualShardSize.writeTo(out);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.startObject()
.field("shard_count", shardCount)
.field("undesired_shard_allocation_count", undesiredShardAllocations)
.field("forecast_write_load", forecastWriteLoad)
.field("forecast_disk_usage", forecastShardSize)
.field("actual_disk_usage", actualShardSize)
.endObject();
}
}
public record MetricStats(double total, double min, double max, double average, double stdDev) implements Writeable, ToXContentObject {
private static MetricStats createFrom(List<NodeBalanceStats> nodes, ToDoubleFunction<NodeBalanceStats> metricExtractor) {
assert nodes.isEmpty() == false : "Stats must be created from non empty nodes";
double total = 0.0;
double total2 = 0.0;
double min = Double.POSITIVE_INFINITY;
double max = Double.NEGATIVE_INFINITY;
int count = 0;
for (NodeBalanceStats node : nodes) {
var metric = metricExtractor.applyAsDouble(node);
if (Double.isNaN(metric)) {
continue;
}
total += metric;
total2 += Math.pow(metric, 2);
min = Math.min(min, metric);
max = Math.max(max, metric);
count++;
}
double average = count == 0 ? Double.NaN : total / count;
double stdDev = count == 0 ? Double.NaN : Math.sqrt(total2 / count - Math.pow(average, 2));
return new MetricStats(total, min, max, average, stdDev);
}
public static MetricStats readFrom(StreamInput in) throws IOException {
return new MetricStats(in.readDouble(), in.readDouble(), in.readDouble(), in.readDouble(), in.readDouble());
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeDouble(total);
out.writeDouble(min);
out.writeDouble(max);
out.writeDouble(average);
out.writeDouble(stdDev);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.startObject()
.field("total", total)
.field("min", min)
.field("max", max)
.field("average", average)
.field("std_dev", stdDev)
.endObject();
}
}
public record NodeBalanceStats(
String nodeId,
List<String> roles,
int shards,
int undesiredShardAllocations,
double forecastWriteLoad,
long forecastShardSize,
long actualShardSize
) implements Writeable, ToXContentObject {
private static final String UNKNOWN_NODE_ID = "UNKNOWN";
private static NodeBalanceStats createFrom(
RoutingNode routingNode,
Metadata metadata,
DesiredBalance desiredBalance,
ClusterInfo clusterInfo,
WriteLoadForecaster writeLoadForecaster
) {
int undesired = 0;
double forecastWriteLoad = 0.0;
long forecastShardSize = 0L;
long actualShardSize = 0L;
for (ShardRouting shardRouting : routingNode) {
var indexMetadata = metadata.indexMetadata(shardRouting.index());
var shardSize = clusterInfo.getShardSize(shardRouting, 0L);
forecastWriteLoad += writeLoadForecaster.getForecastedWriteLoad(indexMetadata).orElse(0.0);
forecastShardSize += indexMetadata.getForecastedShardSizeInBytes().orElse(shardSize);
actualShardSize += shardSize;
if (isDesiredShardAllocation(shardRouting, desiredBalance) == false) {
undesired++;
}
}
return new NodeBalanceStats(
routingNode.nodeId(),
routingNode.node().getRoles().stream().map(DiscoveryNodeRole::roleName).toList(),
routingNode.size(),
undesired,
forecastWriteLoad,
forecastShardSize,
actualShardSize
);
}
private static boolean isDesiredShardAllocation(ShardRouting shardRouting, DesiredBalance desiredBalance) {
if (shardRouting.relocating()) {
// relocating out shards are temporarily accepted
return true;
}
var assignment = desiredBalance.getAssignment(shardRouting.shardId());
return assignment != null && assignment.nodeIds().contains(shardRouting.currentNodeId());
}
public static NodeBalanceStats readFrom(StreamInput in) throws IOException {
return new NodeBalanceStats(
in.getTransportVersion().onOrAfter(TransportVersions.V_8_8_0) ? in.readString() : UNKNOWN_NODE_ID,
in.getTransportVersion().onOrAfter(TransportVersions.V_8_8_0) ? in.readStringCollectionAsList() : List.of(),
in.readInt(),
in.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0) ? in.readVInt() : -1,
in.readDouble(),
in.readLong(),
in.readLong()
);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_8_0)) {
out.writeString(nodeId);
out.writeStringCollection(roles);
}
out.writeInt(shards);
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) {
out.writeVInt(undesiredShardAllocations);
}
out.writeDouble(forecastWriteLoad);
out.writeLong(forecastShardSize);
out.writeLong(actualShardSize);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
if (UNKNOWN_NODE_ID.equals(nodeId) == false) {
builder.field("node_id", nodeId);
}
return builder.field("roles", roles)
.field("shard_count", shards)
.field("undesired_shard_allocation_count", undesiredShardAllocations)
.field("forecast_write_load", forecastWriteLoad)
.humanReadableField("forecast_disk_usage_bytes", "forecast_disk_usage", ByteSizeValue.ofBytes(forecastShardSize))
.humanReadableField("actual_disk_usage_bytes", "actual_disk_usage", ByteSizeValue.ofBytes(actualShardSize))
.endObject();
}
}
}