private void getStateFromYarn()

in streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java [542:628]


    private void getStateFromYarn(FlinkApplication application) throws Exception {
        log.debug("[StreamPark][FlinkAppHttpWatcher] getFromYarnRestApi starting...");
        StopFromEnum stopFrom = getStopFrom(application);
        OptionStateEnum optionState = OPTIONING.get(application.getId());

        /*
         * If the status of the last time is CANCELING (flink rest server is not closed at the time of getting
         * information) and the status is not obtained this time (flink rest server is closed), the task is considered
         * CANCELED
         */
        Byte flag = CANCELING_CACHE.getIfPresent(application.getId());
        if (flag != null) {
            log.info("[StreamPark][FlinkAppHttpWatcher] previous state: canceling.");
            FlinkAppStateEnum flinkAppState = FlinkAppStateEnum.CANCELED;
            try {
                YarnAppInfo yarnAppInfo = httpYarnAppInfo(application);
                if (yarnAppInfo != null) {
                    String state = yarnAppInfo.getApp().getFinalStatus();
                    flinkAppState = FlinkAppStateEnum.getState(state);
                }
            } finally {
                if (StopFromEnum.NONE.equals(stopFrom)) {
                    log.error(
                        "[StreamPark][FlinkAppHttpWatcher] query previous state was canceling and stopFrom NotFound,savepoint expired!");
                    savepointService.expire(application.getId());
                    if (flinkAppState == FlinkAppStateEnum.KILLED
                        || flinkAppState == FlinkAppStateEnum.FAILED) {
                        doAlert(application, flinkAppState);
                    }
                }
                application.setState(flinkAppState.getValue());
                cleanSavepoint(application);
                cleanOptioning(optionState, application.getId());
                doPersistMetrics(application, true);
            }

        } else {
            // query the status from the yarn rest Api
            YarnAppInfo yarnAppInfo = httpYarnAppInfo(application);
            if (yarnAppInfo == null) {
                if (!FlinkDeployMode.REMOTE.equals(application.getDeployModeEnum())) {
                    throw new RuntimeException(
                        "[StreamPark][FlinkAppHttpWatcher] getFromYarnRestApi failed ");
                }
            } else {
                try {
                    String state = yarnAppInfo.getApp().getFinalStatus();
                    FlinkAppStateEnum flinkAppState = FlinkAppStateEnum.getState(state);
                    if (FlinkAppStateEnum.OTHER.equals(flinkAppState)) {
                        return;
                    }
                    if (FlinkAppStateEnum.KILLED.equals(flinkAppState)) {
                        if (StopFromEnum.NONE.equals(stopFrom)) {
                            log.error(
                                "[StreamPark][FlinkAppHttpWatcher] getFromYarnRestApi,job was killed and stopFrom NotFound,savepoint expired!");
                            savepointService.expire(application.getId());
                        }
                        flinkAppState = FlinkAppStateEnum.CANCELED;
                        cleanSavepoint(application);
                        application.setEndTime(new Date());
                    }
                    if (FlinkAppStateEnum.SUCCEEDED.equals(flinkAppState)) {
                        flinkAppState = FlinkAppStateEnum.FINISHED;
                    }
                    application.setState(flinkAppState.getValue());
                    cleanOptioning(optionState, application.getId());
                    doPersistMetrics(application, true);
                    if (flinkAppState.equals(FlinkAppStateEnum.FAILED)
                        || flinkAppState.equals(FlinkAppStateEnum.LOST)
                        || (flinkAppState.equals(FlinkAppStateEnum.CANCELED)
                            && StopFromEnum.NONE.equals(stopFrom))
                        || applicationInfoService.checkAlter(application)) {
                        doAlert(application, flinkAppState);
                        stopCanceledJob(application.getId());
                        if (flinkAppState.equals(FlinkAppStateEnum.FAILED)) {
                            applicationActionService.start(application, true);
                        }
                    }
                } catch (Exception e) {
                    if (!FlinkDeployMode.REMOTE.equals(application.getDeployModeEnum())) {
                        throw new RuntimeException(
                            "[StreamPark][FlinkAppHttpWatcher] getFromYarnRestApi error,", e);
                    }
                }
            }
        }
    }