public void run()

in inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/IntegrationTaskRunner.java [50:161]


    public void run() {
        // Dispatches on the configured commit type and performs the matching Flink operation
        // through flinkService. Exceptions raised by those operations are caught here and
        // reported by setting the exception flag and exception message on flinkInfo.
        TaskCommitType commitType = TaskCommitType.getInstance(this.commitType);
        if (commitType == null) {
            // An unresolvable commit type falls back to starting the job.
            commitType = TaskCommitType.START_NOW;
        }
        switch (commitType) {
            case START_NOW:
                try {
                    String jobId = flinkService.submit(flinkInfo);
                    flinkInfo.setJobId(jobId);
                    log.info("Start job {} success in backend", jobId);
                } catch (Exception e) {
                    recordBackendFailure("Start job", e);
                }
                break;
            case RESUME:
                try {
                    String jobId = flinkService.restore(flinkInfo);
                    log.info("Restore job {} success in backend", jobId);
                } catch (Exception e) {
                    recordBackendFailure("Restore job", e);
                }
                break;
            case RESTART:
                try {
                    String location = stopJobWithSavepoint();
                    log.info("the jobId: {} savepoint: {} ", flinkInfo.getJobId(), location);
                    // Poll until the stopped job reports FINISHED, then restore it.
                    boolean restored = false;
                    for (int times = 0; times < TRY_MAX_TIMES; times++) {
                        JobStatus jobStatus = flinkService.getJobStatus(flinkInfo);
                        if (jobStatus == FINISHED) {
                            // A restore failure propagates to the outer catch so that the restart
                            // is reported as failed instead of being logged as a success.
                            String jobId = flinkService.restore(flinkInfo);
                            log.info("Restore job {} success in backend", jobId);
                            restored = true;
                            break;
                        }
                        log.info("Try start job  but the job {} is {}", flinkInfo.getJobId(), jobStatus);
                        try {
                            Thread.sleep(INTERVAL * 1000);
                        } catch (InterruptedException e) {
                            // Keep the interrupt visible to the owner of this thread and stop
                            // polling; the outer catch records the restart as failed.
                            Thread.currentThread().interrupt();
                            throw e;
                        }
                    }
                    if (restored) {
                        log.info("Restart job {} success in backend", flinkInfo.getJobId());
                    } else {
                        markFailed(String.format(
                                "Restart job %s failed in backend: job did not reach FINISHED after %s attempts",
                                flinkInfo.getJobId(), TRY_MAX_TIMES));
                    }
                } catch (Exception e) {
                    recordBackendFailure("Restart job", e);
                }
                break;
            case STOP:
                try {
                    String location = stopJobWithSavepoint();
                    log.info("the jobId {} savepoint: {} ", flinkInfo.getJobId(), location);
                } catch (Exception e) {
                    recordBackendFailure("stop job", e);
                }
                break;
            case DELETE:
                try {
                    flinkService.cancelJob(flinkInfo);
                    // Report the outcome only after checking the status that follows the cancel.
                    JobStatus jobStatus = flinkService.getJobStatus(flinkInfo);
                    if (jobStatus.isTerminalState()) {
                        log.info("delete job {} success in backend", flinkInfo.getJobId());
                    } else {
                        log.warn("delete job {} failed in backend", flinkInfo.getJobId());
                    }
                } catch (Exception e) {
                    recordBackendFailure("delete job", e);
                }
                break;
            default:
                markFailed("not found commitType");
        }
    }

    /**
     * Stops the job described by {@code flinkInfo} with a savepoint, using the drain flag and
     * savepoint directory from the Flink configuration, and stores the returned savepoint
     * location on {@code flinkInfo}.
     *
     * @return the savepoint location returned by the Flink service
     * @throws Exception if reading the configuration or stopping the job fails
     */
    private String stopJobWithSavepoint() throws Exception {
        StopWithSavepointRequest request = new StopWithSavepointRequest();
        FlinkConfig flinkConfig = flinkService.getFlinkConfig();
        request.setDrain(flinkConfig.isDrain());
        request.setTargetDirectory(flinkConfig.getSavepointDirectory());
        String location = flinkService.stopJob(flinkInfo, request);
        flinkInfo.setSavepointPath(location);
        return location;
    }

    /**
     * Records a failed backend operation on {@code flinkInfo}, building the message from the
     * action name, the current job id and the stack message of the exception.
     *
     * @param action leading words of the failure message, e.g. {@code "Start job"}
     * @param e the exception that caused the operation to fail
     */
    private void recordBackendFailure(String action, Exception e) {
        markFailed(String.format("%s %s failed in backend exception[%s]", action, flinkInfo.getJobId(),
                getExceptionStackMsg(e)));
    }

    /**
     * Logs the message as a warning and stores it on {@code flinkInfo} together with the
     * exception flag.
     *
     * @param msg description of the failure
     */
    private void markFailed(String msg) {
        log.warn(msg);
        flinkInfo.setException(true);
        flinkInfo.setExceptionMsg(msg);
    }