in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtils.java [141:209]
/**
 * Decides whether a snapshot of the given type should be triggered now for the resource.
 *
 * <p>Nothing is triggered when the job is not running (per {@code
 * ReconciliationUtils.isJobRunning}) or when a snapshot of the same type is already in progress.
 * Otherwise a manual trigger is reported when the trigger nonce in the current spec is non-null
 * and differs from the nonce in the last reconciled spec; failing that, a periodic trigger is
 * reported when {@code shouldTriggerAutomaticSnapshot} accepts the configured interval expression
 * and {@code lastTrigger}. For checkpoints, a warning is logged and nothing is triggered when
 * {@code isSnapshotTriggeringSupported(conf)} is false.
 *
 * @param resource Flink resource whose spec and status are inspected
 * @param conf configuration holding the periodic snapshot interval options
 * @param snapshotType kind of snapshot to evaluate (savepoint or checkpoint)
 * @param lastTrigger time of the previous trigger, forwarded to the periodic check
 * @return the trigger type to use, or an empty Optional when no snapshot should be triggered
 * @throws IllegalArgumentException if {@code snapshotType} is neither SAVEPOINT nor CHECKPOINT
 */
public static Optional<SnapshotTriggerType> shouldTriggerSnapshot(
        AbstractFlinkResource<?, ?> resource,
        Configuration conf,
        SnapshotType snapshotType,
        Instant lastTrigger) {
    var status = resource.getStatus();
    var jobStatus = status.getJobStatus();
    var jobSpec = resource.getSpec().getJob();

    if (!ReconciliationUtils.isJobRunning(status)) {
        return Optional.empty();
    }

    var lastReconciledJob =
            status.getReconciliationStatus().deserializeLastReconciledSpec().getJob();

    // Per-type inputs: requested vs. already reconciled nonce, in-flight flag, periodic interval.
    Long requestedNonce;
    Long appliedNonce;
    boolean alreadyRunning;
    String periodicExpression;
    switch (snapshotType) {
        case SAVEPOINT:
            requestedNonce = jobSpec.getSavepointTriggerNonce();
            appliedNonce = lastReconciledJob.getSavepointTriggerNonce();
            alreadyRunning = savepointInProgress(jobStatus);
            periodicExpression =
                    conf.get(KubernetesOperatorConfigOptions.PERIODIC_SAVEPOINT_INTERVAL);
            break;
        case CHECKPOINT:
            requestedNonce = jobSpec.getCheckpointTriggerNonce();
            appliedNonce = lastReconciledJob.getCheckpointTriggerNonce();
            alreadyRunning = checkpointInProgress(jobStatus);
            periodicExpression =
                    conf.get(KubernetesOperatorConfigOptions.PERIODIC_CHECKPOINT_INTERVAL);
            break;
        default:
            throw new IllegalArgumentException("Unsupported snapshot type: " + snapshotType);
    }

    if (alreadyRunning) {
        return Optional.empty();
    }

    // A manual request takes precedence; the periodic check is only consulted without one.
    boolean manualRequested = requestedNonce != null && !requestedNonce.equals(appliedNonce);
    if (!manualRequested
            && !shouldTriggerAutomaticSnapshot(snapshotType, periodicExpression, lastTrigger)) {
        return Optional.empty();
    }

    // Checkpoint triggering may be unavailable; warn with the message matching the trigger kind.
    if (snapshotType == CHECKPOINT && !isSnapshotTriggeringSupported(conf)) {
        String warning =
                manualRequested
                        ? "Manual checkpoint triggering is attempted, but is not supported (requires Flink 1.17+)"
                        : "Automatic checkpoints triggering is configured but is not supported (requires Flink 1.17+)";
        LOG.warn(warning);
        return Optional.empty();
    }

    return Optional.of(
            manualRequested ? SnapshotTriggerType.MANUAL : SnapshotTriggerType.PERIODIC);
}