private Set resyncTasksWithLiveContainers()

in subprojects/frameworklauncher/yarn/src/main/java/com/microsoft/frameworklauncher/applicationmaster/ApplicationMaster.java [877:977]


  private Set<String> resyncTasksWithLiveContainers(Set<String> liveContainerIds) throws Exception {
    String logScope = "resyncTasksWithLiveContainers";
    CHANGE_AWARE_LOGGER.initializeScope(logScope, Level.INFO, Level.DEBUG);

    Set<String> retainContainerIds = new HashSet<>();
    if (liveContainerIds == null) {
      CHANGE_AWARE_LOGGER.log(logScope,
          "Got null live Containers from RM, so RMResync is incomplete. " +
              "resetContainerConnectionLostCount for all tasks, since around this time RMResync must also be incomplete.");
      statusManager.resetContainerConnectionLostCount();
    } else {
      CHANGE_AWARE_LOGGER.log(logScope,
          "Got %s live Containers from RM, start to resync them.",
          liveContainerIds.size());

      for (String containerId : liveContainerIds) {
        if (statusManager.isContainerIdLiveAssociated(containerId)) {
          statusManager.resetContainerConnectionLostCount(containerId);
          retainContainerIds.add(containerId);
        } else {
          if (!containerConnectionExceedCount.containsKey(containerId)) {
            containerConnectionExceedCount.put(containerId, 0);
          }
          containerConnectionExceedCount.put(containerId, containerConnectionExceedCount.get(containerId) + 1);
          Integer exceedCount = containerConnectionExceedCount.get(containerId);

          LOGGER.logWarning(
              "Cannot find resynced live Container %s in live associated Containers. " +
                  "IncreaseContainerConnectionExceedCount to %s.",
              containerId, exceedCount);

          Integer maxExceedCount = requestManager.getPlatParams().getContainerConnectionMaxExceedCount();
          if (exceedCount > maxExceedCount) {
            LOGGER.logWarning(
                "Live Container %s's ContainerConnectionExceedCount %s " +
                    "exceed ContainerConnectionMaxExceedCount %s. " +
                    "Will complete it as RMResyncExceed",
                containerId, exceedCount, maxExceedCount);

            // This may Release the Container which is Allocated in RM, but AM has not got notified
            // through the onContainersAllocated.
            // To avoid this, we need to ensure the Release happens after onContainersAllocated, i.e.
            // AMRMHeartbeatIntervalSec < ContainerConnectionMaxExceedCount * AMRMResyncIntervalSec
            completeContainer(
                containerId,
                FrameworkExitCode.CONTAINER_RM_RESYNC_EXCEEDED.toInt(),
                null,
                true);

            // Pending Exceed Container now is settled to definitely Exceed Container
            containerConnectionExceedCount.remove(containerId);
          } else {
            retainContainerIds.add(containerId);
          }
        }
      }

      List<String> liveAssociatedContainerIds = statusManager.getLiveAssociatedContainerIds();
      for (String containerId : liveAssociatedContainerIds) {
        if (!liveContainerIds.contains(containerId)) {
          TaskStatus taskStatus = statusManager.getTaskStatusWithLiveAssociatedContainerId(containerId);
          String taskRoleName = taskStatus.getTaskRoleName();
          TaskStatusLocator taskLocator = new TaskStatusLocator(taskRoleName, taskStatus.getTaskIndex());

          statusManager.increaseContainerConnectionLostCount(containerId);
          Integer lostCount = taskStatus.getContainerConnectionLostCount();
          LOGGER.logWarning(
              "%s: Cannot find live associated Container %s in resynced live Containers. " +
                  "increaseContainerConnectionLostCount to %s.",
              taskLocator, containerId, lostCount);

          Integer maxLostCount = requestManager.getPlatParams().getContainerConnectionMaxLostCount();
          if (maxLostCount == GlobalConstants.USING_DEFAULT_VALUE) {
            maxLostCount = conf.getLauncherConfig().getAmRmResyncFrequency();
          }

          // This may Mis-Complete the Container, when liveContainerIds is incomplete.
          // If miss judging rate is still too high, we need to combine the ContainerStatus from NM to
          // double confirm the Container is lost/complete.
          if (maxLostCount != GlobalConstants.USING_UNLIMITED_VALUE &&
              lostCount > maxLostCount) {
            LOGGER.logWarning(
                "%s: Live associated Container %s's ContainerConnectionLostCount %s " +
                    "exceed ContainerConnectionMaxLostCount %s. " +
                    "Will complete it as RMResyncLost",
                taskLocator, containerId, lostCount, maxLostCount);

            completeContainer(
                containerId,
                FrameworkExitCode.CONTAINER_RM_RESYNC_LOST.toInt(),
                null,
                true);
          } else {
            retainContainerIds.add(containerId);
          }
        }
      }
    }

    return retainContainerIds;
  }