protected Map getFilteredVertexMetricNames()

in flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java [367:421]


    /**
     * Resolves which of the metric names reported for a job vertex should be collected.
     *
     * <p>The aggregated metric names of the vertex are queried once and matched against the
     * {@link FlinkMetric}s relevant for it:
     *
     * <ul>
     *   <li>{@code BUSY_TIME_PER_SEC} is always required.
     *   <li>Source vertices require {@code SOURCE_TASK_NUM_RECORDS_IN_PER_SEC}; every match of
     *       {@code PENDING_RECORDS} and one match of {@code SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC}
     *       are added only if present.
     *   <li>Non-source vertices require {@code NUM_RECORDS_IN_PER_SEC}.
     *   <li>Vertices with a non-empty output set in the topology require {@code
     *       NUM_RECORDS_OUT_PER_SEC}.
     * </ul>
     *
     * @param restClient client handed to the aggregated metric name query
     * @param jobID job id handed to the aggregated metric name query
     * @param jobVertexID vertex whose metric names are resolved
     * @param topology topology consulted for the source check and the outputs of the vertex
     * @return map from the matched Flink metric name to the {@link FlinkMetric} it stands for
     * @throws RuntimeException if a required metric has no match among the queried names
     */
    protected Map<String, FlinkMetric> getFilteredVertexMetricNames(
            RestClusterClient<?> restClient,
            JobID jobID,
            JobVertexID jobVertexID,
            JobTopology topology) {

        var availableNames = queryAggregatedMetricNames(restClient, jobID, jobVertexID);
        var resolved = new HashMap<String, FlinkMetric>();
        var mandatory = new HashSet<FlinkMetric>();

        mandatory.add(FlinkMetric.BUSY_TIME_PER_SEC);

        boolean sourceVertex = topology.isSource(jobVertexID);
        if (!sourceVertex) {
            // Every non-source vertex has to expose numRecordsInPerSecond.
            mandatory.add(FlinkMetric.NUM_RECORDS_IN_PER_SEC);
        } else {
            mandatory.add(FlinkMetric.SOURCE_TASK_NUM_RECORDS_IN_PER_SEC);

            // pendingRecords is optional: some sources never report it. The Kafka source, for
            // example, registers it lazily once the first record arrives, so a fresh topic or a
            // source that has read nothing since the last checkpoint will not expose it. Legacy
            // sources do not have the metric at all.
            List<String> pendingNames = FlinkMetric.PENDING_RECORDS.findAll(availableNames);
            if (pendingNames.isEmpty()) {
                LOG.warn(
                        "pendingRecords metric for {} could not be found. Either a legacy source or an idle source. Assuming no pending records.",
                        jobVertexID);
            }
            for (String pendingName : pendingNames) {
                resolved.put(pendingName, FlinkMetric.PENDING_RECORDS);
            }

            // The source-task records-out rate is optional as well; keep it only when reported.
            Optional<String> sourceOutName =
                    FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC.findAny(availableNames);
            if (sourceOutName.isPresent()) {
                resolved.put(
                        sourceOutName.get(), FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC);
            }
        }

        boolean hasOutputs = !topology.getOutputs().get(jobVertexID).isEmpty();
        if (hasOutputs) {
            // Anything that is not a sink has to expose numRecordsOutPerSecond.
            mandatory.add(FlinkMetric.NUM_RECORDS_OUT_PER_SEC);
        }

        // Map each required metric to the actual Flink metric name, failing on the first miss.
        for (FlinkMetric metric : mandatory) {
            String actualName =
                    metric.findAny(availableNames)
                            .orElseThrow(
                                    () ->
                                            new RuntimeException(
                                                    "Could not find required metric "
                                                            + metric
                                                            + " for "
                                                            + jobVertexID));
            resolved.put(actualName, metric);
        }

        return resolved;
    }