in src/main/java/org/opensearch/performanceanalyzer/rca/store/rca/temperature/DimensionalTemperatureCalculator.java [81:243]
public static DimensionalTemperatureFlowUnit getTemperatureForDimension(
ShardStore shardStore,
TemperatureDimension dimension,
ShardBasedTemperatureCalculator resourceByShardId,
AvgShardBasedTemperatureCalculator avgResUsageByAllShards,
ShardIndependentTemperatureCalculator resourceShardIndependent,
TotalNodeTemperatureCalculator resourcePeakUsage,
TemperatureVector.NormalizedValue threshold) {
List<List<MetricFlowUnit>> allFlowUnits = new ArrayList<>();
List<MetricFlowUnit> shardIdBasedFlowUnits = resourceByShardId.getFlowUnits();
List<MetricFlowUnit> avgResUsageFlowUnits = avgResUsageByAllShards.getFlowUnits();
List<MetricFlowUnit> resourcePeakFlowUnits = resourcePeakUsage.getFlowUnits();
allFlowUnits.add(shardIdBasedFlowUnits);
allFlowUnits.add(avgResUsageFlowUnits);
allFlowUnits.add(resourcePeakFlowUnits);
AtomicBoolean emptyFlowUnit = new AtomicBoolean(false);
allFlowUnits.forEach(
flowUnitAcrossOneCalculator -> {
if (flowUnitAcrossOneCalculator.get(0).isEmpty()) {
LOG.debug("Empty flowUnitAcrossOneCalculator");
emptyFlowUnit.set(true);
}
});
if (emptyFlowUnit.get()) {
return new DimensionalTemperatureFlowUnit(System.currentTimeMillis());
}
AtomicBoolean noEntriesInFlowUnit = new AtomicBoolean(false);
allFlowUnits.forEach(
flowUnitAcrossOneCalculator -> {
if (flowUnitAcrossOneCalculator.get(0).getData().size() == 0) {
LOG.debug("No Entries in flowUnits");
noEntriesInFlowUnit.set(true);
}
});
if (noEntriesInFlowUnit.get()) {
return new DimensionalTemperatureFlowUnit(System.currentTimeMillis());
}
LOG.debug("shardIdBasedFlowUnits " + shardIdBasedFlowUnits.get(0).getData());
LOG.debug("avgResUsageFlowUnits " + avgResUsageFlowUnits.get(0).getData());
LOG.debug("resourcePeakFlowUnits " + resourcePeakFlowUnits.get(0).getData());
if (shardIdBasedFlowUnits.get(0).getData().get(0).size() != 3) {
// example:
// [0: [[IndexName, ShardID, sum], [geonames, 0, 0.35558242693567], [geonames, 2,
// 0.0320651297686606]]]
// we expect it to have three columns but the number of rows is determined by the
// number of indices and shards in the node.
throw new IllegalArgumentException("Size more than expected: " + shardIdBasedFlowUnits);
}
if (avgResUsageFlowUnits.get(0).getData().get(0).size() != 1) {
// example:
// [0: [shard_avg], [0.0123598766010401]]
// We expect it to have single column
throw new IllegalArgumentException(
"Size more than expected:\n"
+ avgResUsageFlowUnits.get(0).getData()
+ "\n found: "
+ avgResUsageFlowUnits.get(0).getData().intoArrays().length);
}
if (resourcePeakFlowUnits.get(0).getData().get(0).size() != 1) {
// example:
// [0: [SUM_of_max], [0.03236403909818]]
// We expect it to have single column
throw new IllegalArgumentException(
"Size more than expected: \n" + resourcePeakFlowUnits.get(0).getData());
}
// avgUsageAcrossShards contains the average of resource consumed over all the shards.
// e.g. If Total CPU consumed by shards is 50% and total number of shards on the node
// are 5, This would have the value as 10.
double avgUsageAcrossShards = -1.0;
try {
List<Double> values =
avgResUsageFlowUnits
.get(0)
.getData()
.getValues(AvgShardBasedTemperatureCalculator.SHARD_AVG, Double.class);
if (values != null && values.get(0) != null) {
avgUsageAcrossShards = values.get(0);
} else {
// This means that there are no shards on this node. So we will return an empty
// FlowUnit.
return new DimensionalTemperatureFlowUnit(System.currentTimeMillis());
}
} catch (Exception ex) {
LOG.error("DBError getting shard average:", ex);
return new DimensionalTemperatureFlowUnit(System.currentTimeMillis());
}
// totalUsageInNode contains the value consumed by the resource on the node.
double totalUsageInNode = -1.0;
try {
List<Double> values =
resourcePeakFlowUnits
.get(0)
.getData()
.getValues(TemperatureMetricsBase.AGGR_OVER_AGGR_NAME, Double.class);
if (values != null && values.get(0) != null) {
totalUsageInNode = values.get(0);
} else {
// This means that there are no shards on this node. So we will return an empty
// FlowUnit.
return new DimensionalTemperatureFlowUnit(System.currentTimeMillis());
}
} catch (Exception ex) {
LOG.error("DBError getting shard average:", ex);
return new DimensionalTemperatureFlowUnit(System.currentTimeMillis());
}
// normalizedConsumptionAcrossShards contains average normalized value (on a scale of 10)
// the of the resource
// e.g. If the total resource(CPU) used is 50% and there are 6 shards on the node.
// normalizedConsumptionAcrossShards would be equal to ((50/6)*10)/50 = 1.67
TemperatureVector.NormalizedValue normalizedConsumptionAcrossShards =
TemperatureVector.NormalizedValue.calculate(avgUsageAcrossShards, totalUsageInNode);
// valuesOverShards contains the usage of resource over every shard.
Result<Record> valuesOverShards = shardIdBasedFlowUnits.get(0).getData();
NodeLevelDimensionalSummary nodeDimensionProfile =
new NodeLevelDimensionalSummary(
dimension, normalizedConsumptionAcrossShards, totalUsageInNode);
// The shardIdBasedFlowUnits is supposed to contain one row per shard.
nodeDimensionProfile.setNumberOfShards(valuesOverShards.size());
for (Record record : valuesOverShards) {
// Each row has columns like:
// IndexName, ShardID, sum
String indexName = record.getValue(ColumnTypes.IndexName.name(), String.class);
int shardId = record.getValue(ColumnTypes.ShardID.name(), Integer.class);
double usageByShard = record.getValue(ColumnTypes.sum.name(), Double.class);
// normalizedConsumptionByShard contains the normalized value of the resource consumed
// by this shard. e.g. If the If the total resource(CPU) used is 50% and
// CPU consumed by this shard is 6%, normalizedConsumptionByShard = (6*10/50) = 1.2
TemperatureVector.NormalizedValue normalizedConsumptionByShard =
TemperatureVector.NormalizedValue.calculate(usageByShard, totalUsageInNode);
// HeatZone for shard contains the zone assigned to this particular shard with respect
// to the dimension
// specified. A Shard can be present in different zones for different dimensions.
HeatZoneAssigner.Zone heatZoneForShard =
HeatZoneAssigner.assign(
normalizedConsumptionByShard,
normalizedConsumptionAcrossShards,
threshold);
IndexShardKey indexShardKey = new IndexShardKey(indexName, shardId);
ShardProfileSummary shardProfileSummary = shardStore.getOrCreateIfAbsent(indexShardKey);
shardProfileSummary.addTemperatureForDimension(dimension, normalizedConsumptionByShard);
nodeDimensionProfile.addShardToZone(shardProfileSummary, heatZoneForShard);
}
return new DimensionalTemperatureFlowUnit(System.currentTimeMillis(), nodeDimensionProfile);
}