in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java [655:705]
/**
 * Queries the cluster's REST endpoint for the status of a previously triggered checkpoint and
 * maps the answer onto a {@link CheckpointFetchResult}.
 *
 * <p>Result mapping:
 *
 * <ul>
 *   <li>no response, or a response without a resource: {@code pending}
 *   <li>resource carrying a failure cause: {@code error} with the cause's string form
 *   <li>queue status {@code IN_PROGRESS}: {@code pending}
 *   <li>queue status {@code COMPLETED}: {@code completed} with the reported checkpoint id
 *   <li>any exception raised along the way (including an unknown queue status): {@code error}
 * </ul>
 *
 * <p>This method does not propagate exceptions; every failure is logged and returned as an
 * error result. The call blocks until the REST response is available.
 *
 * @param triggerId hex string identifying the checkpoint trigger operation
 * @param jobId hex string identifying the job the checkpoint belongs to
 * @param conf configuration used to obtain the REST cluster client
 * @return the pending, completed or error outcome as described above
 */
public CheckpointFetchResult fetchCheckpointInfo(
        String triggerId, String jobId, Configuration conf) {
    LOG.info("Fetching checkpoint result with triggerId: {}", triggerId);
    try (RestClusterClient<String> clusterClient = getClusterClient(conf)) {
        CheckpointStatusHeaders checkpointStatusHeaders = CheckpointStatusHeaders.getInstance();
        CheckpointStatusMessageParameters checkpointStatusMessageParameters =
                checkpointStatusHeaders.getUnresolvedMessageParameters();
        checkpointStatusMessageParameters.jobIdPathParameter.resolve(
                JobID.fromHexString(jobId));
        checkpointStatusMessageParameters.triggerIdPathParameter.resolve(
                TriggerId.fromHexString(triggerId));

        // Wait for the response exactly once and reuse it, instead of calling get() on the
        // future for every field access.
        AsynchronousOperationResult<CheckpointInfo> result =
                clusterClient
                        .sendRequest(
                                checkpointStatusHeaders,
                                checkpointStatusMessageParameters,
                                EmptyRequestBody.getInstance())
                        .get();

        // A missing response or missing resource is reported as "still pending".
        if (result == null || result.resource() == null) {
            return CheckpointFetchResult.pending();
        }
        CheckpointInfo checkpointInfo = result.resource();

        Throwable failureCause = checkpointInfo.getFailureCause();
        if (failureCause != null) {
            LOG.error("Failure occurred while fetching the checkpoint result", failureCause);
            return CheckpointFetchResult.error(failureCause.toString());
        }

        QueueStatus.Id operationStatus = result.queueStatus().getId();
        switch (operationStatus) {
            case IN_PROGRESS:
                return CheckpointFetchResult.pending();
            case COMPLETED:
                LOG.info(
                        "Checkpoint {} triggered by the operator for job {} completed: {}",
                        triggerId,
                        jobId,
                        checkpointInfo.getCheckpointId());
                return CheckpointFetchResult.completed(checkpointInfo.getCheckpointId());
            default:
                // Caught by the handler below and converted into an error result.
                throw new IllegalStateException(
                        String.format(
                                "Checkpoint %s for job %s is reported to be in an unknown status: %s",
                                triggerId, jobId, operationStatus.name()));
        }
    } catch (Exception e) {
        if (e instanceof InterruptedException) {
            // Keep the interruption visible to the calling thread instead of swallowing it.
            Thread.currentThread().interrupt();
        }
        LOG.error("Exception while fetching the checkpoint result", e);
        // Some exceptions carry no message; fall back to toString() so the error result always
        // has text. NOTE(review): presumably callers detect failure via a non-null error
        // string - confirm against CheckpointFetchResult.
        String message = e.getMessage();
        return CheckpointFetchResult.error(message != null ? message : e.toString());
    }
}