protected Map queryAggregatedVertexMetrics()

in flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/RestApiMetricsCollector.java [62:109]


    protected Map<FlinkMetric, AggregatedMetric> queryAggregatedVertexMetrics(
            FlinkService flinkService,
            AbstractFlinkResource<?, ?> cr,
            Configuration conf,
            JobVertexID jobVertexID,
            Map<String, FlinkMetric> metrics) {

        LOG.debug("Querying metrics {} for {}", metrics, jobVertexID);

        var jobId = JobID.fromHexString(cr.getStatus().getJobStatus().getJobId());

        var parameters = new AggregatedSubtaskMetricsParameters();
        var pathIt = parameters.getPathParameters().iterator();

        ((JobIDPathParameter) pathIt.next()).resolve(jobId);
        ((JobVertexIdPathParameter) pathIt.next()).resolve(jobVertexID);

        parameters
                .getQueryParameters()
                .iterator()
                .next()
                .resolveFromString(StringUtils.join(metrics.keySet(), ","));

        try (var restClient = flinkService.getClusterClient(conf)) {

            var responseBody =
                    restClient
                            .sendRequest(
                                    AggregatedSubtaskMetricsHeaders.getInstance(),
                                    parameters,
                                    EmptyRequestBody.getInstance())
                            .get();

            return responseBody.getMetrics().stream()
                    .collect(
                            Collectors.toMap(
                                    m -> metrics.get(m.getId()),
                                    m -> m,
                                    (m1, m2) ->
                                            new AggregatedMetric(
                                                    m1.getId() + " merged with " + m2.getId(),
                                                    Math.min(m1.getMin(), m2.getMin()),
                                                    Math.max(m1.getMax(), m2.getMax()),
                                                    // Average can't be computed
                                                    Double.NaN,
                                                    m1.getSum() + m2.getSum())));
        }
    }