in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java [119:172]
/**
 * Writes the status of {@code resource} to the API server, locking on the resource's current
 * {@code resourceVersion}.
 *
 * <p>On success, the resource version returned by the server is copied back onto {@code
 * resource}. Later status updates in the same reconciliation loop then lock against the right
 * version.
 *
 * <p>On a 409 conflict, the latest version of the resource is fetched:
 *
 * <ul>
 *   <li>If its status still equals {@code prevStatus}, nobody else changed the status. The update
 *       is retried against the latest resource version, at most 3 times.
 *   <li>Otherwise, a {@code StatusConflictException} is thrown, because the status was modified
 *       externally.
 * </ul>
 *
 * @param resource resource carrying the new status; its resource version is updated in place
 * @param prevStatus the status this update was derived from, used to detect external changes
 * @throws KubernetesClientException if the error is not a conflict, if the resource can no
 *     longer be found, if the fetched resource still has the same resource version, or if the
 *     retries are exhausted
 * @throws StatusConflictException if the status was modified externally
 * @throws JsonProcessingException if the statuses cannot be serialized for the conflict message
 */
private void replaceStatus(CR resource, STATUS prevStatus) throws JsonProcessingException {
    int retries = 0;
    while (true) {
        try {
            var updated = client.resource(resource).lockResourceVersion().updateStatus();
            // If we successfully replaced the status, update the resource version so we know
            // what to lock next in the same reconciliation loop
            resource.getMetadata()
                    .setResourceVersion(updated.getMetadata().getResourceVersion());
            return;
        } catch (KubernetesClientException kce) {
            // 409 is the error code for conflicts resulting from the locking. We simply throw
            // non conflict errors, to trigger retry with delay.
            if (kce.getCode() != 409) {
                throw kce;
            }

            var currentVersion = resource.getMetadata().getResourceVersion();
            LOG.debug("Could not apply status update for resource version {}", currentVersion);

            var latest = client.resource(resource).get();
            if (latest == null) {
                // get() yields null when the resource cannot be found (e.g. it was deleted
                // concurrently). Surface the original conflict instead of failing with an NPE.
                LOG.warn("Resource not found while resolving status update conflict");
                throw kce;
            }

            var latestVersion = latest.getMetadata().getResourceVersion();
            if (java.util.Objects.equals(latestVersion, currentVersion)) {
                // This should not happen as long as the client works consistently
                LOG.error("Unable to fetch latest resource version");
                throw kce;
            }

            var latestStatus = latest.getStatus();
            // Null-safe comparison: a fetched resource may not carry a status at all.
            if (!java.util.Objects.equals(latestStatus, prevStatus)) {
                // Someone else changed the status since we read it; retrying would overwrite
                // their change, so report the conflict instead.
                throw new StatusConflictException(
                        "Status have been modified externally in version "
                                + latestVersion
                                + " Previous: "
                                + objectMapper.writeValueAsString(prevStatus)
                                + " Latest: "
                                + objectMapper.writeValueAsString(latestStatus));
            }

            if (retries++ >= 3) {
                // If we cannot get the latest version in 3 tries we throw the error to
                // retry with delay
                throw kce;
            }
            LOG.debug("Retrying status update for latest version {}", latestVersion);
            resource.getMetadata().setResourceVersion(latestVersion);
        }
    }
}