in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetrics.java [71:162]
public void onUpdate(FlinkDeployment flinkApp) {
onRemove(flinkApp);
var namespace = flinkApp.getMetadata().getNamespace();
var clusterInfo = flinkApp.getStatus().getClusterInfo();
var deploymentName = flinkApp.getMetadata().getName();
deploymentStatuses
.computeIfAbsent(
namespace,
ns -> {
initNamespaceDeploymentCounts(ns);
initNamespaceStatusCounts(ns);
return createDeploymentStatusMap();
})
.get(flinkApp.getStatus().getJobManagerDeploymentStatus())
.add(deploymentName);
// Full runtime version queried from the JobManager REST API
var flinkVersion =
flinkApp.getStatus()
.getClusterInfo()
.getOrDefault(DashboardConfiguration.FIELD_NAME_FLINK_VERSION, "");
if (StringUtils.isNullOrWhitespaceOnly(flinkVersion)) {
flinkVersion = UNKNOWN_VERSION;
}
deploymentFlinkVersions
.computeIfAbsent(namespace, ns -> new ConcurrentHashMap<>())
.computeIfAbsent(
flinkVersion,
v -> {
initFlinkVersions(namespace, v);
return ConcurrentHashMap.newKeySet();
})
.add(deploymentName);
// Minor version computed from the above
var subVersions = flinkVersion.split("\\.");
String minorVersion = MALFORMED_MINOR_VERSION;
if (subVersions.length >= 2) {
minorVersion = subVersions[0].concat(".").concat(subVersions[1]);
}
deploymentFlinkMinorVersions
.computeIfAbsent(namespace, ns -> new ConcurrentHashMap<>())
.computeIfAbsent(
minorVersion,
v -> {
initFlinkMinorVersions(namespace, v);
return ConcurrentHashMap.newKeySet();
})
.add(deploymentName);
var totalCpu =
NumberUtils.toDouble(
clusterInfo.getOrDefault(AbstractFlinkService.FIELD_NAME_TOTAL_CPU, "0"));
if (!Double.isFinite(totalCpu)) {
totalCpu = 0;
}
deploymentCpuUsage
.computeIfAbsent(
namespace,
ns -> {
initNamespaceCpuUsage(ns);
return new ConcurrentHashMap<>();
})
.put(deploymentName, totalCpu);
deploymentMemoryUsage
.computeIfAbsent(
namespace,
ns -> {
initNamespaceMemoryUsage(ns);
return new ConcurrentHashMap<>();
})
.put(
deploymentName,
NumberUtils.toLong(
clusterInfo.getOrDefault(
AbstractFlinkService.FIELD_NAME_TOTAL_MEMORY, "0")));
long stateSize =
NumberUtils.toLong(
clusterInfo.getOrDefault(AbstractFlinkService.FIELD_NAME_STATE_SIZE, "0"));
deploymentStateSize
.computeIfAbsent(
namespace,
ns -> {
initNamespaceStateSize(ns);
return new ConcurrentHashMap<>();
})
.put(deploymentName, stateSize > 0 ? stateSize : 0);
}