in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetrics.java [62:124]
public void onUpdate(FlinkDeployment flinkApp) {
onRemove(flinkApp);
var namespace = flinkApp.getMetadata().getNamespace();
var clusterInfo = flinkApp.getStatus().getClusterInfo();
var deploymentName = flinkApp.getMetadata().getName();
deploymentStatuses
.computeIfAbsent(
namespace,
ns -> {
initNamespaceDeploymentCounts(ns);
initNamespaceStatusCounts(ns);
return createDeploymentStatusMap();
})
.get(flinkApp.getStatus().getJobManagerDeploymentStatus())
.add(deploymentName);
var flinkVersion =
flinkApp.getStatus()
.getClusterInfo()
.getOrDefault(DashboardConfiguration.FIELD_NAME_FLINK_VERSION, "");
if (StringUtils.isNullOrWhitespaceOnly(flinkVersion)) {
flinkVersion = "UNKNOWN";
}
deploymentFlinkVersions
.computeIfAbsent(namespace, ns -> new ConcurrentHashMap<>())
.computeIfAbsent(
flinkVersion,
v -> {
initFlinkVersions(namespace, v);
return ConcurrentHashMap.newKeySet();
})
.add(deploymentName);
var totalCpu =
NumberUtils.toDouble(
clusterInfo.getOrDefault(AbstractFlinkService.FIELD_NAME_TOTAL_CPU, "0"));
if (!Double.isFinite(totalCpu)) {
totalCpu = 0;
}
deploymentCpuUsage
.computeIfAbsent(
namespace,
ns -> {
initNamespaceCpuUsage(ns);
return new ConcurrentHashMap<>();
})
.put(deploymentName, totalCpu);
deploymentMemoryUsage
.computeIfAbsent(
namespace,
ns -> {
initNamespaceMemoryUsage(ns);
return new ConcurrentHashMap<>();
})
.put(
deploymentName,
NumberUtils.toLong(
clusterInfo.getOrDefault(
AbstractFlinkService.FIELD_NAME_TOTAL_MEMORY, "0")));
}