in streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java [542:628]
/**
 * Resolves the application's state from the YARN REST API and persists it.
 *
 * <p>Two paths are taken depending on whether the application id is present in
 * {@code CANCELING_CACHE}:
 *
 * <ul>
 *   <li><b>Previously canceling</b>: the state defaults to {@code CANCELED}; when YARN returns
 *       application info, the mapped final status is used instead. The bookkeeping (state
 *       update, savepoint/optioning cleanup, metrics persistence) runs in a {@code finally}
 *       block, so it is executed even when the YARN query throws.
 *   <li><b>Otherwise</b>: the YARN final status is mapped to a {@link FlinkAppStateEnum};
 *       {@code OTHER} is ignored, {@code KILLED} is stored as {@code CANCELED},
 *       {@code SUCCEEDED} is stored as {@code FINISHED}. Alerting and, for {@code FAILED},
 *       a restart are triggered afterwards.
 * </ul>
 *
 * @param application the application whose state should be refreshed
 * @throws RuntimeException when YARN returns no info, or processing the info fails, and the
 *     deploy mode is not {@code REMOTE}
 * @throws Exception anything raised while querying YARN in the "previously canceling" path;
 *     it propagates after the {@code finally} bookkeeping has run
 */
private void getStateFromYarn(FlinkApplication application) throws Exception {
    log.debug("[StreamPark][FlinkAppHttpWatcher] getFromYarnRestApi starting...");
    StopFromEnum stopFrom = getStopFrom(application);
    OptionStateEnum optionState = OPTIONING.get(application.getId());
    /*
     * If the status of the last time is CANCELING (flink rest server is not closed at the time of getting
     * information) and the status is not obtained this time (flink rest server is closed), the task is considered
     * CANCELED
     */
    Byte flag = CANCELING_CACHE.getIfPresent(application.getId());
    if (flag != null) {
        log.info("[StreamPark][FlinkAppHttpWatcher] previous state: canceling.");
        // Default used when YARN yields no info (or the query throws).
        FlinkAppStateEnum flinkAppState = FlinkAppStateEnum.CANCELED;
        try {
            YarnAppInfo yarnAppInfo = httpYarnAppInfo(application);
            if (yarnAppInfo != null) {
                String state = yarnAppInfo.getApp().getFinalStatus();
                flinkAppState = FlinkAppStateEnum.getState(state);
            }
        } finally {
            // No recorded stop origin: expire the savepoint and alert on abnormal termination.
            if (StopFromEnum.NONE.equals(stopFrom)) {
                log.error(
                    "[StreamPark][FlinkAppHttpWatcher] query previous state was canceling and stopFrom NotFound,savepoint expired!");
                savepointService.expire(application.getId());
                if (flinkAppState == FlinkAppStateEnum.KILLED
                    || flinkAppState == FlinkAppStateEnum.FAILED) {
                    doAlert(application, flinkAppState);
                }
            }
            application.setState(flinkAppState.getValue());
            cleanSavepoint(application);
            cleanOptioning(optionState, application.getId());
            doPersistMetrics(application, true);
        }
    } else {
        // query the status from the yarn rest Api
        YarnAppInfo yarnAppInfo = httpYarnAppInfo(application);
        if (yarnAppInfo == null) {
            // Missing YARN info is tolerated only for REMOTE deployments.
            if (!FlinkDeployMode.REMOTE.equals(application.getDeployModeEnum())) {
                throw new RuntimeException(
                    "[StreamPark][FlinkAppHttpWatcher] getFromYarnRestApi failed ");
            }
        } else {
            try {
                String state = yarnAppInfo.getApp().getFinalStatus();
                FlinkAppStateEnum flinkAppState = FlinkAppStateEnum.getState(state);
                // Unmapped status: leave the application untouched.
                if (FlinkAppStateEnum.OTHER.equals(flinkAppState)) {
                    return;
                }
                if (FlinkAppStateEnum.KILLED.equals(flinkAppState)) {
                    if (StopFromEnum.NONE.equals(stopFrom)) {
                        log.error(
                            "[StreamPark][FlinkAppHttpWatcher] getFromYarnRestApi,job was killed and stopFrom NotFound,savepoint expired!");
                        savepointService.expire(application.getId());
                    }
                    // A job killed on YARN is recorded as CANCELED.
                    flinkAppState = FlinkAppStateEnum.CANCELED;
                    cleanSavepoint(application);
                    application.setEndTime(new Date());
                }
                if (FlinkAppStateEnum.SUCCEEDED.equals(flinkAppState)) {
                    flinkAppState = FlinkAppStateEnum.FINISHED;
                }
                application.setState(flinkAppState.getValue());
                cleanOptioning(optionState, application.getId());
                doPersistMetrics(application, true);
                // checkAlter is evaluated last and only when none of the state checks matched.
                if (flinkAppState.equals(FlinkAppStateEnum.FAILED)
                    || flinkAppState.equals(FlinkAppStateEnum.LOST)
                    || (flinkAppState.equals(FlinkAppStateEnum.CANCELED)
                        && StopFromEnum.NONE.equals(stopFrom))
                    || applicationInfoService.checkAlter(application)) {
                    doAlert(application, flinkAppState);
                    stopCanceledJob(application.getId());
                    if (flinkAppState.equals(FlinkAppStateEnum.FAILED)) {
                        // NOTE(review): second argument presumably marks an automatic restart - confirm.
                        applicationActionService.start(application, true);
                    }
                }
            } catch (Exception e) {
                if (!FlinkDeployMode.REMOTE.equals(application.getDeployModeEnum())) {
                    throw new RuntimeException(
                        "[StreamPark][FlinkAppHttpWatcher] getFromYarnRestApi error,", e);
                }
                // REMOTE deployments tolerate the failure, but it must not disappear silently.
                log.warn(
                    "[StreamPark][FlinkAppHttpWatcher] getFromYarnRestApi error ignored in REMOTE deploy mode, appId: {}",
                    application.getId(),
                    e);
            }
        }
    }
}