in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java [97:153]
/**
 * Queries Flink for the outcome of the savepoint currently tracked in the resource's job status
 * and records the result in that status.
 *
 * <p>Outcomes:
 *
 * <ul>
 *   <li>Pending: logs and returns without modifying the status.
 *   <li>Failed: if the {@code OPERATOR_SAVEPOINT_TRIGGER_GRACE_PERIOD} has ended, the trigger
 *       nonce is marked as reconciled (the savepoint will not be retried); otherwise it is left
 *       unreconciled and a retry is logged. In both cases a {@code SavepointError} warning event
 *       is emitted and the trigger is reset.
 *   <li>Completed: a {@link Savepoint} is built from the trigger info and the returned location,
 *       the trigger nonce is marked as reconciled, and the savepoint is stored as the last one.
 * </ul>
 *
 * <p>NOTE(review): presumably only called while a savepoint trigger is in flight (i.e. {@code
 * savepointInfo.getTriggerId()} is set) — confirm against the caller.
 *
 * @param ctx resource context providing the resource, the Flink service and the observe config
 * @param jobID id of the Flink job the savepoint was triggered for
 */
private void observeTriggeredSavepoint(FlinkResourceContext<CR> ctx, String jobID) {
    var resource = (AbstractFlinkResource<?, ?>) ctx.getResource();
    var savepointInfo = resource.getStatus().getJobStatus().getSavepointInfo();

    LOG.info("Observing savepoint status.");
    var savepointFetchResult =
            ctx.getFlinkService()
                    .fetchSavepointInfo(
                            savepointInfo.getTriggerId(), jobID, ctx.getObserveConfig());

    if (savepointFetchResult.isPending()) {
        LOG.info("Savepoint operation not finished yet...");
        return;
    }

    var err = savepointFetchResult.getError();
    if (err != null) {
        Duration gracePeriod =
                ctx.getObserveConfig()
                        .get(
                                KubernetesOperatorConfigOptions
                                        .OPERATOR_SAVEPOINT_TRIGGER_GRACE_PERIOD);
        if (SnapshotUtils.gracePeriodEnded(gracePeriod, savepointInfo)) {
            LOG.error(
                    "Savepoint attempt failed after grace period. Won't be retried again: {}",
                    err);
            // Give up on this trigger: marking the nonce as reconciled stops further attempts.
            ReconciliationUtils.updateLastReconciledSnapshotTriggerNonce(
                    savepointInfo, resource, SAVEPOINT);
        } else {
            // Nonce intentionally left unreconciled so the savepoint can be retried.
            LOG.warn("Savepoint failed within grace period, retrying: {}", err);
        }
        eventRecorder.triggerEvent(
                resource,
                EventRecorder.Type.Warning,
                EventRecorder.Reason.SavepointError,
                EventRecorder.Component.Operator,
                savepointInfo.formatErrorMessage(
                        resource.getSpec().getJob().getSavepointTriggerNonce()));
        savepointInfo.resetTrigger();
        return;
    }

    var savepoint =
            new Savepoint(
                    savepointInfo.getTriggerTimestamp(),
                    savepointFetchResult.getLocation(),
                    savepointInfo.getTriggerType(),
                    savepointInfo.getFormatType(),
                    // Only manually triggered savepoints are tied to the user's trigger nonce.
                    SnapshotTriggerType.MANUAL == savepointInfo.getTriggerType()
                            ? resource.getSpec().getJob().getSavepointTriggerNonce()
                            : null);
    // Order kept from the original: the nonce update reads savepointInfo before
    // updateLastSavepoint mutates it. NOTE(review): confirm the order is actually required.
    ReconciliationUtils.updateLastReconciledSnapshotTriggerNonce(
            savepointInfo, resource, SAVEPOINT);
    savepointInfo.updateLastSavepoint(savepoint);
}