in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthObserver.java [49:99]
/**
 * Observes the health of the Flink cluster backing the given deployment.
 *
 * <p>Fetches the job restart and completed-checkpoint metrics for the deployment's current job and
 * passes the resulting {@code ClusterHealthInfo} to the cluster health evaluator. The restart
 * count is read from {@code NUM_RESTARTS_METRIC_NAME} when present, otherwise from the deprecated
 * {@code FULL_RESTARTS_METRIC_NAME}.
 *
 * <p>This method never throws. Any failure is logged and swallowed, because metric fetch
 * failures are treated as temporary. Such failures include a failed metric fetch, a missing or
 * unparsable metric, or an evaluation error.
 *
 * @param ctx resource context providing the deployment, its observe configuration and the Flink
 *     service used to fetch metrics
 */
public void observe(FlinkResourceContext<FlinkDeployment> ctx) {
    var flinkApp = ctx.getResource();
    try {
        LOG.debug("Observing cluster health");
        var deploymentStatus = flinkApp.getStatus();
        var jobStatus = deploymentStatus.getJobStatus();
        var jobId = jobStatus.getJobId();
        var metrics =
                ctx.getFlinkService()
                        .getMetrics(
                                ctx.getObserveConfig(),
                                jobId,
                                List.of(
                                        FULL_RESTARTS_METRIC_NAME,
                                        NUM_RESTARTS_METRIC_NAME,
                                        NUMBER_OF_COMPLETED_CHECKPOINTS_METRIC_NAME));
        ClusterHealthInfo observedClusterHealthInfo = new ClusterHealthInfo();

        // Prefer the newer restart metric; fall back to the deprecated one for older Flink.
        if (metrics.containsKey(NUM_RESTARTS_METRIC_NAME)) {
            LOG.debug("{} metric is used", NUM_RESTARTS_METRIC_NAME);
            observedClusterHealthInfo.setNumRestarts(
                    Integer.parseInt(metrics.get(NUM_RESTARTS_METRIC_NAME)));
        } else if (metrics.containsKey(FULL_RESTARTS_METRIC_NAME)) {
            LOG.debug(
                    "{} metric is used because {} is missing",
                    FULL_RESTARTS_METRIC_NAME,
                    NUM_RESTARTS_METRIC_NAME);
            observedClusterHealthInfo.setNumRestarts(
                    Integer.parseInt(metrics.get(FULL_RESTARTS_METRIC_NAME)));
        } else {
            throw new IllegalStateException(
                    "No job restart metric found. Either "
                            + FULL_RESTARTS_METRIC_NAME
                            + " (old and deprecated in newer Flink versions) or "
                            + NUM_RESTARTS_METRIC_NAME
                            + " (new) must exist.");
        }

        var completedCheckpoints = metrics.get(NUMBER_OF_COMPLETED_CHECKPOINTS_METRIC_NAME);
        if (completedCheckpoints == null) {
            // Fail with a descriptive message instead of the opaque NumberFormatException that
            // Integer.parseInt(null) would raise. The catch below still treats it as transient.
            throw new IllegalStateException(
                    "No completed checkpoints metric found. "
                            + NUMBER_OF_COMPLETED_CHECKPOINTS_METRIC_NAME
                            + " must exist.");
        }
        observedClusterHealthInfo.setNumCompletedCheckpoints(
                Integer.parseInt(completedCheckpoints));

        LOG.debug("Observed cluster health: {}", observedClusterHealthInfo);
        clusterHealthEvaluator.evaluate(
                ctx.getObserveConfig(),
                deploymentStatus.getClusterInfo(),
                observedClusterHealthInfo);
    } catch (Exception e) {
        // Intentionally not rethrowing: a metrics fetch failure is handled as a temporary issue.
        // Keep the concise warning, but retain the full stack trace at debug level for diagnosis.
        LOG.warn("Exception while observing cluster health: {}", e.getMessage());
        LOG.debug("Cluster health observation failure details", e);
    }
}