in subprojects/frameworklauncher/yarn/src/main/java/com/microsoft/frameworklauncher/applicationmaster/ApplicationMaster.java [877:977]
/**
 * Reconciles the Containers that RM reports as live with the Containers this AM currently
 * has live-associated, completing the ones that have been out of sync for too long.
 *
 * <p>If {@code liveContainerIds} is {@code null}, the resync is treated as incomplete: the
 * ContainerConnectionLostCount of all tasks is reset, no Container is completed, and an
 * empty set is returned.
 *
 * <p>Otherwise two passes are made:
 * <ol>
 *   <li>For each RM live Container: if it is live-associated, its ContainerConnectionLostCount
 *       is reset and it is retained. If not, its ContainerConnectionExceedCount is incremented.
 *       Once that count is greater than ContainerConnectionMaxExceedCount, the Container is
 *       completed with {@code CONTAINER_RM_RESYNC_EXCEEDED} and its counter is removed.
 *       Otherwise it is retained.</li>
 *   <li>For each live-associated Container absent from {@code liveContainerIds}: its
 *       ContainerConnectionLostCount is increased. Once that count is greater than
 *       ContainerConnectionMaxLostCount, the Container is completed with
 *       {@code CONTAINER_RM_RESYNC_LOST}. Otherwise it is retained. The limit falls back to
 *       AmRmResyncFrequency when the platform parameter equals
 *       {@code GlobalConstants.USING_DEFAULT_VALUE}, and is disabled when it equals
 *       {@code GlobalConstants.USING_UNLIMITED_VALUE}.</li>
 * </ol>
 *
 * @param liveContainerIds ids of the Containers RM reported as live, or {@code null}
 * @return ids of the Containers that were retained (not completed) by this resync
 * @throws Exception propagated from the status manager or from {@code completeContainer}
 */
private Set<String> resyncTasksWithLiveContainers(Set<String> liveContainerIds) throws Exception {
    String logScope = "resyncTasksWithLiveContainers";
    CHANGE_AWARE_LOGGER.initializeScope(logScope, Level.INFO, Level.DEBUG);
    Set<String> retained = new HashSet<>();

    // A null snapshot means this round of RMResync is incomplete: judge no Container,
    // only reset the lost counts.
    if (liveContainerIds == null) {
        CHANGE_AWARE_LOGGER.log(logScope,
            "Got null live Containers from RM, so RMResync is incomplete. " +
                "resetContainerConnectionLostCount for all tasks, since around this time RMResync must also be incomplete.");
        statusManager.resetContainerConnectionLostCount();
        return retained;
    }

    CHANGE_AWARE_LOGGER.log(logScope,
        "Got %s live Containers from RM, start to resync them.",
        liveContainerIds.size());

    // Pass 1: Containers RM considers live, checked against the AM's live associations.
    for (String rmLiveId : liveContainerIds) {
        if (statusManager.isContainerIdLiveAssociated(rmLiveId)) {
            statusManager.resetContainerConnectionLostCount(rmLiveId);
            retained.add(rmLiveId);
            continue;
        }

        // Not associated on the AM side: bump its pending exceed counter (starting from 0).
        Integer previousExceedCount = containerConnectionExceedCount.containsKey(rmLiveId)
            ? containerConnectionExceedCount.get(rmLiveId)
            : Integer.valueOf(0);
        Integer exceedCount = previousExceedCount + 1;
        containerConnectionExceedCount.put(rmLiveId, exceedCount);
        LOGGER.logWarning(
            "Cannot find resynced live Container %s in live associated Containers. " +
                "IncreaseContainerConnectionExceedCount to %s.",
            rmLiveId, exceedCount);

        Integer maxExceedCount = requestManager.getPlatParams().getContainerConnectionMaxExceedCount();
        if (exceedCount <= maxExceedCount) {
            retained.add(rmLiveId);
            continue;
        }

        LOGGER.logWarning(
            "Live Container %s's ContainerConnectionExceedCount %s " +
                "exceed ContainerConnectionMaxExceedCount %s. " +
                "Will complete it as RMResyncExceed",
            rmLiveId, exceedCount, maxExceedCount);
        // Releasing here may hit a Container that RM has Allocated but whose
        // onContainersAllocated notification the AM has not received yet. To avoid that,
        // the Release must happen after onContainersAllocated, i.e.
        // AMRMHeartbeatIntervalSec < ContainerConnectionMaxExceedCount * AMRMResyncIntervalSec
        completeContainer(
            rmLiveId,
            FrameworkExitCode.CONTAINER_RM_RESYNC_EXCEEDED.toInt(),
            null,
            true);
        // The pending counter is settled: this Container is now definitely exceeded.
        containerConnectionExceedCount.remove(rmLiveId);
    }

    // Pass 2: Containers the AM has live-associated but RM did not report as live.
    for (String associatedId : statusManager.getLiveAssociatedContainerIds()) {
        if (liveContainerIds.contains(associatedId)) {
            continue;
        }

        TaskStatus taskStatus = statusManager.getTaskStatusWithLiveAssociatedContainerId(associatedId);
        TaskStatusLocator taskLocator =
            new TaskStatusLocator(taskStatus.getTaskRoleName(), taskStatus.getTaskIndex());
        statusManager.increaseContainerConnectionLostCount(associatedId);
        Integer lostCount = taskStatus.getContainerConnectionLostCount();
        LOGGER.logWarning(
            "%s: Cannot find live associated Container %s in resynced live Containers. " +
                "increaseContainerConnectionLostCount to %s.",
            taskLocator, associatedId, lostCount);

        Integer maxLostCount = requestManager.getPlatParams().getContainerConnectionMaxLostCount();
        if (maxLostCount == GlobalConstants.USING_DEFAULT_VALUE) {
            maxLostCount = conf.getLauncherConfig().getAmRmResyncFrequency();
        }

        // This can wrongly complete a Container when liveContainerIds itself is incomplete.
        // If the misjudgment rate is still too high, the ContainerStatus from NM should be
        // combined in to double confirm the Container is lost/complete.
        boolean lostBeyondLimit = maxLostCount != GlobalConstants.USING_UNLIMITED_VALUE
            && lostCount > maxLostCount;
        if (!lostBeyondLimit) {
            retained.add(associatedId);
            continue;
        }

        LOGGER.logWarning(
            "%s: Live associated Container %s's ContainerConnectionLostCount %s " +
                "exceed ContainerConnectionMaxLostCount %s. " +
                "Will complete it as RMResyncLost",
            taskLocator, associatedId, lostCount, maxLostCount);
        completeContainer(
            associatedId,
            FrameworkExitCode.CONTAINER_RM_RESYNC_LOST.toInt(),
            null,
            true);
    }

    return retained;
}