in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java [155:210]
/**
 * Observes the outcome of the checkpoint that is currently recorded as triggered in the
 * resource's job status and updates the checkpoint info accordingly.
 *
 * <p>The result fetched from the Flink service is handled in one of three ways:
 *
 * <ul>
 *   <li><b>Pending</b>: nothing is modified and the method returns.
 *   <li><b>Error</b>: a {@code CheckpointError} warning event is emitted and the trigger is
 *       reset. If the configured checkpoint trigger grace period has ended, the last
 *       reconciled checkpoint trigger nonce is updated as well (logged as "won't be retried");
 *       otherwise the failure is only logged as a retry.
 *   <li><b>Completed</b>: a {@link Checkpoint} is built from the trigger information, the last
 *       reconciled checkpoint trigger nonce is updated and the checkpoint is stored through
 *       {@code updateLastCheckpoint}.
 * </ul>
 *
 * @param ctx resource context providing the resource, the Flink service and the observe config
 * @param jobID id of the job whose triggered checkpoint is queried
 */
private void observeTriggeredCheckpoint(FlinkResourceContext<CR> ctx, String jobID) {
    var resource = (AbstractFlinkResource<?, ?>) ctx.getResource();
    CheckpointInfo checkpointInfo = resource.getStatus().getJobStatus().getCheckpointInfo();

    LOG.info("Observing checkpoint status.");
    var checkpointFetchResult =
            ctx.getFlinkService()
                    .fetchCheckpointInfo(
                            checkpointInfo.getTriggerId(), jobID, ctx.getObserveConfig());

    if (checkpointFetchResult.isPending()) {
        LOG.info("Checkpoint operation not finished yet...");
        return;
    }

    if (checkpointFetchResult.getError() != null) {
        var err = checkpointFetchResult.getError();
        Duration gracePeriod =
                ctx.getObserveConfig()
                        .get(
                                KubernetesOperatorConfigOptions
                                        .OPERATOR_CHECKPOINT_TRIGGER_GRACE_PERIOD);
        if (SnapshotUtils.gracePeriodEnded(gracePeriod, checkpointInfo)) {
            LOG.error(
                    "Checkpoint attempt failed after grace period. Won't be retried again: "
                            + err);
            // Same call as on the success path below; the resource is already typed as
            // AbstractFlinkResource<?, ?>, so no (raw) cast is needed here.
            ReconciliationUtils.updateLastReconciledSnapshotTriggerNonce(
                    checkpointInfo, resource, CHECKPOINT);
        } else {
            LOG.warn("Checkpoint failed within grace period, retrying: " + err);
        }
        // The event is emitted in both failure cases. The error message is formatted before
        // resetTrigger() is called; NOTE(review): presumably formatErrorMessage reads the
        // trigger state, so keep this ordering.
        eventRecorder.triggerEvent(
                resource,
                EventRecorder.Type.Warning,
                EventRecorder.Reason.CheckpointError,
                EventRecorder.Component.Operator,
                checkpointInfo.formatErrorMessage(
                        resource.getSpec().getJob().getCheckpointTriggerNonce()));
        checkpointInfo.resetTrigger();
        return;
    }

    // Only manually triggered checkpoints carry the trigger nonce from the spec.
    var checkpoint =
            new Checkpoint(
                    checkpointInfo.getTriggerTimestamp(),
                    checkpointInfo.getTriggerType(),
                    checkpointInfo.getFormatType(),
                    SnapshotTriggerType.MANUAL == checkpointInfo.getTriggerType()
                            ? resource.getSpec().getJob().getCheckpointTriggerNonce()
                            : null);
    // The nonce is reconciled before the checkpoint is stored; NOTE(review): presumably
    // updateLastCheckpoint clears the trigger state that the nonce update relies on.
    ReconciliationUtils.updateLastReconciledSnapshotTriggerNonce(
            checkpointInfo, resource, CHECKPOINT);
    checkpointInfo.updateLastCheckpoint(checkpoint);
}