public static void emitAggregatedOSMetrics()

in src/main/java/org/opensearch/performanceanalyzer/reader/MetricsEmitter.java [116:306]


    /**
     * Correlates per-thread OS metrics with shard request activity and writes the aggregated
     * values to the metrics database.
     *
     * <p>The method joins the thread-utilization-ratio table of {@code rqMetricsSnap} with all
     * rows of {@code osMetricsSnap} on the thread id column, multiplies every {@link
     * AllMetrics.OSMetrics} column by the {@code TUTIL} column, and groups the result by shard
     * id, index name, operation and shard role. For each group it computes the sum, average,
     * minimum and maximum of every weighted metric. Each column returned by {@link
     * OSMetricsSnapshot#getMetricColumns()} is then created in {@code db} and filled with one
     * batch row per group; groups whose sum for that column is {@code null} are skipped.
     *
     * @param create jOOQ context used to run the correlation query
     * @param db metrics database that receives the aggregated rows
     * @param osMetricsSnap snapshot providing the per-thread OS metrics
     * @param rqMetricsSnap snapshot providing the per-thread shard request utilization ratios
     * @throws Exception declared by the signature; failures of the query or of the database
     *     writes are not caught here
     */
    public static void emitAggregatedOSMetrics(
            final DSLContext create,
            final MetricsDB db,
            final OSMetricsSnapshot osMetricsSnap,
            final ShardRequestMetricsSnapshot rqMetricsSnap)
            throws Exception {

        SelectHavingStep<Record> rqTable = rqMetricsSnap.fetchThreadUtilizationRatioTable();
        SelectHavingStep<Record> osTable = osMetricsSnap.selectAll();

        // Inner projection: the four dimensions plus every OS metric weighted by TUTIL.
        List<SelectField<?>> fields = new ArrayList<SelectField<?>>(shardDimensionFields());
        for (AllMetrics.OSMetrics metric : AllMetrics.OSMetrics.values()) {
            fields.add(
                    DSL.field(ShardRequestMetricsSnapshot.Fields.TUTIL.toString(), Double.class)
                            .mul(DSL.field(DSL.name(metric.toString()), Double.class))
                            .as(metric.toString()));
        }

        List<Field<?>> groupByFields = shardDimensionFields();

        // Outer projection: the four dimensions plus sum/avg/min/max of every weighted metric.
        List<SelectField<?>> aggFields = new ArrayList<SelectField<?>>(shardDimensionFields());
        for (AllMetrics.OSMetrics metric : AllMetrics.OSMetrics.values()) {
            String column = metric.toString();
            aggFields.add(
                    DSL.sum(DSL.field(DSL.name(column), Double.class))
                            .as(MetricsDB.SUM + "_" + column));
            aggFields.add(
                    DSL.avg(DSL.field(DSL.name(column), Double.class))
                            .as(MetricsDB.AVG + "_" + column));
            aggFields.add(
                    DSL.min(DSL.field(DSL.name(column), Double.class))
                            .as(MetricsDB.MIN + "_" + column));
            aggFields.add(
                    DSL.max(DSL.field(DSL.name(column), Double.class))
                            .as(MetricsDB.MAX + "_" + column));
        }

        long mCurrT = System.currentTimeMillis();
        Result<Record> res =
                create.select(aggFields)
                        .from(
                                create.select(fields)
                                        .from(rqTable)
                                        .join(osTable)
                                        .on(
                                                osTable.field(
                                                                OSMetricsSnapshot.Fields.tid
                                                                        .toString(),
                                                                String.class)
                                                        .eq(
                                                                rqTable.field(
                                                                        OSMetricsSnapshot.Fields.tid
                                                                                .toString(),
                                                                        String.class))))
                        .groupBy(groupByFields)
                        .fetch();
        long mFinalT = System.currentTimeMillis();
        LOG.debug("Total time taken for tid corelation: {}", mFinalT - mCurrT);
        checkInvalidData(rqTable, osTable, create);

        Set<String> metricColumns = osMetricsSnap.getMetricColumns();

        mCurrT = System.currentTimeMillis();
        for (String metricColumn : metricColumns) {
            // A fresh mutable list per metric, as before: what the MetricsDB calls do with the
            // list is not visible from this method, so it is not shared across iterations.
            List<String> dims = commonDimensionNames();
            db.createMetric(new Metric<Double>(metricColumn, 0d), dims);
            BatchBindStep handle = db.startBatchPut(new Metric<Double>(metricColumn, 0d), dims);

            // The aggregate aliases depend only on the metric, so build them once per metric
            // rather than once per record.
            String sumColumn = MetricsDB.SUM + "_" + metricColumn;
            String avgColumn = MetricsDB.AVG + "_" + metricColumn;
            String minColumn = MetricsDB.MIN + "_" + metricColumn;
            String maxColumn = MetricsDB.MAX + "_" + metricColumn;

            for (Record r : res) {
                Object rawSum = r.get(sumColumn);
                if (rawSum == null) {
                    // No value for this metric in this group; nothing to write.
                    continue;
                }

                Double sumMetric = Double.parseDouble(rawSum.toString());
                Double avgMetric = Double.parseDouble(r.get(avgColumn).toString());
                Double minMetric = Double.parseDouble(r.get(minColumn).toString());
                Double maxMetric = Double.parseDouble(r.get(maxColumn).toString());
                handle.bind(
                        r.get(ShardRequestMetricsSnapshot.Fields.SHARD_ID.toString()).toString(),
                        r.get(ShardRequestMetricsSnapshot.Fields.INDEX_NAME.toString()).toString(),
                        r.get(ShardRequestMetricsSnapshot.Fields.OPERATION.toString()).toString(),
                        r.get(ShardRequestMetricsSnapshot.Fields.SHARD_ROLE.toString()).toString(),
                        sumMetric,
                        avgMetric,
                        minMetric,
                        maxMetric);
            }

            // Only execute the batch when at least one row was bound.
            if (handle.size() > 0) {
                handle.execute();
            }
        }
        mFinalT = System.currentTimeMillis();
        LOG.debug("Total time taken for writing resource metrics metricsdb: {}", mFinalT - mCurrT);
    }

    /**
     * Builds a new mutable list holding the shard id, index name, operation and shard role
     * columns, in that order, as name-quoted string fields.
     */
    private static List<Field<?>> shardDimensionFields() {
        List<Field<?>> dimensionFields = new ArrayList<Field<?>>(4);
        dimensionFields.add(
                DSL.field(
                        DSL.name(ShardRequestMetricsSnapshot.Fields.SHARD_ID.toString()),
                        String.class));
        dimensionFields.add(
                DSL.field(
                        DSL.name(ShardRequestMetricsSnapshot.Fields.INDEX_NAME.toString()),
                        String.class));
        dimensionFields.add(
                DSL.field(
                        DSL.name(ShardRequestMetricsSnapshot.Fields.OPERATION.toString()),
                        String.class));
        dimensionFields.add(
                DSL.field(
                        DSL.name(ShardRequestMetricsSnapshot.Fields.SHARD_ROLE.toString()),
                        String.class));
        return dimensionFields;
    }

    /**
     * Builds a new mutable list with the shard id, index name, operation and shard role
     * dimension names, in that order.
     */
    private static List<String> commonDimensionNames() {
        List<String> names = new ArrayList<String>(4);
        names.add(AllMetrics.CommonDimension.SHARD_ID.toString());
        names.add(AllMetrics.CommonDimension.INDEX_NAME.toString());
        names.add(AllMetrics.CommonDimension.OPERATION.toString());
        names.add(AllMetrics.CommonDimension.SHARD_ROLE.toString());
        return names;
    }