public void observe()

in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthObserver.java [49:99]


    /**
     * Observes the health of the cluster behind the given deployment.
     *
     * <p>Fetches the restart and completed-checkpoint metrics for the deployment's job from the
     * Flink service. It builds a {@link ClusterHealthInfo} from them and hands it to the cluster
     * health evaluator, together with the observe config and the status' cluster info.
     *
     * <p>The restart count is read from {@code NUM_RESTARTS_METRIC_NAME} when present. Otherwise
     * it falls back to {@code FULL_RESTARTS_METRIC_NAME}.
     *
     * <p>Any exception is logged and swallowed: failing to fetch metrics is treated as a
     * temporary issue and must not fail the observation.
     *
     * @param ctx resource context of the observed {@link FlinkDeployment}
     */
    public void observe(FlinkResourceContext<FlinkDeployment> ctx) {
        var flinkApp = ctx.getResource();
        try {
            LOG.debug("Observing cluster health");
            var deploymentStatus = flinkApp.getStatus();
            var jobStatus = deploymentStatus.getJobStatus();
            var jobId = jobStatus.getJobId();
            var metrics =
                    ctx.getFlinkService()
                            .getMetrics(
                                    ctx.getObserveConfig(),
                                    jobId,
                                    List.of(
                                            FULL_RESTARTS_METRIC_NAME,
                                            NUM_RESTARTS_METRIC_NAME,
                                            NUMBER_OF_COMPLETED_CHECKPOINTS_METRIC_NAME));
            ClusterHealthInfo observedClusterHealthInfo = new ClusterHealthInfo();

            // Prefer the newer restart metric; fall back to the deprecated one for older Flink.
            if (metrics.containsKey(NUM_RESTARTS_METRIC_NAME)) {
                LOG.debug(NUM_RESTARTS_METRIC_NAME + " metric is used");
                observedClusterHealthInfo.setNumRestarts(
                        Integer.parseInt(metrics.get(NUM_RESTARTS_METRIC_NAME)));
            } else if (metrics.containsKey(FULL_RESTARTS_METRIC_NAME)) {
                LOG.debug(
                        FULL_RESTARTS_METRIC_NAME
                                + " metric is used because "
                                + NUM_RESTARTS_METRIC_NAME
                                + " is missing");
                observedClusterHealthInfo.setNumRestarts(
                        Integer.parseInt(metrics.get(FULL_RESTARTS_METRIC_NAME)));
            } else {
                throw new IllegalStateException(
                        "No job restart metric found. Either "
                                + FULL_RESTARTS_METRIC_NAME
                                + " (old and deprecated in newer Flink versions) or "
                                + NUM_RESTARTS_METRIC_NAME
                                + " (new) must exist.");
            }

            // Fail with an explicit message instead of Integer.parseInt(null)'s opaque
            // NumberFormatException when the checkpoint metric is absent.
            var completedCheckpoints = metrics.get(NUMBER_OF_COMPLETED_CHECKPOINTS_METRIC_NAME);
            if (completedCheckpoints == null) {
                throw new IllegalStateException(
                        "No completed checkpoints metric found. "
                                + NUMBER_OF_COMPLETED_CHECKPOINTS_METRIC_NAME
                                + " must exist.");
            }
            observedClusterHealthInfo.setNumCompletedCheckpoints(
                    Integer.parseInt(completedCheckpoints));
            LOG.debug("Observed cluster health: {}", observedClusterHealthInfo);

            clusterHealthEvaluator.evaluate(
                    ctx.getObserveConfig(),
                    deploymentStatus.getClusterInfo(),
                    observedClusterHealthInfo);
        } catch (Exception e) {
            LOG.warn("Exception while observing cluster health: {}", e.getMessage());
            // Keep the cause and stack trace available for troubleshooting without flooding the
            // warn log on every observation.
            LOG.debug("Cluster health observation failure details", e);
            // Intentionally not throwing exception since we handle fetch metrics failure as
            // temporary issue
        }
    }