in src/main/java/org/opensearch/performanceanalyzer/reader/MetricsEmitter.java [116:306]
public static void emitAggregatedOSMetrics(
final DSLContext create,
final MetricsDB db,
final OSMetricsSnapshot osMetricsSnap,
final ShardRequestMetricsSnapshot rqMetricsSnap)
throws Exception {
SelectHavingStep<Record> rqTable = rqMetricsSnap.fetchThreadUtilizationRatioTable();
SelectHavingStep<Record> osTable = osMetricsSnap.selectAll();
List<SelectField<?>> fields =
new ArrayList<SelectField<?>>() {
{
this.add(
DSL.field(
DSL.name(
ShardRequestMetricsSnapshot.Fields.SHARD_ID
.toString()),
String.class));
this.add(
DSL.field(
DSL.name(
ShardRequestMetricsSnapshot.Fields.INDEX_NAME
.toString()),
String.class));
this.add(
DSL.field(
DSL.name(
ShardRequestMetricsSnapshot.Fields.OPERATION
.toString()),
String.class));
this.add(
DSL.field(
DSL.name(
ShardRequestMetricsSnapshot.Fields.SHARD_ROLE
.toString()),
String.class));
}
};
for (AllMetrics.OSMetrics metric : AllMetrics.OSMetrics.values()) {
fields.add(
DSL.field(ShardRequestMetricsSnapshot.Fields.TUTIL.toString(), Double.class)
.mul(DSL.field(DSL.name(metric.toString()), Double.class))
.as(metric.toString()));
}
ArrayList<Field<?>> groupByFields =
new ArrayList<Field<?>>() {
{
this.add(
DSL.field(
DSL.name(
ShardRequestMetricsSnapshot.Fields.SHARD_ID
.toString()),
String.class));
this.add(
DSL.field(
DSL.name(
ShardRequestMetricsSnapshot.Fields.INDEX_NAME
.toString()),
String.class));
this.add(
DSL.field(
DSL.name(
ShardRequestMetricsSnapshot.Fields.OPERATION
.toString()),
String.class));
this.add(
DSL.field(
DSL.name(
ShardRequestMetricsSnapshot.Fields.SHARD_ROLE
.toString()),
String.class));
}
};
List<SelectField<?>> aggFields =
new ArrayList<SelectField<?>>() {
{
this.add(
DSL.field(
DSL.name(
ShardRequestMetricsSnapshot.Fields.SHARD_ID
.toString()),
String.class));
this.add(
DSL.field(
DSL.name(
ShardRequestMetricsSnapshot.Fields.INDEX_NAME
.toString()),
String.class));
this.add(
DSL.field(
DSL.name(
ShardRequestMetricsSnapshot.Fields.OPERATION
.toString()),
String.class));
this.add(
DSL.field(
DSL.name(
ShardRequestMetricsSnapshot.Fields.SHARD_ROLE
.toString()),
String.class));
}
};
for (AllMetrics.OSMetrics metric : AllMetrics.OSMetrics.values()) {
aggFields.add(
DSL.sum(DSL.field(DSL.name(metric.toString()), Double.class))
.as(MetricsDB.SUM + "_" + metric.toString()));
aggFields.add(
DSL.avg(DSL.field(DSL.name(metric.toString()), Double.class))
.as(MetricsDB.AVG + "_" + metric.toString()));
aggFields.add(
DSL.min(DSL.field(DSL.name(metric.toString()), Double.class))
.as(MetricsDB.MIN + "_" + metric.toString()));
aggFields.add(
DSL.max(DSL.field(DSL.name(metric.toString()), Double.class))
.as(MetricsDB.MAX + "_" + metric.toString()));
}
long mCurrT = System.currentTimeMillis();
Result<Record> res =
create.select(aggFields)
.from(
create.select(fields)
.from(rqTable)
.join(osTable)
.on(
osTable.field(
OSMetricsSnapshot.Fields.tid
.toString(),
String.class)
.eq(
rqTable.field(
OSMetricsSnapshot.Fields.tid
.toString(),
String.class))))
.groupBy(groupByFields)
.fetch();
long mFinalT = System.currentTimeMillis();
LOG.debug("Total time taken for tid corelation: {}", mFinalT - mCurrT);
checkInvalidData(rqTable, osTable, create);
Set<String> metricColumns = osMetricsSnap.getMetricColumns();
mCurrT = System.currentTimeMillis();
for (String metricColumn : metricColumns) {
List<String> dims =
new ArrayList<String>() {
{
this.add(AllMetrics.CommonDimension.SHARD_ID.toString());
this.add(AllMetrics.CommonDimension.INDEX_NAME.toString());
this.add(AllMetrics.CommonDimension.OPERATION.toString());
this.add(AllMetrics.CommonDimension.SHARD_ROLE.toString());
}
};
db.createMetric(new Metric<Double>(metricColumn, 0d), dims);
BatchBindStep handle = db.startBatchPut(new Metric<Double>(metricColumn, 0d), dims);
for (Record r : res) {
if (r.get(MetricsDB.SUM + "_" + metricColumn) == null) {
continue;
}
Double sumMetric =
Double.parseDouble(r.get(MetricsDB.SUM + "_" + metricColumn).toString());
Double avgMetric =
Double.parseDouble(r.get(MetricsDB.AVG + "_" + metricColumn).toString());
Double minMetric =
Double.parseDouble(r.get(MetricsDB.MIN + "_" + metricColumn).toString());
Double maxMetric =
Double.parseDouble(r.get(MetricsDB.MAX + "_" + metricColumn).toString());
handle.bind(
r.get(ShardRequestMetricsSnapshot.Fields.SHARD_ID.toString()).toString(),
r.get(ShardRequestMetricsSnapshot.Fields.INDEX_NAME.toString()).toString(),
r.get(ShardRequestMetricsSnapshot.Fields.OPERATION.toString()).toString(),
r.get(ShardRequestMetricsSnapshot.Fields.SHARD_ROLE.toString()).toString(),
sumMetric,
avgMetric,
minMetric,
maxMetric);
}
if (handle.size() > 0) {
handle.execute();
}
}
mFinalT = System.currentTimeMillis();
LOG.debug("Total time taken for writing resource metrics metricsdb: {}", mFinalT - mCurrT);
}