in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java [113:174]
/**
 * Polls the state of the savepoint whose trigger ID is stored in the resource's savepoint info
 * and updates that savepoint info according to the result.
 *
 * <p>Possible outcomes:
 *
 * <ul>
 *   <li><b>Pending</b>: only logs and returns; the savepoint info is left as is so a later
 *       observe cycle polls again.
 *   <li><b>Failed</b>: reads {@link
 *       KubernetesOperatorConfigOptions#OPERATOR_SAVEPOINT_TRIGGER_GRACE_PERIOD} from the
 *       observe config.
 *       <ul>
 *         <li>If {@code SnapshotUtils.gracePeriodEnded} reports the grace period as over, the
 *             trigger nonce is recorded as reconciled so the attempt is not retried.
 *         <li>Otherwise the nonce is left unreconciled. Presumably this lets a later reconcile
 *             loop trigger the savepoint again; TODO confirm against the reconciler.
 *       </ul>
 *       In both cases a {@code SavepointError} warning event is emitted and the pending trigger
 *       is reset.
 *   <li><b>Completed</b>: the trigger nonce is recorded as reconciled and the new savepoint
 *       (trigger timestamp, location, trigger type, format type) is stored as the last
 *       savepoint.
 * </ul>
 *
 * @param ctx context of the observed resource; supplies the Flink service, the observe
 *     configuration and the Kubernetes client used for events
 * @param jobID ID of the Flink job the savepoint was triggered for
 */
private void observeTriggeredSavepoint(FlinkResourceContext<CR> ctx, String jobID) {
    var resource = (AbstractFlinkResource<?, ?>) ctx.getResource();
    var savepointInfo = resource.getStatus().getJobStatus().getSavepointInfo();
    LOG.debug("Observing in-progress savepoint");
    var savepointFetchResult =
            ctx.getFlinkService()
                    .fetchSavepointInfo(
                            savepointInfo.getTriggerId(), jobID, ctx.getObserveConfig());

    if (savepointFetchResult.isPending()) {
        // Nothing to record yet; keep the trigger so the next observe cycle polls again.
        LOG.info("Savepoint operation not finished yet...");
        return;
    }

    if (savepointFetchResult.getError() != null) {
        var err = savepointFetchResult.getError();
        Duration gracePeriod =
                ctx.getObserveConfig()
                        .get(
                                KubernetesOperatorConfigOptions
                                        .OPERATOR_SAVEPOINT_TRIGGER_GRACE_PERIOD);
        if (SnapshotUtils.gracePeriodEnded(gracePeriod, savepointInfo)) {
            LOG.error(
                    "Savepoint attempt failed after grace period. Won't be retried again: "
                            + err);
            // Give up on this trigger: record its nonce as reconciled. `resource` is already
            // typed AbstractFlinkResource<?, ?>, so no (raw-type) cast is needed; this matches
            // the success-path call below.
            ReconciliationUtils.updateLastReconciledSnapshotTriggerNonce(
                    savepointInfo.getTriggerType(), resource, SAVEPOINT);
        } else {
            // Nonce intentionally left unreconciled while still inside the grace period.
            LOG.warn("Savepoint failed within grace period, retrying: " + err);
        }
        // Surface the failure to users whether or not it will be retried.
        eventRecorder.triggerEvent(
                resource,
                EventRecorder.Type.Warning,
                EventRecorder.Reason.SavepointError,
                EventRecorder.Component.Operator,
                savepointInfo.formatErrorMessage(
                        resource.getSpec().getJob().getSavepointTriggerNonce()),
                ctx.getKubernetesClient());
        savepointInfo.resetTrigger();
        return;
    }

    // Only manually triggered savepoints carry the spec's trigger nonce; others record null.
    var savepoint =
            new Savepoint(
                    savepointInfo.getTriggerTimestamp(),
                    savepointFetchResult.getLocation(),
                    savepointInfo.getTriggerType(),
                    savepointInfo.getFormatType(),
                    SnapshotTriggerType.MANUAL == savepointInfo.getTriggerType()
                            ? resource.getSpec().getJob().getSavepointTriggerNonce()
                            : null);
    ReconciliationUtils.updateLastReconciledSnapshotTriggerNonce(
            savepointInfo.getTriggerType(), resource, SAVEPOINT);
    // Both periodic and manual savepoints are recorded through lastSavepoint.
    savepointInfo.updateLastSavepoint(savepoint);
}