private static double getNumRecordsInInternal()

in flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java [217:262]


    /**
     * Resolves the number of input records for a vertex, either accumulated or per second, and
     * returns the sum of the selected metric.
     *
     * <p>Candidates are tried in priority order:
     *
     * <ol>
     *   <li>The {@code ioMetrics.getNumRecordsIn()} count. It is only consulted when {@code
     *       perSecond} is false; a per-second request starts without any candidate.
     *   <li>For source vertices: the source task "records in" metric.
     *   <li>For source vertices: the source task "records out" metric, for sources that do not
     *       emit input metrics.
     * </ol>
     *
     * <p>A source fallback is only looked up while the current candidate is missing or has a sum
     * of exactly zero. A fallback that is absent from {@code flinkMetrics} leaves the current
     * candidate untouched.
     *
     * @param flinkMetrics aggregated metrics; read only for source vertices
     * @param ioMetrics IO metrics of the vertex; read only when {@code perSecond} is false
     * @param jobVertexID vertex id, used only in the debug log message
     * @param isSource whether the source task metrics may be used as fallbacks
     * @param perSecond whether to select the per-second variants of the source metrics
     * @return the sum of the selected metric with negative values raised to zero, or {@code NaN}
     *     if no candidate could be selected
     */
    private static double getNumRecordsInInternal(
            Map<FlinkMetric, AggregatedMetric> flinkMetrics,
            IOMetrics ioMetrics,
            JobVertexID jobVertexID,
            boolean isSource,
            boolean perSecond) {
        // Baseline candidate: the accumulated count from the IO metrics. Per-second values are
        // only taken from the source metrics below, so that case starts out empty.
        AggregatedMetric selected = null;
        if (!perSecond) {
            selected =
                    new AggregatedMetric(
                            "n",
                            Double.NaN,
                            Double.NaN,
                            Double.NaN,
                            (double) ioMetrics.getNumRecordsIn(),
                            Double.NaN);
        }

        if (isSource) {
            // Source fallbacks in priority order: input metric first, then output metric.
            var sourceFallbacks =
                    new FlinkMetric[] {
                        perSecond
                                ? FlinkMetric.SOURCE_TASK_NUM_RECORDS_IN_PER_SEC
                                : FlinkMetric.SOURCE_TASK_NUM_RECORDS_IN,
                        perSecond
                                ? FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC
                                : FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT
                    };
            for (var fallbackKey : sourceFallbacks) {
                // Stop as soon as we hold a candidate whose sum is not exactly zero.
                if (selected != null && selected.getSum() != 0) {
                    break;
                }
                var fallback = flinkMetrics.get(fallbackKey);
                if (fallback != null) {
                    selected = fallback;
                }
            }
        }

        if (selected == null) {
            LOG.debug("Received null input rate for {}. Returning NaN.", jobVertexID);
            return Double.NaN;
        }
        // Negative sums are reported as zero.
        return Math.max(0, selected.getSum());
    }