in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtils.java [178:256]
protected static Optional<SnapshotTriggerType> shouldTriggerSnapshot(
AbstractFlinkResource<?, ?> resource, Configuration conf, SnapshotType snapshotType) {
var status = resource.getStatus();
var jobStatus = status.getJobStatus();
var jobSpec = resource.getSpec().getJob();
if (!ReconciliationUtils.isJobRunning(status)) {
return Optional.empty();
}
var reconciledJobSpec =
status.getReconciliationStatus().deserializeLastReconciledSpec().getJob();
// Values that are specific to the snapshot type
Long triggerNonce;
Long reconciledTriggerNonce;
boolean inProgress;
SnapshotInfo snapshotInfo;
Duration interval;
switch (snapshotType) {
case SAVEPOINT:
triggerNonce = jobSpec.getSavepointTriggerNonce();
reconciledTriggerNonce = reconciledJobSpec.getSavepointTriggerNonce();
inProgress = savepointInProgress(jobStatus);
snapshotInfo = jobStatus.getSavepointInfo();
interval = conf.get(KubernetesOperatorConfigOptions.PERIODIC_SAVEPOINT_INTERVAL);
break;
case CHECKPOINT:
triggerNonce = jobSpec.getCheckpointTriggerNonce();
reconciledTriggerNonce = reconciledJobSpec.getCheckpointTriggerNonce();
inProgress = checkpointInProgress(jobStatus);
snapshotInfo = jobStatus.getCheckpointInfo();
interval = conf.get(KubernetesOperatorConfigOptions.PERIODIC_CHECKPOINT_INTERVAL);
if (!isCheckpointsTriggeringSupported(conf)) {
LOG.warn(
"Periodic checkpoints triggering every {} is configured, "
+ "but not supported (requires Flink 1.17+)",
interval);
return Optional.empty();
}
break;
default:
throw new IllegalArgumentException("Unsupported snapshot type: " + snapshotType);
}
if (inProgress) {
return Optional.empty();
}
var triggerNonceChanged =
triggerNonce != null && !triggerNonce.equals(reconciledTriggerNonce);
if (triggerNonceChanged) {
return Optional.of(SnapshotTriggerType.MANUAL);
}
if (interval.isZero()) {
return Optional.empty();
}
var lastTriggerTs = snapshotInfo.getLastPeriodicTriggerTimestamp();
// When the resource is first created/periodic snapshotting enabled we have to compare
// against the creation timestamp for triggering the first periodic savepoint
var lastTrigger =
lastTriggerTs == 0
? Instant.parse(resource.getMetadata().getCreationTimestamp())
: Instant.ofEpochMilli(lastTriggerTs);
var now = Instant.now();
if (lastTrigger.plus(interval).isBefore(Instant.now())) {
LOG.info(
"Triggering new periodic {} after {}",
snapshotType.toString().toLowerCase(),
Duration.between(lastTrigger, now));
return Optional.of(SnapshotTriggerType.PERIODIC);
}
return Optional.empty();
}