in subprojects/frameworklauncher/yarn/src/main/java/com/microsoft/frameworklauncher/service/StatusManager.java [462:561]
/**
 * Moves a Framework from its current {@link FrameworkState} to {@code dstState}, performing
 * the bookkeeping attached to the boundary being crossed, and then (unless the event asks to
 * skip it) writes the updated status to ZK.
 *
 * <p>Steps, in the order they are executed:
 * <ol>
 *   <li>Return immediately if the Framework is already in {@code dstState}.</li>
 *   <li>Associate / disassociate the Framework with its Application when the transition
 *       enters / leaves {@code APPLICATION_ASSOCIATED_STATES}.</li>
 *   <li>Flip the Application liveness flag when the transition enters / leaves
 *       {@code APPLICATION_LIVE_ASSOCIATED_STATES}.</li>
 *   <li>Copy Application exit information from the event into the FrameworkStatus for
 *       {@code APPLICATION_RETRIEVING_DIAGNOSTICS} and {@code APPLICATION_COMPLETED}.</li>
 *   <li>On retry ({@code APPLICATION_COMPLETED -> FRAMEWORK_WAITING}) delete the previous
 *       status from ZK and store the new retry policy state.</li>
 *   <li>Record the timestamp that belongs to {@code dstState}, if any.</li>
 *   <li>Move the Framework name between the per-state locator sets, update the state field,
 *       persist to ZK and log.</li>
 * </ol>
 *
 * <p>The method is {@code synchronized} on this instance. Several preconditions are expressed
 * with {@code assert}, so they are only checked when the JVM runs with assertions enabled.
 *
 * @param frameworkName name of the Framework whose state is changed
 * @param dstState      the state to transition to
 * @param event         carries the data needed by the transition (application context, exit
 *                      information, new retry policy state, skip-to-persist flag); it is
 *                      dereferenced unconditionally when persisting, so it must not be null
 * @throws Exception if associating the Application with the Framework fails (the original
 *                   failure is attached as the cause); presumably also whatever the called
 *                   helpers and zkStore operations propagate - verify against their
 *                   declarations
 */
public synchronized void transitionFrameworkState(
String frameworkName,
FrameworkState dstState,
FrameworkEvent event) throws Exception {
// NOTE(review): behavior for an unknown frameworkName depends on getFrameworkStatus, which
// is not visible here - confirm whether it throws or returns null.
FrameworkStatus frameworkStatus = getFrameworkStatus(frameworkName);
FrameworkState srcState = frameworkStatus.getFrameworkState();
// State transition function between each FrameworkStates
// Attempt to transition
// Self-transition is a no-op: nothing is mutated, persisted or logged.
if (srcState == dstState) {
return;
}
// A Framework that already reached a final state is not expected to transition again.
// Only enforced when assertions are enabled.
assert !FrameworkStateDefinition.FINAL_STATES.contains(srcState);
// Entering the Application associated states: bind the Framework to the Application
// described by the event.
if (!FrameworkStateDefinition.APPLICATION_ASSOCIATED_STATES.contains(srcState) &&
FrameworkStateDefinition.APPLICATION_ASSOCIATED_STATES.contains(dstState)) {
assert (event.getApplicationContext() != null);
String applicationId = event.getApplicationContext().getApplicationId().toString();
try {
associateFrameworkWithApplication(frameworkName, event.getApplicationContext());
LOGGER.logInfo("Associated Framework [%s] with Application %s", frameworkName, applicationId);
} catch (Exception e) {
// Undo whatever part of the association was applied, then surface the failure with
// the original exception as cause.
// NOTE(review): if disassociateFrameworkWithApplication itself throws, that exception
// propagates instead and the association failure `e` is lost.
disassociateFrameworkWithApplication(frameworkName);
throw new Exception(
String.format("Failed to associate Application %s to Framework [%s]",
applicationId, frameworkName), e);
}
}
// Entering the Application live states: mark the Application as live.
if (!FrameworkStateDefinition.APPLICATION_LIVE_ASSOCIATED_STATES.contains(srcState) &&
FrameworkStateDefinition.APPLICATION_LIVE_ASSOCIATED_STATES.contains(dstState)) {
updateExtensionFrameworkStatusWithApplicationLiveness(frameworkName, true);
}
// Leaving the Application live states: mark the Application as no longer live.
if (FrameworkStateDefinition.APPLICATION_LIVE_ASSOCIATED_STATES.contains(srcState) &&
!FrameworkStateDefinition.APPLICATION_LIVE_ASSOCIATED_STATES.contains(dstState)) {
updateExtensionFrameworkStatusWithApplicationLiveness(frameworkName, false);
}
// Leaving the Application associated states: drop the Framework <-> Application binding.
if (FrameworkStateDefinition.APPLICATION_ASSOCIATED_STATES.contains(srcState) &&
!FrameworkStateDefinition.APPLICATION_ASSOCIATED_STATES.contains(dstState)) {
disassociateFrameworkWithApplication(frameworkName);
}
// While diagnostics are being retrieved only the exit code and diagnostics carried by the
// event are recorded; the remaining exit fields are filled in on APPLICATION_COMPLETED.
if (dstState == FrameworkState.APPLICATION_RETRIEVING_DIAGNOSTICS) {
frameworkStatus.setApplicationExitCode(event.getApplicationExitCode());
frameworkStatus.setApplicationExitDiagnostics(event.getApplicationExitDiagnostics());
}
// No need to Cleanup ZK here, since it will be cleaned up by next Application
// No need to Cleanup HDFS here, since it will be overwritten by next Application
// No need to Cleanup RM here, since it already cleaned up before here
if (dstState == FrameworkState.APPLICATION_COMPLETED) {
// The exit code is required here: description and type are looked up from it.
// Only enforced when assertions are enabled.
assert (event.getApplicationExitCode() != null);
FrameworkExitInfo exitInfo = FrameworkExitSpec.getExitInfo(event.getApplicationExitCode());
frameworkStatus.setApplicationExitCode(event.getApplicationExitCode());
frameworkStatus.setApplicationExitDescription(exitInfo.getDescription());
frameworkStatus.setApplicationExitDiagnostics(event.getApplicationExitDiagnostics());
frameworkStatus.setApplicationExitType(exitInfo.getType());
frameworkStatus.setApplicationExitTriggerMessage(event.getApplicationExitTriggerMessage());
frameworkStatus.setApplicationExitTriggerTaskRoleName(event.getApplicationExitTriggerTaskRoleName());
frameworkStatus.setApplicationExitTriggerTaskIndex(event.getApplicationExitTriggerTaskIndex());
}
// Framework will be Retried
if (srcState == FrameworkState.APPLICATION_COMPLETED && dstState == FrameworkState.FRAMEWORK_WAITING) {
// Cleanup previous Application level external resource [ZK]
// NOTE(review): the meaning of the boolean argument is defined by zkStore and is not
// visible here - confirm against deleteFrameworkStatus.
zkStore.deleteFrameworkStatus(frameworkName, true);
// Ensure transitionFrameworkState and RetryPolicyState is consistent in case of crash.
// The new retry policy state is set on the same FrameworkStatus object that is persisted
// below, so state and retry policy state are written to ZK together.
assert (event.getNewRetryPolicyState() != null);
frameworkStatus.setFrameworkRetryPolicyState(event.getNewRetryPolicyState());
}
// Record Timestamps
// Only these three destination states have a timestamp recorded here.
Long currentTimestamp = System.currentTimeMillis();
if (dstState == FrameworkState.FRAMEWORK_COMPLETED) {
frameworkStatus.setFrameworkCompletedTimestamp(currentTimestamp);
} else if (dstState == FrameworkState.APPLICATION_LAUNCHED) {
frameworkStatus.setApplicationLaunchedTimestamp(currentTimestamp);
} else if (dstState == FrameworkState.APPLICATION_COMPLETED) {
frameworkStatus.setApplicationCompletedTimestamp(currentTimestamp);
}
// Start Transition
// Move the Framework name from the source state's locator set to the destination state's,
// then update the state stored in the status object itself.
frameworkStateLocators.get(srcState).remove(frameworkName);
frameworkStateLocators.get(dstState).add(frameworkName);
frameworkStatus.setFrameworkState(dstState);
// Update ZK Status
// The in-memory status has already been changed at this point; persistence is skipped only
// when the event explicitly requests it.
if (!event.getSkipToPersist()) {
zkStore.setFrameworkStatus(frameworkName, frameworkStatus);
}
LOGGER.logInfo("Transitioned Framework [%s] from [%s] to [%s] with SkipToPersist = [%s]",
frameworkName, srcState, dstState, event.getSkipToPersist());
}