public void cancel()

in streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java [1205:1347]


  /**
   * Cancels the Flink job of the given application, optionally requesting a savepoint.
   *
   * <p>Steps, in order:
   *
   * <ol>
   *   <li>Flag the application as {@code CANCELLING} in {@code FlinkHttpWatcher} and persist the
   *       {@code CANCELLING} state plus the option state ({@code SAVEPOINTING} when a savepoint
   *       was requested, {@code CANCELLING} otherwise).
   *   <li>Resolve the Flink env, the savepoint path, the cluster id for the execution mode and,
   *       for remote mode, the REST address/port of the cluster.
   *   <li>Submit {@code FlinkClient.cancel} on {@code executorService}, register the future in
   *       {@code cancelFutureMap} and hand it to {@code CompletableFutureUtils.runTimeout} with a
   *       10 minute limit.
   *   <li>On success, record the returned savepoint (if any); on failure, mark the application
   *       {@code FAILED}. In every case the future is removed from {@code cancelFutureMap} and
   *       the operation log is saved.
   * </ol>
   *
   * <p>NOTE(review): the lookups in step 2 run after the state has already been persisted, so an
   * exception thrown there leaves the record in {@code CANCELLING}; this ordering is unchanged
   * from the original implementation.
   *
   * @param appParam request object; its id, {@code savePointed}, {@code savePoint} and {@code
   *     drain} values are read here
   * @throws Exception if the application, its Flink env or a required cluster cannot be found,
   *     or if any of the invoked services fails
   */
  public void cancel(Application appParam) throws Exception {
    // The watcher is flagged before the record is loaded; this order is kept as-is.
    FlinkHttpWatcher.setOptionState(appParam.getId(), OptionState.CANCELLING);
    Application application = getById(appParam.getId());
    // Fail with an explicit alert rather than a NullPointerException on the next line.
    ApiAlertException.throwIfNull(
        application,
        String.format(
            "The application id=%s can't be found, maybe the id is wrong or "
                + "the application has been deleted.",
            appParam.getId()));
    application.setState(FlinkAppState.CANCELLING.getValue());

    // Read the flag once: a null value is treated as "no savepoint" instead of failing on
    // unboxing, and the asynchronous failure callback below sees the same value that was used
    // to build the request even if the request object changes afterwards.
    final boolean withSavepoint = Boolean.TRUE.equals(appParam.getSavePointed());

    // Operation log entry; its success/exception fields are filled in by the callbacks below.
    ApplicationLog applicationLog = new ApplicationLog();
    applicationLog.setOptionName(Operation.CANCEL.getValue());
    applicationLog.setAppId(application.getId());
    applicationLog.setJobManagerUrl(application.getJobManagerUrl());
    applicationLog.setOptionTime(new Date());
    applicationLog.setYarnAppId(application.getClusterId());

    if (withSavepoint) {
      FlinkHttpWatcher.addSavepoint(application.getId());
      application.setOptionState(OptionState.SAVEPOINTING.getValue());
    } else {
      application.setOptionState(OptionState.CANCELLING.getValue());
    }

    application.setOptionTime(new Date());
    this.baseMapper.updateById(application);

    // Register the operator with the watcher when somebody other than the owner cancels.
    Long userId = commonService.getUserId();
    if (!application.getUserId().equals(userId)) {
      FlinkHttpWatcher.addCanceledApp(application.getId(), userId);
    }

    FlinkEnv flinkEnv = flinkEnvService.getById(application.getVersionId());
    // Same guard as the cluster lookups below; flinkEnv is dereferenced when building the request.
    ApiAlertException.throwIfNull(
        flinkEnv,
        String.format(
            "The flink env id=%s can't be found, maybe the id is wrong or "
                + "the flink env has been deleted. Please contact the Admin.",
            application.getVersionId()));

    // infer savepoint: an explicit path from the request wins, otherwise ask the savepoint
    // service for one
    String customSavepoint = null;
    if (withSavepoint) {
      customSavepoint = appParam.getSavePoint();
      if (StringUtils.isBlank(customSavepoint)) {
        customSavepoint = savePointService.getSavePointPath(appParam);
      }
    }

    // The cluster id comes from a different field depending on the execution mode; it stays
    // null for modes that are neither Kubernetes nor YARN.
    String clusterId = null;
    if (ExecutionMode.isKubernetesMode(application.getExecutionMode())) {
      clusterId = application.getClusterId();
    } else if (ExecutionMode.isYarnMode(application.getExecutionMode())) {
      if (ExecutionMode.YARN_SESSION.equals(application.getExecutionModeEnum())) {
        FlinkCluster cluster = flinkClusterService.getById(application.getFlinkClusterId());
        ApiAlertException.throwIfNull(
            cluster,
            String.format(
                "The yarn session clusterId=%s can't found, maybe the clusterId is wrong or the cluster has been deleted. Please contact the Admin.",
                application.getFlinkClusterId()));
        clusterId = cluster.getClusterId();
      } else {
        clusterId = application.getAppId();
      }
    }

    Map<String, Object> properties = new HashMap<>();

    // Remote mode: pass the REST endpoint of the registered cluster to the client.
    if (ExecutionMode.isRemoteMode(application.getExecutionModeEnum())) {
      FlinkCluster cluster = flinkClusterService.getById(application.getFlinkClusterId());
      ApiAlertException.throwIfNull(
          cluster,
          String.format(
              "The clusterId=%s cannot be find, maybe the clusterId is wrong or "
                  + "the cluster has been deleted. Please contact the Admin.",
              application.getFlinkClusterId()));
      URI activeAddress = cluster.getRemoteURI();
      properties.put(RestOptions.ADDRESS.key(), activeAddress.getHost());
      properties.put(RestOptions.PORT.key(), activeAddress.getPort());
    }

    CancelRequest cancelRequest =
        new CancelRequest(
            flinkEnv.getFlinkVersion(),
            ExecutionMode.of(application.getExecutionMode()),
            properties,
            clusterId,
            application.getJobId(),
            withSavepoint,
            appParam.getDrain(),
            customSavepoint,
            application.getK8sNamespace());

    // Captured before submission so a resulting savepoint records when it was triggered.
    final Date triggerTime = new Date();
    CompletableFuture<CancelResponse> cancelFuture =
        CompletableFuture.supplyAsync(() -> FlinkClient.cancel(cancelRequest), executorService);

    // Registered by application id; removed again in whenComplete below.
    cancelFutureMap.put(application.getId(), cancelFuture);

    CompletableFutureUtils.runTimeout(
            cancelFuture,
            10L,
            TimeUnit.MINUTES,
            cancelResponse -> {
              applicationLog.setSuccess(true);
              // Persist the savepoint reported by the client, if there is one.
              if (cancelResponse != null && cancelResponse.savePointDir() != null) {
                String savePointDir = cancelResponse.savePointDir();
                log.info("savePoint path: {}", savePointDir);
                SavePoint savePoint = new SavePoint();
                savePoint.setPath(savePointDir);
                savePoint.setAppId(application.getId());
                savePoint.setLatest(true);
                savePoint.setType(CheckPointType.SAVEPOINT.get());
                savePoint.setCreateTime(new Date());
                savePoint.setTriggerTime(triggerTime);
                savePointService.save(savePoint);
              }
              if (isKubernetesApp(application)) {
                k8SFlinkTrackMonitor.unWatching(toTrackId(application));
              }
            },
            e -> {
              // A CancellationException as the cause is handled as a stop, not as a failure.
              if (e.getCause() instanceof CancellationException) {
                updateToStopped(application);
              } else {
                log.error("stop flink job fail.", e);
                application.setOptionState(OptionState.NONE.getValue());
                application.setState(FlinkAppState.FAILED.getValue());
                updateById(application);

                if (withSavepoint) {
                  savePointService.expire(application.getId());
                }

                // re-tracking flink job on kubernetes and logging exception
                if (isKubernetesApp(application)) {
                  TrackId id = toTrackId(application);
                  k8SFlinkTrackMonitor.unWatching(id);
                  k8SFlinkTrackMonitor.doWatching(id);
                } else {
                  FlinkHttpWatcher.unWatching(application.getId());
                }

                String exception = Utils.stringifyException(e);
                applicationLog.setException(exception);
                applicationLog.setSuccess(false);
              }
            })
        .whenComplete(
            (t, e) -> {
              // Runs for every outcome: drop the future handle and store the operation log.
              cancelFutureMap.remove(application.getId());
              applicationLogService.save(applicationLog);
            });
  }