in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java [468:559]
/**
 * Triggers a snapshot of the requested type for the resource held by {@code ctx}, if {@link
 * SnapshotUtils#shouldTriggerSnapshot} reports that one is due.
 *
 * <p>For a periodic trigger the last periodic trigger timestamp is advanced to the current
 * instant before the snapshot itself is requested. Depending on {@link
 * FlinkStateSnapshotUtils#isSnapshotResourceEnabled}, the snapshot is then either requested by
 * creating a snapshot resource through the Kubernetes client (and recording the reconciled
 * trigger nonce), or triggered directly through the Flink service, in which case the returned
 * trigger id is stored in the savepoint/checkpoint info of the job status.
 *
 * @param ctx context providing the resource, configuration, Flink service and Kubernetes client
 * @param snapshotType kind of snapshot to trigger
 * @return {@code true} if a snapshot was requested, {@code false} if no trigger is due
 * @throws IllegalArgumentException if {@code snapshotType} is neither {@code SAVEPOINT} nor
 *     {@code CHECKPOINT}
 * @throws Exception propagated from the underlying trigger or resource creation calls
 */
private boolean triggerSnapshotIfNeeded(FlinkResourceContext<CR> ctx, SnapshotType snapshotType)
        throws Exception {
    var flinkResource = ctx.getResource();
    var observeConfig = ctx.getObserveConfig();

    var snapshotsSupplier = FlinkStateSnapshotUtils.getFlinkStateSnapshotsSupplier(ctx);
    var previousPeriodicTrigger =
            snapshotTriggerTimestampStore.getLastPeriodicTriggerInstant(
                    flinkResource, snapshotType, snapshotsSupplier);

    var pendingTrigger =
            SnapshotUtils.shouldTriggerSnapshot(
                    flinkResource, observeConfig, snapshotType, previousPeriodicTrigger);
    if (!pendingTrigger.isPresent()) {
        // Nothing is due for this snapshot type.
        return false;
    }
    var trigger = pendingTrigger.get();

    // The periodic timestamp is recorded up front, before the snapshot is actually requested.
    if (trigger == SnapshotTriggerType.PERIODIC) {
        snapshotTriggerTimestampStore.updateLastPeriodicTriggerTimestamp(
                flinkResource, snapshotType, Instant.now());
    }

    var useSnapshotResource =
            FlinkStateSnapshotUtils.isSnapshotResourceEnabled(
                    ctx.getOperatorConfig(), observeConfig);
    var jobStatus = flinkResource.getStatus().getJobStatus();
    String jobId = jobStatus.getJobId();

    switch (snapshotType) {
        case SAVEPOINT:
            var flinkFormatType =
                    observeConfig.get(
                            KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_FORMAT_TYPE);
            var targetDirectory =
                    Preconditions.checkNotNull(
                            observeConfig.get(CheckpointingOptions.SAVEPOINT_DIRECTORY));
            if (!useSnapshotResource) {
                // Direct trigger: ask the Flink service and remember the trigger id in status.
                var savepointTriggerId =
                        ctx.getFlinkService()
                                .triggerSavepoint(
                                        jobId, flinkFormatType, targetDirectory, observeConfig);
                jobStatus
                        .getSavepointInfo()
                        .setTrigger(
                                savepointTriggerId,
                                trigger,
                                SavepointFormatType.valueOf(flinkFormatType.name()));
            } else {
                // Resource-based trigger: create the savepoint resource and record the nonce.
                var kubernetesClient = ctx.getKubernetesClient();
                var resourceFormatType = SavepointFormatType.valueOf(flinkFormatType.name());
                var disposeOnDelete =
                        observeConfig.getBoolean(
                                KubernetesOperatorConfigOptions
                                        .OPERATOR_JOB_SAVEPOINT_DISPOSE_ON_DELETE);
                FlinkStateSnapshotUtils.createSavepointResource(
                        kubernetesClient,
                        flinkResource,
                        targetDirectory,
                        trigger,
                        resourceFormatType,
                        disposeOnDelete);
                ReconciliationUtils.updateLastReconciledSnapshotTriggerNonce(
                        trigger, flinkResource, SAVEPOINT);
            }
            break;
        case CHECKPOINT:
            if (!useSnapshotResource) {
                // Direct trigger: ask the Flink service and remember the trigger id in status.
                var operatorCheckpointType =
                        observeConfig.get(
                                KubernetesOperatorConfigOptions.OPERATOR_CHECKPOINT_TYPE);
                var flinkService = ctx.getFlinkService();
                var flinkCheckpointType =
                        org.apache.flink.core.execution.CheckpointType.valueOf(
                                operatorCheckpointType.name());
                var checkpointTriggerId =
                        flinkService.triggerCheckpoint(
                                jobId, flinkCheckpointType, observeConfig);
                jobStatus
                        .getCheckpointInfo()
                        .setTrigger(checkpointTriggerId, trigger, operatorCheckpointType);
            } else {
                // Resource-based trigger: create the checkpoint resource and record the nonce.
                FlinkStateSnapshotUtils.createCheckpointResource(
                        ctx.getKubernetesClient(), flinkResource, trigger);
                ReconciliationUtils.updateLastReconciledSnapshotTriggerNonce(
                        trigger, flinkResource, CHECKPOINT);
            }
            break;
        default:
            throw new IllegalArgumentException("Unsupported snapshot type: " + snapshotType);
    }
    return true;
}