in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java [386:438]
/**
 * Decides whether the resource should be rolled back to its last stable spec.
 *
 * <p>A rollback that is already in progress (state {@code ROLLING_BACK}) always continues.
 * Otherwise a rollback is only started when all of the following hold:
 *
 * <ul>
 *   <li>{@code DEPLOYMENT_ROLLBACK_ENABLED} is set in the given configuration;
 *   <li>the reconciliation state is not {@code ROLLED_BACK} and the last reconciled spec is
 *       not marked stable;
 *   <li>a last stable spec exists and its job, if any, is not {@code SUSPENDED};
 *   <li>{@code flinkVersionChanged} reports no change between the current spec and the last
 *       stable spec;
 *   <li>more than {@code DEPLOYMENT_READINESS_TIMEOUT} has elapsed since the recorded
 *       reconciliation timestamp;
 *   <li>either the job uses {@code SAVEPOINT} upgrade mode and the JobManager pod never
 *       started, or HA metadata is available.
 * </ul>
 *
 * @param ctx context giving access to the resource, the JOSDK context and the Flink service
 * @param configuration configuration the rollback options are read from
 * @return {@code true} if a rollback should be performed (or continued), {@code false}
 *     otherwise
 */
private boolean shouldRollBack(FlinkResourceContext<CR> ctx, Configuration configuration) {
    var resource = ctx.getResource();
    var reconciliationStatus = resource.getStatus().getReconciliationStatus();

    // An in-flight rollback is always carried on, regardless of any other setting.
    if (reconciliationStatus.getState() == ReconciliationState.ROLLING_BACK) {
        return true;
    }

    // Feature switched off.
    if (!configuration.get(KubernetesOperatorConfigOptions.DEPLOYMENT_ROLLBACK_ENABLED)) {
        return false;
    }
    // Already rolled back once.
    if (reconciliationStatus.getState() == ReconciliationState.ROLLED_BACK) {
        return false;
    }
    // The currently reconciled spec is itself stable.
    if (reconciliationStatus.isLastReconciledSpecStable()) {
        return false;
    }

    var lastStableSpec = reconciliationStatus.deserializeLastStableSpec();
    if (lastStableSpec == null) {
        // No stable spec has been recorded so far, so there is no rollback target.
        return false;
    }

    var lastStableJob = lastStableSpec.getJob();
    if (lastStableJob != null && lastStableJob.getState() == JobState.SUSPENDED) {
        // A suspended job is not a valid rollback target.
        return false;
    }

    var currentSpec = resource.getSpec();
    if (flinkVersionChanged(currentSpec, lastStableSpec)) {
        // Flink version changes are never rolled back.
        return false;
    }

    Duration readinessTimeout =
            configuration.get(KubernetesOperatorConfigOptions.DEPLOYMENT_READINESS_TIMEOUT);
    Instant readinessCutoff = clock.instant().minus(readinessTimeout);
    Instant lastReconciledAt =
            Instant.ofEpochMilli(reconciliationStatus.getReconciliationTimestamp());
    if (!readinessCutoff.isAfter(lastReconciledAt)) {
        // The readiness timeout has not been exceeded yet.
        return false;
    }

    var currentJob = currentSpec.getJob();
    if (currentJob != null
            && currentJob.getUpgradeMode() == UpgradeMode.SAVEPOINT
            && FlinkUtils.jmPodNeverStarted(ctx.getJosdkContext())) {
        // The JobManager never came up, so no HA metadata can exist; with SAVEPOINT
        // upgrade mode the rollback can rely on the savepoint instead.
        return true;
    }

    if (ctx.getFlinkService().isHaMetadataAvailable(configuration)) {
        return true;
    }
    LOG.warn("Rollback is not possible due to missing HA metadata");
    return false;
}