in streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationActionServiceImpl.java [267:406]
public void cancel(FlinkApplication appParam) throws Exception {
// For HA purposes, if the task is not processed locally, save the Distribution task and return
if (!distributedTaskService.isLocalProcessing(appParam.getId())) {
distributedTaskService.saveDistributedTask(appParam, false, DistributedTaskEnum.CANCEL);
return;
}
FlinkAppHttpWatcher.setOptionState(appParam.getId(), OptionStateEnum.CANCELLING);
FlinkApplication application = getById(appParam.getId());
application.setState(FlinkAppStateEnum.CANCELLING.getValue());
ApplicationLog applicationLog = new ApplicationLog();
applicationLog.setJobType(EngineTypeEnum.FLINK.getCode());
applicationLog.setOptionName(OperationEnum.CANCEL.getValue());
applicationLog.setAppId(application.getId());
applicationLog.setTrackingUrl(application.getJobManagerUrl());
applicationLog.setCreateTime(new Date());
applicationLog.setClusterId(application.getClusterId());
applicationLog.setUserId(ServiceHelper.getUserId());
if (appParam.getRestoreOrTriggerSavepoint()) {
FlinkAppHttpWatcher.addSavepoint(application.getId());
application.setOptionState(OptionStateEnum.SAVEPOINTING.getValue());
} else {
application.setOptionState(OptionStateEnum.CANCELLING.getValue());
}
application.setOptionTime(new Date());
this.baseMapper.updateById(application);
Long userId = ServiceHelper.getUserId();
if (!application.getUserId().equals(userId)) {
FlinkAppHttpWatcher.addCanceledApp(application.getId(), userId);
}
FlinkEnv flinkEnv = flinkEnvService.getById(application.getVersionId());
// infer savepoint
String customSavepoint = null;
if (appParam.getRestoreOrTriggerSavepoint()) {
customSavepoint = appParam.getSavepointPath();
if (StringUtils.isBlank(customSavepoint)) {
customSavepoint = savepointService.getSavePointPath(appParam);
}
}
Map<String, Object> properties = new HashMap<>();
if (FlinkDeployMode.isRemoteMode(application.getDeployModeEnum())) {
FlinkCluster cluster = flinkClusterService.getById(application.getFlinkClusterId());
ApiAlertException.throwIfNull(
cluster,
String.format(
"The clusterId=%s cannot be find, maybe the clusterId is wrong or "
+ "the cluster has been deleted. Please contact the Admin.",
application.getFlinkClusterId()));
URI activeAddress = cluster.getRemoteURI();
properties.put(RestOptions.ADDRESS.key(), activeAddress.getHost());
properties.put(RestOptions.PORT.key(), activeAddress.getPort());
}
Tuple3<String, String, FlinkK8sRestExposedType> clusterIdNamespace =
getNamespaceClusterId(application);
String namespace = clusterIdNamespace.t1;
String clusterId = clusterIdNamespace.t2;
CancelRequest cancelRequest =
new CancelRequest(
application.getId(),
flinkEnv.getFlinkVersion(),
FlinkDeployMode.of(application.getDeployMode()),
properties,
clusterId,
application.getJobId(),
appParam.getRestoreOrTriggerSavepoint(),
appParam.getDrain(),
customSavepoint,
appParam.getNativeFormat(),
namespace);
final Date triggerTime = new Date();
CompletableFuture<CancelResponse> cancelFuture =
CompletableFuture.supplyAsync(() -> FlinkClient.cancel(cancelRequest), executorService);
cancelFutureMap.put(application.getId(), cancelFuture);
cancelFuture.whenCompleteAsync(
(cancelResponse, throwable) -> {
cancelFutureMap.remove(application.getId());
if (throwable != null) {
String exception = ExceptionUtils.stringifyException(throwable);
applicationLog.setException(exception);
applicationLog.setSuccess(false);
applicationLogService.save(applicationLog);
if (throwable instanceof CancellationException) {
doAbort(application.getId());
} else {
log.error("stop flink job failed.", throwable);
application.setOptionState(OptionStateEnum.NONE.getValue());
application.setState(FlinkAppStateEnum.FAILED.getValue());
updateById(application);
if (appParam.getRestoreOrTriggerSavepoint()) {
savepointService.expire(application.getId());
}
// re-tracking flink job on kubernetes and logging exception
if (application.isKubernetesModeJob()) {
TrackId id = k8sWatcherWrapper.toTrackId(application);
k8SFlinkTrackMonitor.unWatching(id);
k8SFlinkTrackMonitor.doWatching(id);
} else {
FlinkAppHttpWatcher.unWatching(application.getId());
}
}
return;
}
applicationLog.setSuccess(true);
// save log...
applicationLogService.save(applicationLog);
if (cancelResponse != null && cancelResponse.savepointDir() != null) {
String savepointDir = cancelResponse.savepointDir();
log.info("savepoint path: {}", savepointDir);
FlinkSavepoint savepoint = new FlinkSavepoint();
savepoint.setPath(savepointDir);
savepoint.setAppId(application.getId());
savepoint.setLatest(true);
savepoint.setType(CheckPointTypeEnum.SAVEPOINT.get());
savepoint.setCreateTime(new Date());
savepoint.setTriggerTime(triggerTime);
savepointService.save(savepoint);
}
if (application.isKubernetesModeJob()) {
k8SFlinkTrackMonitor.unWatching(k8sWatcherWrapper.toTrackId(application));
}
});
}