samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java [465:491]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  /**
   * Handles completion notifications for a batch of containers.
   *
   * <p>Every status is logged, converted into a {@link SamzaResourceStatus} and collected. If the container maps to a
   * valid processor ID that is still present in {@code state.runningProcessors}, that processor is removed from the
   * map; when its exit status is not {@code ContainerExitStatus.SUCCESS} the raw status is also stored in
   * {@code state.failedContainersStatus}, keyed by container ID. Finally, all collected statuses are handed to
   * {@code clusterManagerCallback.onResourcesCompleted}.
   *
   * @param statuses completion statuses of the containers to process
   */
  public void onContainersCompleted(List<ContainerStatus> statuses) {
    List<SamzaResourceStatus> completedResources = new ArrayList<>();

    for (ContainerStatus containerStatus : statuses) {
      log.info("Got completion notification for Container ID: {} with status: {} and state: {}. Diagnostics information: {}.",
          containerStatus.getContainerId(), containerStatus.getExitStatus(), containerStatus.getState(), containerStatus.getDiagnostics());

      // The string form of the container ID is used for the resource status, the processor lookup and the failure map.
      String containerId = containerStatus.getContainerId().toString();
      completedResources.add(
          new SamzaResourceStatus(containerId, containerStatus.getDiagnostics(), containerStatus.getExitStatus()));

      String processorId = getRunningProcessorId(containerId);
      log.info("Completed Container ID: {} had Processor ID: {}", containerId, processorId);

      // Only containers that map to a processor still listed as running need any state bookkeeping.
      boolean trackedAsRunning = !processorId.equals(INVALID_PROCESSOR_ID)
          && state.runningProcessors.containsKey(processorId);
      if (!trackedAsRunning) {
        continue;
      }

      log.info("Removing Processor ID: {} from YarnClusterResourceManager running processors.", processorId);
      state.runningProcessors.remove(processorId);

      // A non-success exit status additionally marks the container as failed.
      if (containerStatus.getExitStatus() != ContainerExitStatus.SUCCESS) {
        state.failedContainersStatus.put(containerId, containerStatus);
      }
    }

    clusterManagerCallback.onResourcesCompleted(completedResources);
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



samza-yarn3/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java [464:490]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  /**
   * Handles completion notifications for a batch of containers.
   *
   * <p>Every status is logged, converted into a {@link SamzaResourceStatus} and collected. If the container maps to a
   * valid processor ID that is still present in {@code state.runningProcessors}, that processor is removed from the
   * map; when its exit status is not {@code ContainerExitStatus.SUCCESS} the raw status is also stored in
   * {@code state.failedContainersStatus}, keyed by container ID. Finally, all collected statuses are handed to
   * {@code clusterManagerCallback.onResourcesCompleted}.
   *
   * @param statuses completion statuses of the containers to process
   */
  public void onContainersCompleted(List<ContainerStatus> statuses) {
    List<SamzaResourceStatus> completedResources = new ArrayList<>();

    for (ContainerStatus containerStatus : statuses) {
      log.info("Got completion notification for Container ID: {} with status: {} and state: {}. Diagnostics information: {}.",
          containerStatus.getContainerId(), containerStatus.getExitStatus(), containerStatus.getState(), containerStatus.getDiagnostics());

      // The string form of the container ID is used for the resource status, the processor lookup and the failure map.
      String containerId = containerStatus.getContainerId().toString();
      completedResources.add(
          new SamzaResourceStatus(containerId, containerStatus.getDiagnostics(), containerStatus.getExitStatus()));

      String processorId = getRunningProcessorId(containerId);
      log.info("Completed Container ID: {} had Processor ID: {}", containerId, processorId);

      // Only containers that map to a processor still listed as running need any state bookkeeping.
      boolean trackedAsRunning = !processorId.equals(INVALID_PROCESSOR_ID)
          && state.runningProcessors.containsKey(processorId);
      if (!trackedAsRunning) {
        continue;
      }

      log.info("Removing Processor ID: {} from YarnClusterResourceManager running processors.", processorId);
      state.runningProcessors.remove(processorId);

      // A non-success exit status additionally marks the container as failed.
      if (containerStatus.getExitStatus() != ContainerExitStatus.SUCCESS) {
        state.failedContainersStatus.put(containerId, containerStatus);
      }
    }

    clusterManagerCallback.onResourcesCompleted(completedResources);
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



