in src/main/java/org/opensearch/performanceanalyzer/reader/MetricsEmitter.java [346:475]
public static void emitWorkloadMetrics(
final DSLContext create,
final MetricsDB db,
final ShardRequestMetricsSnapshot rqMetricsSnap)
throws Exception {
long mCurrT = System.currentTimeMillis();
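        // Pull latency aggregated per operation/shard/index/shard-role from the request snapshot.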
Result<Record> res = rqMetricsSnap.fetchLatencyByOp();
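        // Register the latency metric table and open a batch writer over its dimensions.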
db.createMetric(
new Metric<Double>(AllMetrics.CommonMetric.LATENCY.toString(), 0d),
LATENCY_TABLE_DIMENSIONS);
BatchBindStep handle =
db.startBatchPut(
new Metric<Double>(AllMetrics.CommonMetric.LATENCY.toString(), 0d),
LATENCY_TABLE_DIMENSIONS);
        // TODO: Revisit these dimensions; they will likely need to change.
        List<String> shardDims = new ArrayList<>();
        shardDims.add(ShardRequestMetricsSnapshot.Fields.OPERATION.toString());
        shardDims.add(ShardRequestMetricsSnapshot.Fields.SHARD_ID.toString());
        shardDims.add(ShardRequestMetricsSnapshot.Fields.INDEX_NAME.toString());
        shardDims.add(ShardRequestMetricsSnapshot.Fields.SHARD_ROLE.toString());
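        // Per-shard operation counts are keyed on the shard-level dimensions above.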
db.createMetric(
new Metric<Double>(AllMetrics.ShardOperationMetric.SHARD_OP_COUNT.toString(), 0d),
shardDims);
BatchBindStep countHandle =
db.startBatchPut(
new Metric<Double>(
AllMetrics.ShardOperationMetric.SHARD_OP_COUNT.toString(), 0d),
shardDims);
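        // Bulk document counts share the same shard-level dimensions.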
db.createMetric(
new Metric<Double>(AllMetrics.ShardBulkMetric.DOC_COUNT.toString(), 0d), shardDims);
BatchBindStep bulkDocHandle =
db.startBatchPut(
new Metric<Double>(AllMetrics.ShardBulkMetric.DOC_COUNT.toString(), 0d),
shardDims);
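        // Each row carries the SUM/AVG/MIN/MAX latency aggregates plus the grouping dimensions.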
        String latField = ShardRequestMetricsSnapshot.Fields.LAT.toString();
        for (Record r : res) {
            Double sumLatency =
                    Double.parseDouble(
                            r.get(DBUtils.getAggFieldName(latField, MetricsDB.SUM)).toString());
            Double avgLatency =
                    Double.parseDouble(
                            r.get(DBUtils.getAggFieldName(latField, MetricsDB.AVG)).toString());
            Double minLatency =
                    Double.parseDouble(
                            r.get(DBUtils.getAggFieldName(latField, MetricsDB.MIN)).toString());
            Double maxLatency =
                    Double.parseDouble(
                            r.get(DBUtils.getAggFieldName(latField, MetricsDB.MAX)).toString());
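            // Nulls fill the latency-table dimensions that do not apply to shard-level rows.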
handle.bind(
r.get(ShardRequestMetricsSnapshot.Fields.OPERATION.toString()).toString(),
null,
null,
null,
r.get(ShardRequestMetricsSnapshot.Fields.SHARD_ID.toString()).toString(),
r.get(ShardRequestMetricsSnapshot.Fields.INDEX_NAME.toString()).toString(),
r.get(ShardRequestMetricsSnapshot.Fields.SHARD_ROLE.toString()).toString(),
sumLatency,
avgLatency,
minLatency,
maxLatency);
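            // A count has no aggregation spread, so the same value is bound for the
            // sum, avg, min, and max columns.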
Double count =
Double.parseDouble(
r.get(AllMetrics.ShardOperationMetric.SHARD_OP_COUNT.toString())
.toString());
countHandle.bind(
r.get(ShardRequestMetricsSnapshot.Fields.OPERATION.toString()).toString(),
r.get(ShardRequestMetricsSnapshot.Fields.SHARD_ID.toString()).toString(),
r.get(ShardRequestMetricsSnapshot.Fields.INDEX_NAME.toString()).toString(),
r.get(ShardRequestMetricsSnapshot.Fields.SHARD_ROLE.toString()).toString(),
count,
count,
count,
count);
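            // DOC_COUNT is populated only for bulk operations, so skip rows without it.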
Object bulkDocCountObj = r.get(AllMetrics.ShardBulkMetric.DOC_COUNT.toString());
if (bulkDocCountObj != null) {
Double bulkDocCount = Double.parseDouble(bulkDocCountObj.toString());
bulkDocHandle.bind(
r.get(ShardRequestMetricsSnapshot.Fields.OPERATION.toString()).toString(),
r.get(ShardRequestMetricsSnapshot.Fields.SHARD_ID.toString()).toString(),
r.get(ShardRequestMetricsSnapshot.Fields.INDEX_NAME.toString()).toString(),
r.get(ShardRequestMetricsSnapshot.Fields.SHARD_ROLE.toString()).toString(),
bulkDocCount,
bulkDocCount,
bulkDocCount,
bulkDocCount);
}
}
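        // Flush each batch writer only if it accumulated rows.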
if (handle.size() > 0) {
handle.execute();
}
if (countHandle.size() > 0) {
countHandle.execute();
}
if (bulkDocHandle.size() > 0) {
bulkDocHandle.execute();
}
long mFinalT = System.currentTimeMillis();
        LOG.debug("Total time taken to write workload metrics to metricsdb: {}", mFinalT - mCurrT);
}