private void replaceStatus()

in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java [119:172]


    /**
     * Replaces the status of {@code resource} on the API server, locking on the resource version
     * so that concurrent modifications are detected instead of being overwritten.
     *
     * <p>On success the resource version held by {@code resource} is updated in place to the
     * version returned by the server. On a 409 conflict the latest copy of the resource is fetched
     * and compared with {@code prevStatus}: if the status on the server still equals {@code
     * prevStatus}, the update is retried against the latest version (at most 3 retries);
     * otherwise a {@link StatusConflictException} is thrown.
     *
     * @param resource resource whose status should be written; its metadata resource version is
     *     mutated by this method
     * @param prevStatus status to compare against the one currently stored on the server when a
     *     conflict occurs
     * @throws JsonProcessingException if the statuses cannot be serialized while building the
     *     conflict error message
     * @throws KubernetesClientException for non-conflict errors, when the retries are exhausted,
     *     when no newer resource version can be fetched, or when the resource no longer exists
     * @throws StatusConflictException if the status stored on the server differs from {@code
     *     prevStatus}
     */
    private void replaceStatus(CR resource, STATUS prevStatus) throws JsonProcessingException {
        int retries = 0;
        while (true) {
            try {
                var updated = client.resource(resource).lockResourceVersion().updateStatus();

                // If we successfully replaced the status, update the resource version so we know
                // what to lock next in the same reconciliation loop
                resource.getMetadata()
                        .setResourceVersion(updated.getMetadata().getResourceVersion());
                return;
            } catch (KubernetesClientException kce) {
                // 409 is the error code for conflicts resulting from the locking.
                // We simply throw non conflict errors, to trigger retry with delay
                if (kce.getCode() != 409) {
                    throw kce;
                }

                var currentVersion = resource.getMetadata().getResourceVersion();
                LOG.debug(
                        "Could not apply status update for resource version {}", currentVersion);

                var latest = client.resource(resource).get();
                if (latest == null) {
                    // The resource was deleted between the failed update and this fetch. There is
                    // nothing left to compare against, so surface the original conflict instead
                    // of failing with a NullPointerException below.
                    LOG.warn(
                            "Resource no longer exists, cannot retry status update for resource version {}",
                            currentVersion);
                    throw kce;
                }
                var latestVersion = latest.getMetadata().getResourceVersion();

                if (latestVersion.equals(currentVersion)) {
                    // This should not happen as long as the client works consistently
                    LOG.error("Unable to fetch latest resource version");
                    throw kce;
                }

                if (!latest.getStatus().equals(prevStatus)) {
                    // Somebody else changed the status: retrying would overwrite their change
                    throw new StatusConflictException(
                            "Status have been modified externally in version "
                                    + latestVersion
                                    + " Previous: "
                                    + objectMapper.writeValueAsString(prevStatus)
                                    + " Latest: "
                                    + objectMapper.writeValueAsString(latest.getStatus()));
                }

                if (retries++ < 3) {
                    // Only the resource version moved, the status is unchanged: lock on the
                    // latest version and try again
                    LOG.debug("Retrying status update for latest version {}", latestVersion);
                    resource.getMetadata().setResourceVersion(latestVersion);
                } else {
                    // If we cannot get the latest version in 3 tries we throw the error to
                    // retry with delay
                    throw kce;
                }
            }
        }
    }