in flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java [145:163]
private static double getNumRecordsInPerSecond(
Map<FlinkMetric, AggregatedMetric> flinkMetrics,
JobVertexID jobVertexID,
boolean isSource) {
var numRecordsInPerSecond = flinkMetrics.get(FlinkMetric.NUM_RECORDS_IN_PER_SEC);
if (isSource && (numRecordsInPerSecond == null || numRecordsInPerSecond.getSum() == 0)) {
numRecordsInPerSecond =
flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_IN_PER_SEC);
}
if (isSource && (numRecordsInPerSecond == null || numRecordsInPerSecond.getSum() == 0)) {
numRecordsInPerSecond =
flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC);
}
if (numRecordsInPerSecond == null) {
LOG.warn("Received null input rate for {}. Returning NaN.", jobVertexID);
return Double.NaN;
}
return numRecordsInPerSecond.getSum();
}