public void registerScalingMetrics()

in flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFlinkMetrics.java [64:110]


    /**
     * Registers {@code CURRENT} and, for metrics with {@code isCalculateAverage()}, {@code
     * AVERAGE} gauges for every scaling metric of every job vertex returned by the supplier.
     *
     * <p>A vertex already present in {@code vertexMetrics} is skipped, so each vertex is
     * registered at most once, with exactly the metrics that were present on that first call.
     * The gauges do not capture the evaluated values; they query the supplier again on every
     * read and report {@link Double#NaN} when the supplier, the vertex entry or the metric entry
     * is missing.
     *
     * @param currentVertexMetrics supplier of the latest evaluated metrics keyed by job vertex;
     *     if it currently yields {@code null} nothing is registered
     */
    public void registerScalingMetrics(
            Supplier<Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>>>
                    currentVertexMetrics) {
        var metricsSnapshot = currentVertexMetrics.get();
        if (metricsSnapshot == null) {
            // The gauges below already tolerate a null supplier result; do the same here
            // instead of failing with a NullPointerException during registration.
            return;
        }

        metricsSnapshot.forEach(
                (jobVertexID, evaluated) -> {
                    // add() returns false when the vertex was registered by an earlier call.
                    if (!vertexMetrics.add(jobVertexID)) {
                        return;
                    }
                    LOG.info("Registering scaling metrics for job vertex {}", jobVertexID);
                    var jobVertexMg =
                            metricGroup.addGroup(JOB_VERTEX_ID, jobVertexID.toHexString());

                    evaluated.forEach(
                            (sm, esm) -> {
                                var smGroup = jobVertexMg.addGroup(sm.name());

                                smGroup.gauge(
                                        CURRENT,
                                        () ->
                                                findEvaluatedMetric(
                                                                currentVertexMetrics,
                                                                jobVertexID,
                                                                sm)
                                                        .map(EvaluatedScalingMetric::getCurrent)
                                                        .orElse(Double.NaN));

                                if (sm.isCalculateAverage()) {
                                    smGroup.gauge(
                                            AVERAGE,
                                            () ->
                                                    findEvaluatedMetric(
                                                                    currentVertexMetrics,
                                                                    jobVertexID,
                                                                    sm)
                                                            .map(
                                                                    EvaluatedScalingMetric
                                                                            ::getAverage)
                                                            .orElse(Double.NaN));
                                }
                            });
                });
    }

    /**
     * Looks up the evaluated metric for the given vertex and scaling metric in the supplier's
     * current result, returning an empty {@link Optional} if any level of the lookup is absent.
     */
    private static Optional<EvaluatedScalingMetric> findEvaluatedMetric(
            Supplier<Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>>>
                    currentVertexMetrics,
            JobVertexID jobVertexID,
            ScalingMetric scalingMetric) {
        return Optional.ofNullable(currentVertexMetrics.get())
                .map(byVertex -> byVertex.get(jobVertexID))
                .map(byMetric -> byMetric.get(scalingMetric));
    }