in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtils.java [60:119]
public static SnapshotStatus getLastSnapshotStatus(
AbstractFlinkResource<?, ?> resource, SnapshotType snapshotType) {
var status = resource.getStatus();
var jobStatus = status.getJobStatus();
var jobSpec = resource.getSpec().getJob();
var reconciledJobSpec =
status.getReconciliationStatus().deserializeLastReconciledSpec().getJob();
// Values that are specific to the snapshot type
Long triggerNonce;
Long reconciledTriggerNonce;
SnapshotInfo snapshotInfo;
switch (snapshotType) {
case SAVEPOINT:
triggerNonce = jobSpec.getSavepointTriggerNonce();
reconciledTriggerNonce = reconciledJobSpec.getSavepointTriggerNonce();
snapshotInfo = jobStatus.getSavepointInfo();
break;
case CHECKPOINT:
triggerNonce = jobSpec.getCheckpointTriggerNonce();
reconciledTriggerNonce = reconciledJobSpec.getCheckpointTriggerNonce();
snapshotInfo = jobStatus.getCheckpointInfo();
break;
default:
throw new IllegalArgumentException("Unsupported snapshot type: " + snapshotType);
}
if (snapshotInfo.getTriggerId() != null) {
return SnapshotStatus.PENDING;
}
// if triggerNonce is cleared, the snapshot is not triggered.
// For manual snapshots, we report pending status
// during retries while the triggerId gets reset between retries.
if (triggerNonce != null && !Objects.equals(triggerNonce, reconciledTriggerNonce)) {
return SnapshotStatus.PENDING;
}
Long lastTriggerNonce = snapshotInfo.getLastTriggerNonce();
SnapshotTriggerType lastSnapshotTriggerType = snapshotInfo.getLastTriggerType();
if (lastSnapshotTriggerType == null) {
// Indicates that no snapshot of snapshotType was ever taken
return null;
}
// Last snapshot was manual and triggerNonce matches
if (Objects.equals(reconciledTriggerNonce, lastTriggerNonce)) {
return SnapshotStatus.SUCCEEDED;
}
// Last snapshot was not manual
if (lastSnapshotTriggerType != SnapshotTriggerType.MANUAL) {
return SnapshotStatus.SUCCEEDED;
}
return SnapshotStatus.ABANDONED;
}