in flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/RestApiMetricsCollector.java [62:109]
/**
 * Requests the aggregated subtask metrics of one job vertex through the cluster client and
 * returns them keyed by {@code FlinkMetric}.
 *
 * <p>The job id is parsed from the job status of {@code cr}. The metric names to request are
 * the keys of {@code metrics}, joined with commas into a single query parameter value. Each
 * metric in the response is stored under the {@code FlinkMetric} that {@code metrics} holds
 * for its id. When several returned metrics resolve to the same key they are folded into one
 * {@code AggregatedMetric}: the smaller minimum, the larger maximum and the added sums are
 * kept, the ids are concatenated, and the average is set to {@code NaN} because it cannot be
 * derived from the merged values.
 *
 * <p>The cluster client is closed once the response has been processed.
 *
 * @param flinkService service from which the cluster client is obtained
 * @param cr resource whose job status supplies the job id
 * @param conf configuration handed to {@code flinkService.getClusterClient}
 * @param jobVertexID vertex whose subtask metrics are queried
 * @param metrics metric names to request, each mapped to the {@code FlinkMetric} it stands for
 * @return the returned aggregates, keyed by the {@code FlinkMetric} looked up for their id
 */
protected Map<FlinkMetric, AggregatedMetric> queryAggregatedVertexMetrics(
        FlinkService flinkService,
        AbstractFlinkResource<?, ?> cr,
        Configuration conf,
        JobVertexID jobVertexID,
        Map<String, FlinkMetric> metrics) {
    LOG.debug("Querying metrics {} for {}", metrics, jobVertexID);

    var id = JobID.fromHexString(cr.getStatus().getJobStatus().getJobId());
    var requestParams = new AggregatedSubtaskMetricsParameters();

    // Path parameters are resolved positionally: job id first, then the vertex id.
    var pathParams = requestParams.getPathParameters().iterator();
    var jobIdParam = (JobIDPathParameter) pathParams.next();
    jobIdParam.resolve(id);
    var vertexIdParam = (JobVertexIdPathParameter) pathParams.next();
    vertexIdParam.resolve(jobVertexID);

    // The first query parameter receives the comma-separated list of metric names.
    var metricsFilter = requestParams.getQueryParameters().iterator().next();
    var requestedNames = StringUtils.join(metrics.keySet(), ",");
    metricsFilter.resolveFromString(requestedNames);

    try (var clusterClient = flinkService.getClusterClient(conf)) {
        var pendingResponse =
                clusterClient.sendRequest(
                        AggregatedSubtaskMetricsHeaders.getInstance(),
                        requestParams,
                        EmptyRequestBody.getInstance());
        var response = pendingResponse.get();

        return response.getMetrics().stream()
                .collect(
                        Collectors.toMap(
                                returned -> metrics.get(returned.getId()),
                                returned -> returned,
                                (first, second) -> {
                                    var mergedId =
                                            first.getId() + " merged with " + second.getId();
                                    var lowest = Math.min(first.getMin(), second.getMin());
                                    var highest = Math.max(first.getMax(), second.getMax());
                                    var total = first.getSum() + second.getSum();
                                    // Average can't be computed
                                    return new AggregatedMetric(
                                            mergedId, lowest, highest, Double.NaN, total);
                                }));
    }
}