in subprojects/frameworklauncher/yarn/src/main/java/com/microsoft/frameworklauncher/service/Service.java [844:929]
private void resyncFrameworksWithLiveApplications(Map<String, ApplicationReport> liveApplicationReports) throws Exception {
// Since Application is persistent in ZK by RM, so liveApplicationReports will never incomplete.
String logScope = "resyncFrameworksWithLiveApplications";
CHANGE_AWARE_LOGGER.initializeScope(logScope, Level.INFO, Level.DEBUG);
CHANGE_AWARE_LOGGER.log(logScope,
"Got %s live Applications from RM, start to resync them.",
liveApplicationReports.size());
for (ApplicationReport applicationReport : liveApplicationReports.values()) {
String applicationId = applicationReport.getApplicationId().toString();
YarnApplicationState applicationState = applicationReport.getYarnApplicationState();
FinalApplicationStatus applicationFinalStatus = applicationReport.getFinalApplicationStatus();
String diagnostics = CommonUtils.trim(applicationReport.getDiagnostics());
if (statusManager.isApplicationIdLiveAssociated(applicationId)) {
FrameworkStatus frameworkStatus = statusManager.getFrameworkStatusWithLiveAssociatedApplicationId(applicationId);
String frameworkName = frameworkStatus.getFrameworkName();
FrameworkState frameworkState = frameworkStatus.getFrameworkState();
if (frameworkState == FrameworkState.APPLICATION_CREATED) {
continue;
}
// updateApplicationStatus
statusManager.updateApplicationStatus(frameworkName, applicationReport);
// transitionFrameworkState
if (applicationFinalStatus == FinalApplicationStatus.UNDEFINED) {
if (applicationState == YarnApplicationState.NEW ||
applicationState == YarnApplicationState.NEW_SAVING ||
applicationState == YarnApplicationState.SUBMITTED ||
applicationState == YarnApplicationState.ACCEPTED) {
statusManager.transitionFrameworkState(frameworkName, FrameworkState.APPLICATION_WAITING);
} else if (applicationState == YarnApplicationState.RUNNING) {
statusManager.transitionFrameworkState(frameworkName, FrameworkState.APPLICATION_RUNNING);
}
} else if (applicationFinalStatus == FinalApplicationStatus.SUCCEEDED) {
retrieveApplicationExitDiagnostics(
applicationId,
FrameworkExitCode.SUCCEEDED.toInt(),
diagnostics,
false);
} else if (applicationFinalStatus == FinalApplicationStatus.KILLED) {
retrieveApplicationExitDiagnostics(
applicationId,
FrameworkExitCode.APP_KILLED_UNEXPECTEDLY.toInt(),
diagnostics,
false);
} else if (applicationFinalStatus == FinalApplicationStatus.FAILED) {
retrieveApplicationExitDiagnostics(
applicationId,
null,
diagnostics,
false);
}
} else {
// Do not kill Application due to APP_RM_RESYNC_EXCEEDED, since Exceed AM will kill itself.
// In this way, we can support multiple LauncherServices to share a single RM,
// like the sharing of HDFS and ZK.
}
}
List<String> liveAssociatedApplicationIds = statusManager.getLiveAssociatedApplicationIds();
for (String applicationId : liveAssociatedApplicationIds) {
if (!liveApplicationReports.containsKey(applicationId)) {
FrameworkStatus frameworkStatus = statusManager.getFrameworkStatusWithLiveAssociatedApplicationId(applicationId);
String frameworkName = frameworkStatus.getFrameworkName();
FrameworkState frameworkState = frameworkStatus.getFrameworkState();
// APPLICATION_CREATED Application is expected without ApplicationReport, but it is indeed live in RM.
if (frameworkState == FrameworkState.APPLICATION_CREATED) {
continue;
}
LOGGER.logWarning(
"[%s]: Cannot find live associated Application %s in resynced live Applications. " +
"Will complete it as RMResyncLost",
frameworkName, applicationId);
retrieveApplicationExitDiagnostics(
applicationId,
FrameworkExitCode.APP_RM_RESYNC_LOST.toInt(),
null,
false);
}
}
}