in flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java [217:262]
private static double getNumRecordsInInternal(
Map<FlinkMetric, AggregatedMetric> flinkMetrics,
IOMetrics ioMetrics,
JobVertexID jobVertexID,
boolean isSource,
boolean perSecond) {
// Generate numRecordsInPerSecond from 3 metrics:
// 1. If available, directly use the NUM_RECORDS_IN_PER_SEC task metric.
var numRecords =
perSecond
? null // Per second only available for sources
: new AggregatedMetric(
"n",
Double.NaN,
Double.NaN,
Double.NaN,
(double) ioMetrics.getNumRecordsIn(),
Double.NaN);
// 2. If the former is unavailable and the vertex contains a source operator, use the
// corresponding source operator metric.
if (isSource && (numRecords == null || numRecords.getSum() == 0)) {
var sourceTaskIn =
flinkMetrics.get(
perSecond
? FlinkMetric.SOURCE_TASK_NUM_RECORDS_IN_PER_SEC
: FlinkMetric.SOURCE_TASK_NUM_RECORDS_IN);
numRecords = sourceTaskIn != null ? sourceTaskIn : numRecords;
}
// 3. If the vertex contains a source operator which does not emit input metrics, use output
// metrics instead.
if (isSource && (numRecords == null || numRecords.getSum() == 0)) {
var sourceTaskOut =
flinkMetrics.get(
perSecond
? FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC
: FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT);
numRecords = sourceTaskOut != null ? sourceTaskOut : numRecords;
}
if (numRecords == null) {
LOG.debug("Received null input rate for {}. Returning NaN.", jobVertexID);
return Double.NaN;
}
return Math.max(0, numRecords.getSum());
}