public ResourceFlowUnit<HotResourceSummary> operate()

in src/main/java/org/opensearch/performanceanalyzer/rca/store/rca/hot_node/GenericResourceRca.java [125:190]


    /**
     * Runs one evaluation tick of this RCA.
     *
     * <p>On every call, each non-empty flow unit read from {@code resourceUsageGroupByConsumer} is
     * reduced to a single number by summing the last column of all of its SQL records, and that
     * total is pushed into the sliding window. A flow unit is skipped (logged, nothing pushed) when
     * any of its records has fewer than two fields, holds a null in the last column, or holds a
     * value that cannot be converted to {@code Double}.
     *
     * <p>When the call counter reaches {@code rcaPeriod} it is reset and the sliding window average
     * is evaluated: the returned context is {@code CONTENDED} if the average is strictly above
     * {@code threshold} and {@code HEALTHY} otherwise (including when the average is NaN). A
     * {@link HotResourceSummary} is attached only if the average is at least {@code
     * lowerBoundThreshold}; otherwise the summary passed to the flow unit is {@code null}.
     *
     * @return a flow unit carrying the context and optional summary on the tick where the counter
     *     reaches {@code rcaPeriod}; a flow unit built from the current timestamp only on every
     *     other tick
     */
    public ResourceFlowUnit<HotResourceSummary> operate() {
        counter += 1;

        for (MetricFlowUnit flowunit : resourceUsageGroupByConsumer.getFlowUnits()) {
            if (flowunit.isEmpty()) {
                continue;
            }
            final Result<Record> result = flowunit.getData();
            if (result == null) {
                continue;
            }
            boolean recordParsingError = false;
            double totalUsage = 0.0;
            for (Record record : result) {
                final int fieldSize = record.size();
                if (fieldSize < 2) {
                    LOG.error(
                            "Field size {} is less than 2, the SQL record has wrong data format",
                            fieldSize);
                    recordParsingError = true;
                    break;
                }
                // The usage value is read from the last column of the record.
                final int valueIndex = fieldSize - 1;
                try {
                    // Keep the boxed type: a SQL NULL comes back as a null reference, and
                    // unboxing it directly would throw a NullPointerException that the
                    // DataTypeException handler below does not cover.
                    final Double num = record.getValue(valueIndex, Double.class);
                    if (num == null) {
                        LOG.error(
                                "Null data field in SQL record, field index : {}", valueIndex);
                        recordParsingError = true;
                        break;
                    }
                    totalUsage += num;
                } catch (DataTypeException de) {
                    // Pass the exception as the trailing argument so the logger renders the
                    // full stack trace instead of formatting the raw StackTraceElement array.
                    LOG.error(
                            "Fail to parse data field from SQL record, field index : {}",
                            valueIndex,
                            de);
                    recordParsingError = true;
                    break;
                }
            }
            // A partially summed flow unit would skew the average, so drop it entirely.
            if (!recordParsingError) {
                slidingWindow.next(new SlidingWindowData(this.clock.millis(), totalUsage));
            }
        }

        if (counter != rcaPeriod) {
            // we return an empty FlowUnit RCA for now. Can change to healthy (or previous known RCA
            // state)
            return new ResourceFlowUnit<>(clock.millis());
        }

        // reset the variables
        counter = 0;

        final double avgUsage = slidingWindow.readAvg();
        // A NaN average is never treated as contended and never produces a summary.
        final boolean hasValidAvg = !Double.isNaN(avgUsage);

        final ResourceContext context;
        if (hasValidAvg && avgUsage > threshold) {
            context = new ResourceContext(Resources.State.CONTENDED);
        } else {
            context = new ResourceContext(Resources.State.HEALTHY);
        }

        // check to see if the value is above lower bound thres
        HotResourceSummary summary = null;
        if (hasValidAvg && avgUsage >= lowerBoundThreshold) {
            summary =
                    new HotResourceSummary(
                            this.resource, threshold, avgUsage, SLIDING_WINDOW_IN_MIN * 60);
            addTopConsumerSummary(summary);
        }
        return new ResourceFlowUnit<>(clock.millis(), context, summary);
    }