in streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java [1205:1347]
public void cancel(Application appParam) throws Exception {
FlinkHttpWatcher.setOptionState(appParam.getId(), OptionState.CANCELLING);
Application application = getById(appParam.getId());
application.setState(FlinkAppState.CANCELLING.getValue());
ApplicationLog applicationLog = new ApplicationLog();
applicationLog.setOptionName(Operation.CANCEL.getValue());
applicationLog.setAppId(application.getId());
applicationLog.setJobManagerUrl(application.getJobManagerUrl());
applicationLog.setOptionTime(new Date());
applicationLog.setYarnAppId(application.getClusterId());
if (appParam.getSavePointed()) {
FlinkHttpWatcher.addSavepoint(application.getId());
application.setOptionState(OptionState.SAVEPOINTING.getValue());
} else {
application.setOptionState(OptionState.CANCELLING.getValue());
}
application.setOptionTime(new Date());
this.baseMapper.updateById(application);
Long userId = commonService.getUserId();
if (!application.getUserId().equals(userId)) {
FlinkHttpWatcher.addCanceledApp(application.getId(), userId);
}
FlinkEnv flinkEnv = flinkEnvService.getById(application.getVersionId());
// infer savepoint
String customSavepoint = null;
if (appParam.getSavePointed()) {
customSavepoint = appParam.getSavePoint();
if (StringUtils.isBlank(customSavepoint)) {
customSavepoint = savePointService.getSavePointPath(appParam);
}
}
String clusterId = null;
if (ExecutionMode.isKubernetesMode(application.getExecutionMode())) {
clusterId = application.getClusterId();
} else if (ExecutionMode.isYarnMode(application.getExecutionMode())) {
if (ExecutionMode.YARN_SESSION.equals(application.getExecutionModeEnum())) {
FlinkCluster cluster = flinkClusterService.getById(application.getFlinkClusterId());
ApiAlertException.throwIfNull(
cluster,
String.format(
"The yarn session clusterId=%s can't found, maybe the clusterId is wrong or the cluster has been deleted. Please contact the Admin.",
application.getFlinkClusterId()));
clusterId = cluster.getClusterId();
} else {
clusterId = application.getAppId();
}
}
Map<String, Object> properties = new HashMap<>();
if (ExecutionMode.isRemoteMode(application.getExecutionModeEnum())) {
FlinkCluster cluster = flinkClusterService.getById(application.getFlinkClusterId());
ApiAlertException.throwIfNull(
cluster,
String.format(
"The clusterId=%s cannot be find, maybe the clusterId is wrong or "
+ "the cluster has been deleted. Please contact the Admin.",
application.getFlinkClusterId()));
URI activeAddress = cluster.getRemoteURI();
properties.put(RestOptions.ADDRESS.key(), activeAddress.getHost());
properties.put(RestOptions.PORT.key(), activeAddress.getPort());
}
CancelRequest cancelRequest =
new CancelRequest(
flinkEnv.getFlinkVersion(),
ExecutionMode.of(application.getExecutionMode()),
properties,
clusterId,
application.getJobId(),
appParam.getSavePointed(),
appParam.getDrain(),
customSavepoint,
application.getK8sNamespace());
final Date triggerTime = new Date();
CompletableFuture<CancelResponse> cancelFuture =
CompletableFuture.supplyAsync(() -> FlinkClient.cancel(cancelRequest), executorService);
cancelFutureMap.put(application.getId(), cancelFuture);
CompletableFutureUtils.runTimeout(
cancelFuture,
10L,
TimeUnit.MINUTES,
cancelResponse -> {
applicationLog.setSuccess(true);
if (cancelResponse != null && cancelResponse.savePointDir() != null) {
String savePointDir = cancelResponse.savePointDir();
log.info("savePoint path: {}", savePointDir);
SavePoint savePoint = new SavePoint();
savePoint.setPath(savePointDir);
savePoint.setAppId(application.getId());
savePoint.setLatest(true);
savePoint.setType(CheckPointType.SAVEPOINT.get());
savePoint.setCreateTime(new Date());
savePoint.setTriggerTime(triggerTime);
savePointService.save(savePoint);
}
if (isKubernetesApp(application)) {
k8SFlinkTrackMonitor.unWatching(toTrackId(application));
}
},
e -> {
if (e.getCause() instanceof CancellationException) {
updateToStopped(application);
} else {
log.error("stop flink job fail.", e);
application.setOptionState(OptionState.NONE.getValue());
application.setState(FlinkAppState.FAILED.getValue());
updateById(application);
if (appParam.getSavePointed()) {
savePointService.expire(application.getId());
}
// re-tracking flink job on kubernetes and logging exception
if (isKubernetesApp(application)) {
TrackId id = toTrackId(application);
k8SFlinkTrackMonitor.unWatching(id);
k8SFlinkTrackMonitor.doWatching(id);
} else {
FlinkHttpWatcher.unWatching(application.getId());
}
String exception = Utils.stringifyException(e);
applicationLog.setException(exception);
applicationLog.setSuccess(false);
}
})
.whenComplete(
(t, e) -> {
cancelFutureMap.remove(application.getId());
applicationLogService.save(applicationLog);
});
}