in spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/StatusRecorder.java [81:117]
private void patchAndStatusWithVersionLocked(CR resource, KubernetesClient client) {
ObjectNode newStatusNode = objectMapper.convertValue(resource.getStatus(), ObjectNode.class);
ResourceID resourceId = ResourceID.fromResource(resource);
ObjectNode previousStatusNode = statusCache.get(resourceId);
if (newStatusNode.equals(previousStatusNode)) {
log.debug("No status change.");
return;
}
Exception err = null;
long maxRetry = API_STATUS_PATCH_MAX_ATTEMPTS.getValue();
for (long i = 0; i < maxRetry; i++) {
// We retry the status update 3 times to avoid some intermittent connectivity errors
try {
CR updated = client.resource(resource).lockResourceVersion().updateStatus();
resource.getMetadata().setResourceVersion(updated.getMetadata().getResourceVersion());
err = null;
} catch (KubernetesClientException e) {
log.warn("Error while patching status, retrying {}/{}...", i + 1, maxRetry, e);
Thread.sleep(TimeUnit.SECONDS.toMillis(API_RETRY_ATTEMPT_AFTER_SECONDS.getValue()));
err = e;
}
}
if (err != null) {
log.error("Fail to patch status.", err);
throw err;
}
statusCache.put(resourceId, newStatusNode);
STATUS prevStatus = objectMapper.convertValue(previousStatusNode, statusClass);
statusListeners.forEach(
listener -> {
listener.listenStatus(resource, prevStatus, resource.getStatus());
});
}