in spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/healthcheck/SentinelManager.java [154:186]
private void updateSpecAndScheduleHealthCheck(
ResourceID resourceID, SentinelResourceState sentinelResourceState, KubernetesClient client) {
Map<String, String> sparkConf = sentinelResourceState.resource.getSpec().getSparkConf();
sparkConf.compute(
Constants.SENTINEL_RESOURCE_DUMMY_FIELD,
(key, value) -> {
if (value == null) {
return "1";
} else {
return String.valueOf(Long.parseLong(value) + 1);
}
});
sentinelResourceState.previousGeneration =
sentinelResourceState.resource.getMetadata().getGeneration();
try {
if (log.isDebugEnabled()) {
log.debug("Update the sentinel kubernetes resource spec {}", sentinelResourceState);
}
client.resource(ReconcilerUtils.clone(sentinelResourceState.resource)).replace();
} catch (Throwable t) {
if (log.isWarnEnabled()) {
log.warn(
"Could not replace the sentinel deployment spark conf {}",
Constants.SENTINEL_RESOURCE_DUMMY_FIELD,
t);
}
}
int delay = SparkOperatorConf.SENTINEL_RESOURCE_RECONCILIATION_DELAY.getValue();
if (log.isInfoEnabled()) {
log.info("Scheduling sentinel check for {} in {} seconds", resourceID, delay);
}
executorService.schedule(() -> checkHealth(resourceID, client), delay, TimeUnit.SECONDS);
}