public void onUpdate()

in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetrics.java [71:162]


    /**
     * Records the current state of the given deployment in the per-namespace tracking maps.
     *
     * <p>{@link #onRemove} is invoked first, then the deployment name is registered under its
     * JobManager deployment status, its full Flink version and its derived minor version, and
     * its CPU, memory and state size values read from the cluster info are stored. Whenever a
     * namespace, version or minor version is seen for the first time, the matching {@code init*}
     * method is called before the new tracking entry is created.
     *
     * @param flinkApp deployment whose metadata and status are read
     */
    public void onUpdate(FlinkDeployment flinkApp) {
        // NOTE(review): presumably drops whatever an earlier update recorded for this
        // deployment so it is not counted twice; confirm against onRemove.
        onRemove(flinkApp);

        var metadata = flinkApp.getMetadata();
        var status = flinkApp.getStatus();
        var appNamespace = metadata.getNamespace();
        var appName = metadata.getName();
        var info = status.getClusterInfo();

        // Deployment names grouped by JobManager deployment status.
        var namesByStatus =
                deploymentStatuses.computeIfAbsent(
                        appNamespace,
                        key -> {
                            initNamespaceDeploymentCounts(key);
                            initNamespaceStatusCounts(key);
                            return createDeploymentStatusMap();
                        });
        namesByStatus.get(status.getJobManagerDeploymentStatus()).add(appName);

        // Full runtime version queried from the JobManager REST API; a missing or blank value
        // is tracked under UNKNOWN_VERSION.
        var reportedVersion =
                info.getOrDefault(DashboardConfiguration.FIELD_NAME_FLINK_VERSION, "");
        final String fullVersion =
                StringUtils.isNullOrWhitespaceOnly(reportedVersion)
                        ? UNKNOWN_VERSION
                        : reportedVersion;
        var namesByVersion =
                deploymentFlinkVersions.computeIfAbsent(
                        appNamespace, key -> new ConcurrentHashMap<>());
        namesByVersion
                .computeIfAbsent(
                        fullVersion,
                        version -> {
                            initFlinkVersions(appNamespace, version);
                            return ConcurrentHashMap.newKeySet();
                        })
                .add(appName);

        // Minor version: the first two dot-separated parts of the full version. Anything with
        // fewer than two parts is tracked under MALFORMED_MINOR_VERSION.
        var versionParts = fullVersion.split("\\.");
        final String minorVersion =
                versionParts.length >= 2
                        ? versionParts[0] + "." + versionParts[1]
                        : MALFORMED_MINOR_VERSION;
        var namesByMinorVersion =
                deploymentFlinkMinorVersions.computeIfAbsent(
                        appNamespace, key -> new ConcurrentHashMap<>());
        namesByMinorVersion
                .computeIfAbsent(
                        minorVersion,
                        version -> {
                            initFlinkMinorVersions(appNamespace, version);
                            return ConcurrentHashMap.newKeySet();
                        })
                .add(appName);

        // CPU: a parsed value that is NaN or infinite is stored as zero.
        double parsedCpu =
                NumberUtils.toDouble(
                        info.getOrDefault(AbstractFlinkService.FIELD_NAME_TOTAL_CPU, "0"));
        double cpuToStore = Double.isFinite(parsedCpu) ? parsedCpu : 0;
        var cpuByDeployment =
                deploymentCpuUsage.computeIfAbsent(
                        appNamespace,
                        key -> {
                            initNamespaceCpuUsage(key);
                            return new ConcurrentHashMap<>();
                        });
        cpuByDeployment.put(appName, cpuToStore);

        // Memory: stored exactly as parsed from the cluster info.
        var memoryByDeployment =
                deploymentMemoryUsage.computeIfAbsent(
                        appNamespace,
                        key -> {
                            initNamespaceMemoryUsage(key);
                            return new ConcurrentHashMap<>();
                        });
        long totalMemory =
                NumberUtils.toLong(
                        info.getOrDefault(AbstractFlinkService.FIELD_NAME_TOTAL_MEMORY, "0"));
        memoryByDeployment.put(appName, totalMemory);

        // State size: negative parsed values are stored as zero.
        long parsedStateSize =
                NumberUtils.toLong(
                        info.getOrDefault(AbstractFlinkService.FIELD_NAME_STATE_SIZE, "0"));
        var stateSizeByDeployment =
                deploymentStateSize.computeIfAbsent(
                        appNamespace,
                        key -> {
                            initNamespaceStateSize(key);
                            return new ConcurrentHashMap<>();
                        });
        stateSizeByDeployment.put(appName, Math.max(parsedStateSize, 0L));
    }