in flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFlinkMetrics.java [64:110]
public void registerScalingMetrics(
Supplier<Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>>>
currentVertexMetrics) {
currentVertexMetrics
.get()
.forEach(
(jobVertexID, evaluated) -> {
if (!vertexMetrics.add(jobVertexID)) {
return;
}
LOG.info("Registering scaling metrics for job vertex {}", jobVertexID);
var jobVertexMg =
metricGroup.addGroup(JOB_VERTEX_ID, jobVertexID.toHexString());
evaluated.forEach(
(sm, esm) -> {
var smGroup = jobVertexMg.addGroup(sm.name());
smGroup.gauge(
CURRENT,
() ->
Optional.ofNullable(
currentVertexMetrics.get())
.map(m -> m.get(jobVertexID))
.map(metrics -> metrics.get(sm))
.map(
EvaluatedScalingMetric
::getCurrent)
.orElse(Double.NaN));
if (sm.isCalculateAverage()) {
smGroup.gauge(
AVERAGE,
() ->
Optional.ofNullable(
currentVertexMetrics
.get())
.map(m -> m.get(jobVertexID))
.map(metrics -> metrics.get(sm))
.map(
EvaluatedScalingMetric
::getAverage)
.orElse(Double.NaN));
}
});
});
}