public void cancel()

in streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationActionServiceImpl.java [267:406]


    public void cancel(FlinkApplication appParam) throws Exception {
        // For HA purposes, if the task is not processed locally, save the Distribution task and return
        if (!distributedTaskService.isLocalProcessing(appParam.getId())) {
            distributedTaskService.saveDistributedTask(appParam, false, DistributedTaskEnum.CANCEL);
            return;
        }
        FlinkAppHttpWatcher.setOptionState(appParam.getId(), OptionStateEnum.CANCELLING);
        FlinkApplication application = getById(appParam.getId());
        application.setState(FlinkAppStateEnum.CANCELLING.getValue());

        ApplicationLog applicationLog = new ApplicationLog();
        applicationLog.setJobType(EngineTypeEnum.FLINK.getCode());
        applicationLog.setOptionName(OperationEnum.CANCEL.getValue());
        applicationLog.setAppId(application.getId());
        applicationLog.setTrackingUrl(application.getJobManagerUrl());
        applicationLog.setCreateTime(new Date());
        applicationLog.setClusterId(application.getClusterId());
        applicationLog.setUserId(ServiceHelper.getUserId());

        if (appParam.getRestoreOrTriggerSavepoint()) {
            FlinkAppHttpWatcher.addSavepoint(application.getId());
            application.setOptionState(OptionStateEnum.SAVEPOINTING.getValue());
        } else {
            application.setOptionState(OptionStateEnum.CANCELLING.getValue());
        }

        application.setOptionTime(new Date());
        this.baseMapper.updateById(application);

        Long userId = ServiceHelper.getUserId();
        if (!application.getUserId().equals(userId)) {
            FlinkAppHttpWatcher.addCanceledApp(application.getId(), userId);
        }

        FlinkEnv flinkEnv = flinkEnvService.getById(application.getVersionId());

        // infer savepoint
        String customSavepoint = null;
        if (appParam.getRestoreOrTriggerSavepoint()) {
            customSavepoint = appParam.getSavepointPath();
            if (StringUtils.isBlank(customSavepoint)) {
                customSavepoint = savepointService.getSavePointPath(appParam);
            }
        }

        Map<String, Object> properties = new HashMap<>();

        if (FlinkDeployMode.isRemoteMode(application.getDeployModeEnum())) {
            FlinkCluster cluster = flinkClusterService.getById(application.getFlinkClusterId());
            ApiAlertException.throwIfNull(
                cluster,
                String.format(
                    "The clusterId=%s cannot be find, maybe the clusterId is wrong or "
                        + "the cluster has been deleted. Please contact the Admin.",
                    application.getFlinkClusterId()));
            URI activeAddress = cluster.getRemoteURI();
            properties.put(RestOptions.ADDRESS.key(), activeAddress.getHost());
            properties.put(RestOptions.PORT.key(), activeAddress.getPort());
        }

        Tuple3<String, String, FlinkK8sRestExposedType> clusterIdNamespace =
            getNamespaceClusterId(application);
        String namespace = clusterIdNamespace.t1;
        String clusterId = clusterIdNamespace.t2;

        CancelRequest cancelRequest =
            new CancelRequest(
                application.getId(),
                flinkEnv.getFlinkVersion(),
                FlinkDeployMode.of(application.getDeployMode()),
                properties,
                clusterId,
                application.getJobId(),
                appParam.getRestoreOrTriggerSavepoint(),
                appParam.getDrain(),
                customSavepoint,
                appParam.getNativeFormat(),
                namespace);

        final Date triggerTime = new Date();
        CompletableFuture<CancelResponse> cancelFuture =
            CompletableFuture.supplyAsync(() -> FlinkClient.cancel(cancelRequest), executorService);

        cancelFutureMap.put(application.getId(), cancelFuture);

        cancelFuture.whenCompleteAsync(
            (cancelResponse, throwable) -> {
                cancelFutureMap.remove(application.getId());

                if (throwable != null) {
                    String exception = ExceptionUtils.stringifyException(throwable);
                    applicationLog.setException(exception);
                    applicationLog.setSuccess(false);
                    applicationLogService.save(applicationLog);

                    if (throwable instanceof CancellationException) {
                        doAbort(application.getId());
                    } else {
                        log.error("stop flink job failed.", throwable);
                        application.setOptionState(OptionStateEnum.NONE.getValue());
                        application.setState(FlinkAppStateEnum.FAILED.getValue());
                        updateById(application);

                        if (appParam.getRestoreOrTriggerSavepoint()) {
                            savepointService.expire(application.getId());
                        }
                        // re-tracking flink job on kubernetes and logging exception
                        if (application.isKubernetesModeJob()) {
                            TrackId id = k8sWatcherWrapper.toTrackId(application);
                            k8SFlinkTrackMonitor.unWatching(id);
                            k8SFlinkTrackMonitor.doWatching(id);
                        } else {
                            FlinkAppHttpWatcher.unWatching(application.getId());
                        }
                    }
                    return;
                }

                applicationLog.setSuccess(true);
                // save log...
                applicationLogService.save(applicationLog);

                if (cancelResponse != null && cancelResponse.savepointDir() != null) {
                    String savepointDir = cancelResponse.savepointDir();
                    log.info("savepoint path: {}", savepointDir);
                    FlinkSavepoint savepoint = new FlinkSavepoint();
                    savepoint.setPath(savepointDir);
                    savepoint.setAppId(application.getId());
                    savepoint.setLatest(true);
                    savepoint.setType(CheckPointTypeEnum.SAVEPOINT.get());
                    savepoint.setCreateTime(new Date());
                    savepoint.setTriggerTime(triggerTime);
                    savepointService.save(savepoint);
                }

                if (application.isKubernetesModeJob()) {
                    k8SFlinkTrackMonitor.unWatching(k8sWatcherWrapper.toTrackId(application));
                }
            });
    }