in flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java [367:421]
/**
 * Works out which aggregated Flink metric names should be queried for one job vertex and maps
 * each concrete metric name to the {@link FlinkMetric} it stands for.
 *
 * <p>{@link FlinkMetric#BUSY_TIME_PER_SEC} is always required. Non-source vertices also require
 * {@link FlinkMetric#NUM_RECORDS_IN_PER_SEC}. Source vertices instead require {@link
 * FlinkMetric#SOURCE_TASK_NUM_RECORDS_IN_PER_SEC}. In addition, they include every name matched
 * by {@link FlinkMetric#PENDING_RECORDS} and the name matched by {@link
 * FlinkMetric#SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC}, when those exist. Any vertex whose entry in
 * {@code topology.getOutputs()} is non-empty also requires {@link
 * FlinkMetric#NUM_RECORDS_OUT_PER_SEC}.
 *
 * @param restClient client handed to {@code queryAggregatedMetricNames} to list available names
 * @param jobID the job owning the vertex
 * @param jobVertexID the vertex whose metric names are being resolved
 * @param topology topology used to decide whether the vertex is a source and whether it has
 *     outputs
 * @return a mutable map from actual Flink metric name to the metric it represents
 * @throws RuntimeException if any required metric has no matching name
 */
protected Map<String, FlinkMetric> getFilteredVertexMetricNames(
        RestClusterClient<?> restClient,
        JobID jobID,
        JobVertexID jobVertexID,
        JobTopology topology) {
    var availableNames = queryAggregatedMetricNames(restClient, jobID, jobVertexID);
    var resolved = new HashMap<String, FlinkMetric>();
    var mandatory = new HashSet<FlinkMetric>();
    mandatory.add(FlinkMetric.BUSY_TIME_PER_SEC);

    boolean isSource = topology.isSource(jobVertexID);
    if (!isSource) {
        // Every non-source vertex is expected to report numRecordsInPerSecond.
        mandatory.add(FlinkMetric.NUM_RECORDS_IN_PER_SEC);
    } else {
        mandatory.add(FlinkMetric.SOURCE_TASK_NUM_RECORDS_IN_PER_SEC);

        // pendingRecords is optional. The Kafka source, for example, only registers it after
        // receiving its first record, so a fresh topic (or no new data since the last
        // checkpoint) leaves it absent. Legacy sources never expose it at all.
        var pendingNames = FlinkMetric.PENDING_RECORDS.findAll(availableNames);
        if (pendingNames.isEmpty()) {
            LOG.warn(
                    "pendingRecords metric for {} could not be found. Either a legacy source or an idle source. Assuming no pending records.",
                    jobVertexID);
        } else {
            for (var pendingName : pendingNames) {
                resolved.put(pendingName, FlinkMetric.PENDING_RECORDS);
            }
        }

        // The source-side records-out metric is also optional: include it only when present.
        var sourceRecordsOut =
                FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC.findAny(availableNames);
        sourceRecordsOut.ifPresent(
                name -> resolved.put(name, FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC));
    }

    // NOTE(review): assumes getOutputs() has an entry for every vertex (possibly empty);
    // a missing entry would NPE here -- confirm against JobTopology.
    boolean hasOutputs = !topology.getOutputs().get(jobVertexID).isEmpty();
    if (hasOutputs) {
        // Anything that is not a sink must report numRecordsOutPerSecond.
        mandatory.add(FlinkMetric.NUM_RECORDS_OUT_PER_SEC);
    }

    for (FlinkMetric metric : mandatory) {
        String actualName =
                metric.findAny(availableNames)
                        .orElseThrow(
                                () ->
                                        new RuntimeException(
                                                "Could not find required metric "
                                                        + metric
                                                        + " for "
                                                        + jobVertexID));
        resolved.put(actualName, metric);
    }
    return resolved;
}