in subprojects/frameworklauncher/yarn/src/main/java/com/microsoft/frameworklauncher/applicationmaster/StatusManager.java [810:929]
/**
 * Moves the Task identified by {@code locator} from its current TaskState to
 * {@code dstState} and performs the bookkeeping tied to leaving the source state
 * and entering the destination state.
 *
 * <p>If the Task is already in {@code dstState} the call returns immediately:
 * nothing is updated, nothing is logged and no callback fires.
 *
 * <p>Otherwise the following steps run, in this order:
 * <ol>
 *   <li>container request bookkeeping (remove on leaving CONTAINER_REQUESTED,
 *       add on entering it);</li>
 *   <li>Task/Container association when entering CONTAINER_ASSOCIATED_STATES;</li>
 *   <li>container liveness flag update when crossing the boundary of
 *       CONTAINER_LIVE_ASSOCIATED_STATES in either direction;</li>
 *   <li>Task/Container disassociation when leaving CONTAINER_ASSOCIATED_STATES;</li>
 *   <li>allocated container bookkeeping (remove on leaving CONTAINER_ALLOCATED,
 *       add on entering it);</li>
 *   <li>exit code / description / diagnostics / type recorded on entering
 *       CONTAINER_COMPLETED;</li>
 *   <li>retry policy state replaced on CONTAINER_COMPLETED -> TASK_WAITING;</li>
 *   <li>timestamp recorded for TASK_COMPLETED, CONTAINER_LAUNCHED or
 *       CONTAINER_COMPLETED;</li>
 *   <li>the locator is moved between the per-state locator collections, the
 *       TaskStatus gets the new state, and the Task's role is marked as changed;</li>
 *   <li>outstanding-task callbacks are fired if the transition crossed the
 *       boundary of OUTSTANDING_STATES.</li>
 * </ol>
 *
 * <p>Preconditions on {@code event} and on the source state are written as
 * {@code assert} statements, so they are only enforced when the JVM runs with
 * assertions enabled.
 *
 * <p>The method is declared {@code synchronized}, i.e. it runs while holding this
 * object's monitor.
 *
 * @param locator  identifies the Task whose state is being changed
 * @param dstState the state to transition to
 * @param event    carries the data the destination state needs; depending on the
 *                 transition this method reads its container request, container,
 *                 port definitions, raw exit code, raw exit diagnostics and new
 *                 retry policy state
 * @throws Exception if associating the Task with the Container fails (the original
 *                   failure is attached as the cause); the helpers called here may
 *                   presumably propagate their own failures as well - verify
 *                   against their declarations
 */
public synchronized void transitionTaskState(
TaskStatusLocator locator,
TaskState dstState,
TaskEvent event) throws Exception {
TaskStatus taskStatus = getTaskStatus(locator);
TaskState srcState = taskStatus.getTaskState();
// Everything below implements the transition function between TaskStates.
// A transition to the current state is a no-op.
if (srcState == dstState) {
return;
}
// A Task that already reached a final state must never be moved again.
assert (!TaskStateDefinition.FINAL_STATES.contains(srcState));
// Container request bookkeeping: the request is tracked only while the Task
// is in CONTAINER_REQUESTED.
if (srcState == TaskState.CONTAINER_REQUESTED) {
removeContainerRequest(locator);
}
if (dstState == TaskState.CONTAINER_REQUESTED) {
assert (event.getContainerRequest() != null);
addContainerRequest(locator, event.getContainerRequest());
}
// Entering the container-associated states: bind the Task to the Container
// supplied by the event.
if (!TaskStateDefinition.CONTAINER_ASSOCIATED_STATES.contains(srcState) &&
TaskStateDefinition.CONTAINER_ASSOCIATED_STATES.contains(dstState)) {
assert (event.getContainer() != null);
String containerId = event.getContainer().getId().toString();
try {
associateTaskWithContainer(locator, event.getContainer(), event.getPortDefinitions());
LOGGER.logInfo("Associated Task %s with Container %s", locator, containerId);
} catch (Exception e) {
// Undo the association attempt before reporting the failure; presumably this
// clears whatever associateTaskWithContainer managed to set - TODO confirm.
// NOTE(review): the container request add/remove done above is not undone
// here, and the Task stays in srcState because the state is only set further
// down - confirm callers handle this partially applied transition.
disassociateTaskWithContainer(locator);
throw new Exception(
String.format("Failed to associate Container %s to Task %s",
containerId, locator), e);
}
}
// Entering the container-live states: flag the container as live.
if (!TaskStateDefinition.CONTAINER_LIVE_ASSOCIATED_STATES.contains(srcState) &&
TaskStateDefinition.CONTAINER_LIVE_ASSOCIATED_STATES.contains(dstState)) {
updateExtensionTaskStatusWithContainerLiveness(locator, true);
}
// Leaving the container-live states: flag the container as no longer live.
if (TaskStateDefinition.CONTAINER_LIVE_ASSOCIATED_STATES.contains(srcState) &&
!TaskStateDefinition.CONTAINER_LIVE_ASSOCIATED_STATES.contains(dstState)) {
updateExtensionTaskStatusWithContainerLiveness(locator, false);
}
// Leaving the container-associated states: drop the Task/Container binding.
// This runs after the liveness update above.
if (TaskStateDefinition.CONTAINER_ASSOCIATED_STATES.contains(srcState) &&
!TaskStateDefinition.CONTAINER_ASSOCIATED_STATES.contains(dstState)) {
disassociateTaskWithContainer(locator);
}
// Allocated container bookkeeping: the container is tracked as allocated only
// while the Task is in CONTAINER_ALLOCATED.
if (srcState == TaskState.CONTAINER_ALLOCATED) {
removeAllocatedContainer(locator);
}
if (dstState == TaskState.CONTAINER_ALLOCATED) {
assert (event.getContainer() != null);
addAllocatedContainer(locator, event.getContainer());
}
// Container finished: translate the raw exit code and diagnostics into the
// framework exit code via FrameworkExitSpec and record the resulting exit info.
if (dstState == TaskState.CONTAINER_COMPLETED) {
assert (event.getContainerRawExitCode() != null);
Integer exitCode = FrameworkExitSpec.lookupExitCode(
event.getContainerRawExitCode(), event.getContainerRawExitDiagnostics());
FrameworkExitInfo exitInfo = FrameworkExitSpec.getExitInfo(exitCode);
taskStatus.setContainerExitCode(exitCode);
taskStatus.setContainerExitDescription(exitInfo.getDescription());
// The diagnostics stored are the raw ones from the event, not a translated form.
taskStatus.setContainerExitDiagnostics(event.getContainerRawExitDiagnostics());
taskStatus.setContainerExitType(exitInfo.getType());
}
// Task will be Retried
if (srcState == TaskState.CONTAINER_COMPLETED && dstState == TaskState.TASK_WAITING) {
// The retry policy state is replaced inside this same call so that it changes
// together with the TaskState (keeps the two updates transactional).
assert (event.getNewRetryPolicyState() != null);
taskStatus.setTaskRetryPolicyState(event.getNewRetryPolicyState());
}
// Record Timestamps: only these three destination states store one; the value
// is the wall-clock time of this call in milliseconds.
Long currentTimestamp = System.currentTimeMillis();
if (dstState == TaskState.TASK_COMPLETED) {
taskStatus.setTaskCompletedTimestamp(currentTimestamp);
} else if (dstState == TaskState.CONTAINER_LAUNCHED) {
taskStatus.setContainerLaunchedTimestamp(currentTimestamp);
} else if (dstState == TaskState.CONTAINER_COMPLETED) {
taskStatus.setContainerCompletedTimestamp(currentTimestamp);
}
// Start Transition: move the locator between the per-state collections and
// update the TaskStatus itself.
// assumes taskStateLocators holds an entry for every TaskState - TODO confirm
taskStateLocators.get(srcState).remove(locator);
taskStateLocators.get(dstState).add(locator);
taskStatus.setTaskState(dstState);
// Mark the Task's role as changed.
taskStatusesesChanged.put(locator.getTaskRoleName(), true);
LOGGER.logInfo("Transitioned Task %s from [%s] to [%s]", locator, srcState, dstState);
// Start Transition Callbacks. The counts below are read after the state change
// above, so they already include this transition.
// Left the outstanding states: notify only when no outstanding Task remains.
if (TaskStateDefinition.OUTSTANDING_STATES.contains(srcState) &&
!TaskStateDefinition.OUTSTANDING_STATES.contains(dstState)) {
int outstandingTaskCount = getOutstandingStateTaskCount();
if (outstandingTaskCount == 0) {
onOutstandingTaskDisappeared();
}
}
// Entered the outstanding states: notify only when the count is exactly 1,
// i.e. this Task is the only outstanding one.
if (!TaskStateDefinition.OUTSTANDING_STATES.contains(srcState) &&
TaskStateDefinition.OUTSTANDING_STATES.contains(dstState)) {
int outstandingTaskCount = getOutstandingStateTaskCount();
if (outstandingTaskCount == 1) {
onOutstandingTaskAppeared(outstandingTaskCount);
}
}
}