in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/snapshot/StateSnapshotObserver.java [115:170]
/**
 * Evaluates the fetched result of a triggered checkpoint and, once it has finished, marks the
 * snapshot resource as successful.
 *
 * <p>Nothing is recorded while either the checkpoint itself or the follow-up statistics lookup
 * is still pending. If the statistics lookup reports an error, the snapshot is still marked
 * successful, but with an empty path, and a warning event is emitted.
 *
 * @param ctx snapshot context supplying the resource, the Kubernetes client and the observe
 *     config of the referenced job
 * @param checkpointInfo result of fetching the checkpoint trigger status
 * @param ctxFlinkDeployment deployment context whose Flink service is queried for the
 *     checkpoint statistics
 * @param jobId job ID passed to the checkpoint statistics query
 * @throws ReconciliationException if {@code checkpointInfo} carries an error
 */
private void handleCheckpoint(
        FlinkStateSnapshotContext ctx,
        CheckpointFetchResult checkpointInfo,
        FlinkResourceContext<FlinkDeployment> ctxFlinkDeployment,
        String jobId) {
    var snapshot = ctx.getResource();
    var snapshotName = snapshot.getMetadata().getName();

    // Still running: nothing to record yet.
    if (checkpointInfo.isPending()) {
        var triggerId = snapshot.getStatus().getTriggerId();
        LOG.debug("Checkpoint for {} with ID {} is pending", snapshotName, triggerId);
        return;
    }

    var triggerError = checkpointInfo.getError();
    if (triggerError != null) {
        throw new ReconciliationException(triggerError);
    }

    LOG.debug("Checkpoint {} was successful, querying final checkpoint path...", snapshotName);
    var flinkService = ctxFlinkDeployment.getFlinkService();
    var stats =
            flinkService.fetchCheckpointStats(
                    jobId,
                    checkpointInfo.getCheckpointId(),
                    ctx.getReferencedJobObserveConfig());
    if (stats.isPending()) {
        return;
    }

    String fetchedPath = stats.getPath();
    var statsError = stats.getError();

    // The checkpoint itself is already considered complete here, so a failed path lookup does
    // not fail the snapshot: it is completed with an empty path and a warning event is raised.
    final String path = statsError == null ? fetchedPath : "";
    if (statsError != null) {
        var warning =
                String.format(
                        "Checkpoint %s was successful, but failed to fetch path. Flink webserver stores only a limited amount of checkpoints in its cache, try increasing '%s' config for this job.\n%s",
                        snapshotName,
                        CHECKPOINTS_HISTORY_SIZE.key(),
                        statsError);
        eventRecorder.triggerSnapshotEvent(
                snapshot,
                EventRecorder.Type.Warning,
                EventRecorder.Reason.CheckpointError,
                EventRecorder.Component.Snapshot,
                warning,
                ctx.getKubernetesClient());
    }

    LOG.info("Checkpoint {} successful: {}", snapshotName, path);
    FlinkStateSnapshotUtils.snapshotSuccessful(snapshot, path, false);
}