samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java [313:511]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  /**
   * Launches the Samza processor described by {@code builder} on the Yarn container backing {@code resource}.
   *
   * <p>The processor ID is read from the {@code ShellCommandConfig.ENV_CONTAINER_ID} entry of the builder's
   * environment. If {@code resource} is no longer present in {@code allocatedResources}, the launch is skipped.
   * Any {@link Throwable} raised while launching is logged and reported through
   * {@code clusterManagerCallback.onStreamProcessorLaunchFailure(resource, t)}; it is not rethrown.
   *
   * @param resource the allocated resource on which to launch the processor
   * @param builder  supplies the environment (including the processor ID) and command for the processor
   */
  @Override
  public void launchStreamProcessor(SamzaResource resource, CommandBuilder builder) {
    String processorId = builder.buildEnvironment().get(ShellCommandConfig.ENV_CONTAINER_ID);
    String containerId = resource.getContainerId();
    String host = resource.getHost();
    log.info("Starting Processor ID: {} on Container ID: {} on host: {}", processorId, containerId, host);
    synchronized (lock) {
      try {
        Container container = allocatedResources.get(resource);
        if (container == null) {
          log.info("Container ID: {} on host: {} was already allocated / released.", containerId, host);
          return;
        }

        runProcessor(processorId, container, builder);
      } catch (Throwable t) {
        // A failed launch is an error, not routine progress: log it at ERROR (with the stack trace as the
        // trailing argument) so it is not buried among INFO messages, then hand it to the callback.
        log.error("Error starting Processor ID: {} on Container ID: {} on host: {}", processorId, containerId, host, t);
        clusterManagerCallback.onStreamProcessorLaunchFailure(resource, t);
      }
    }
  }

  /**
   * Stops the processor running on {@code resource}. All bookkeeping lookups happen while holding {@code lock}.
   *
   * <ul>
   *   <li>If the resource's container is tracked in {@code allocatedResources}, it was started through the NM
   *       client in this attempt and is stopped via {@code nmClientAsync.stopContainerAsync}.</li>
   *   <li>Otherwise, if a running processor uses this container ID, the container is one inherited from a previous
   *       attempt (AM-HA) and is released through {@code amClient.releaseAssignedContainer}.</li>
   *   <li>Otherwise the request is logged and ignored.</li>
   * </ul>
   *
   * @param resource the resource whose container should be stopped
   */
  public void stopStreamProcessor(SamzaResource resource) {
    synchronized (lock) {
      Container nmManagedContainer = allocatedResources.get(resource);
      String containerId = resource.getContainerId();
      String host = resource.getHost();

      // Started by this attempt: the NM client owns the container's lifecycle.
      if (nmManagedContainer != null) {
        log.info("Stopping Container ID: {} on host: {}", containerId, host);
        this.nmClientAsync.stopContainerAsync(nmManagedContainer.getId(), nmManagedContainer.getNodeId());
        return;
      }

      // Not started by this attempt: look for a running processor inherited from a previous attempt.
      String owningProcessorId = getRunningProcessorId(containerId);
      YarnContainer inheritedContainer = state.runningProcessors.get(owningProcessorId);
      if (inheritedContainer == null) {
        log.info("No container with Container ID: {} exists. Ignoring the stop request", containerId);
        return;
      }

      log.info("Stopping container from previous attempt with Container ID: {} on host: {}",
          containerId, host);
      amClient.releaseAssignedContainer(inheritedContainer.id());
    }
  }

  /**
   * Finds the Samza processor ID whose running {@link YarnContainer} has the given Yarn container ID
   * (for example, {@code containerId_app_12345}).
   *
   * <p>This is a linear scan over {@code state.runningProcessors}. Each entry's {@code id().toString()} is compared
   * with {@code containerId}, and the first match in the map's iteration order wins.
   *
   * @param containerId  the Yarn container ID, in its {@code toString()} form.
   * @return  the matching Samza processor ID, or {@code INVALID_PROCESSOR_ID} if no running processor uses it.
   */
  //TODO: Get rid of the YarnContainer object and just use Container in state.runningProcessors hashmap.
  //In that case, this scan will turn into a lookup. This change will require changes/testing in the UI files because
  //those UI stub templates operate on the YarnContainer object.
  private String getRunningProcessorId(String containerId) {
    return state.runningProcessors.entrySet().stream()
        .filter(processorEntry -> processorEntry.getValue().id().toString().equals(containerId))
        .map(Map.Entry::getKey)
        .findFirst()
        .orElse(INVALID_PROCESSOR_ID);
  }


  /**
   * Makes a best-effort attempt to cancel a previously submitted resource request.
   *
   * <p>While holding {@code lock}, the matching {@link AMRMClient.ContainerRequest} is dropped from
   * {@code requestsMap} and withdrawn from the AM client, so the two updates happen together. If no matching
   * request is tracked, the call only logs that it was already cancelled. Yarn may already have acted on the
   * request, so callbacks must still be prepared to receive an allocation for it after this returns.
   *
   * @param request the request to be cancelled
   */
  @Override
  public void cancelResourceRequest(SamzaResourceRequest request) {
    String requestedProcessorId = request.getProcessorId();
    String requestedHost = request.getPreferredHost();
    String samzaRequestId = request.getRequestId();
    log.info("Cancelling resource request for Processor ID: {} on host: {} with Request ID: {}",
        requestedProcessorId, requestedHost, samzaRequestId);

    synchronized (lock) {
      AMRMClient.ContainerRequest pendingYarnRequest = requestsMap.get(request);
      if (pendingYarnRequest != null) {
        // Forget the request locally, then withdraw it from Yarn, both under the same lock.
        requestsMap.remove(request);
        amClient.removeContainerRequest(pendingYarnRequest);
      } else {
        log.info("Resource request for Processor ID: {} on host: {} with Request ID: {} already cancelled.",
            requestedProcessorId, requestedHost, samzaRequestId);
      }
    }
  }


  /**
   * Stops the YarnContainerManager and all its sub-components.
   * Stop should NOT be called from multiple threads.
   * TODO: fix this to make stop idempotent?.
   *
   * <p>Teardown order: {@code lifecycle.onShutdown(status)}, then the AM client, the NM client, the
   * SamzaYarnAppMasterService ({@code service.onShutdown()}) and the metrics. Afterwards, the job's staging
   * directory is cleaned up unless {@code status} is {@code UNDEFINED}.
   *
   * <p>NOTE(review): no step is guarded by try/catch, so an exception from one step skips every later step,
   * including the staging-directory cleanup.
   *
   * @param status the application status handed to {@code lifecycle.onShutdown}; it also decides whether the
   *               staging directory is cleaned up.
   */
  @Override
  public void stop(SamzaApplicationState.SamzaAppStatus status) {
    log.info("Stopping the AM client on shutdown request.");
    // The lifecycle is notified before the AM client itself is stopped.
    lifecycle.onShutdown(status);
    amClient.stop();
    log.info("Stopping the NM client on shutdown request.");
    nmClientAsync.stop();
    log.info("Stopping the SamzaYarnAppMasterService service on shutdown request.");
    service.onShutdown();
    log.info("Stopping SamzaAppMasterMetrics on shutdown request.");
    metrics.stop();

    // NOTE(review): presumably UNDEFINED means the application is not finishing for good (e.g. it may be
    // re-attempted), so its staging directory is kept. Confirm against SamzaAppStatus semantics.
    if (status != SamzaApplicationState.SamzaAppStatus.UNDEFINED) {
      cleanupStagingDir();
    }
  }

  /**
   * Cleans up the job's staging directory, if one is configured via {@code yarnConfig}.
   *
   * <p>The actual cleanup is delegated to {@code YarnJobUtil.cleanupStagingDir}. If the {@link FileSystem} for
   * {@code yarnConfiguration} cannot be obtained, the {@link IOException} is logged and swallowed, and nothing
   * is cleaned up. Does nothing when no staging directory is configured.
   */
  private void cleanupStagingDir() {
    String stagingDirectory = yarnConfig.getYarnJobStagingDirectory();
    if (stagingDirectory == null) {
      // No staging directory configured: nothing to clean up.
      return;
    }

    JobContext jobContext = new JobContext();
    jobContext.setAppStagingDir(new Path(stagingDirectory));

    FileSystem fileSystem;
    try {
      fileSystem = FileSystem.get(yarnConfiguration);
    } catch (IOException e) {
      log.error("Unable to clean up file system.", e);
      return;
    }

    if (fileSystem != null) {
      YarnJobUtil.cleanupStagingDir(jobContext, fileSystem);
    }
  }

  /**
   * Callback invoked from Yarn when containers complete. This translates the Yarn callbacks into Samza-specific
   * ones.
   *
   * <p>For every completed container, a {@link SamzaResourceStatus} is built from its ID, diagnostics and exit
   * status. If the container was backing a running processor, that processor is removed from
   * {@code state.runningProcessors}. A non-{@code SUCCESS} exit status is then also recorded in
   * {@code state.failedContainersStatus}. All translated statuses are delivered to
   * {@code clusterManagerCallback.onResourcesCompleted} as one batch.
   *
   * @param statuses the container statuses reported by Yarn.
   */
  @Override
  public void onContainersCompleted(List<ContainerStatus> statuses) {
    List<SamzaResourceStatus> samzaResourceStatuses = new ArrayList<>();

    for (ContainerStatus status : statuses) {
      log.info("Got completion notification for Container ID: {} with status: {} and state: {}. Diagnostics information: {}.",
          status.getContainerId(), status.getExitStatus(), status.getState(), status.getDiagnostics());

      String containerId = status.getContainerId().toString();
      samzaResourceStatuses.add(new SamzaResourceStatus(containerId, status.getDiagnostics(), status.getExitStatus()));

      String completedProcessorID = getRunningProcessorId(containerId);
      log.info("Completed Container ID: {} had Processor ID: {}", status.getContainerId(), completedProcessorID);

      // Remove the processor from the running set. Using remove()'s return value checks and removes in one
      // call, avoiding the window between a separate containsKey() and remove() on this shared map.
      // If the container failed with a non-zero exit code, also record it as a failed container.
      if (!completedProcessorID.equals(INVALID_PROCESSOR_ID)
          && state.runningProcessors.remove(completedProcessorID) != null) {
        log.info("Removing Processor ID: {} from YarnClusterResourceManager running processors.", completedProcessorID);
        if (status.getExitStatus() != ContainerExitStatus.SUCCESS) {
          state.failedContainersStatus.put(containerId, status);
        }
      }
    }
    clusterManagerCallback.onResourcesCompleted(samzaResourceStatuses);
  }

  /**
   * Callback invoked from Yarn when containers are allocated. This translates the yarn callbacks into Samza
   * specific ones.
   * @param containers the list of {@link Container} returned by Yarn.
   */
  @Override
  public void onContainersAllocated(List<Container> containers) {
    List<SamzaResource> resources = new ArrayList<SamzaResource>();
    for (Container container : containers) {
      log.info("Got allocation notification for Container ID: {} on host: {}", container.getId(),
          container.getNodeId().getHost());
      String containerId = container.getId().toString();
      String host = container.getNodeId().getHost();
      int memory = container.getResource().getMemory();
      int numCores = container.getResource().getVirtualCores();

      SamzaResource resource = new SamzaResource(numCores, memory, host, containerId);
      allocatedResources.put(resource, container);
      resources.add(resource);
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



samza-yarn3/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java [312:510]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  /**
   * Launches the Samza processor described by {@code builder} on the Yarn container backing {@code resource}.
   *
   * <p>The processor ID is read from the {@code ShellCommandConfig.ENV_CONTAINER_ID} entry of the builder's
   * environment. If {@code resource} is no longer present in {@code allocatedResources}, the launch is skipped.
   * Any {@link Throwable} raised while launching is logged and reported through
   * {@code clusterManagerCallback.onStreamProcessorLaunchFailure(resource, t)}; it is not rethrown.
   *
   * @param resource the allocated resource on which to launch the processor
   * @param builder  supplies the environment (including the processor ID) and command for the processor
   */
  @Override
  public void launchStreamProcessor(SamzaResource resource, CommandBuilder builder) {
    String processorId = builder.buildEnvironment().get(ShellCommandConfig.ENV_CONTAINER_ID);
    String containerId = resource.getContainerId();
    String host = resource.getHost();
    log.info("Starting Processor ID: {} on Container ID: {} on host: {}", processorId, containerId, host);
    synchronized (lock) {
      try {
        Container container = allocatedResources.get(resource);
        if (container == null) {
          log.info("Container ID: {} on host: {} was already allocated / released.", containerId, host);
          return;
        }

        runProcessor(processorId, container, builder);
      } catch (Throwable t) {
        // A failed launch is an error, not routine progress: log it at ERROR (with the stack trace as the
        // trailing argument) so it is not buried among INFO messages, then hand it to the callback.
        log.error("Error starting Processor ID: {} on Container ID: {} on host: {}", processorId, containerId, host, t);
        clusterManagerCallback.onStreamProcessorLaunchFailure(resource, t);
      }
    }
  }

  /**
   * Stops the processor running on {@code resource}. All bookkeeping lookups happen while holding {@code lock}.
   *
   * <ul>
   *   <li>If the resource's container is tracked in {@code allocatedResources}, it was started through the NM
   *       client in this attempt and is stopped via {@code nmClientAsync.stopContainerAsync}.</li>
   *   <li>Otherwise, if a running processor uses this container ID, the container is one inherited from a previous
   *       attempt (AM-HA) and is released through {@code amClient.releaseAssignedContainer}.</li>
   *   <li>Otherwise the request is logged and ignored.</li>
   * </ul>
   *
   * @param resource the resource whose container should be stopped
   */
  public void stopStreamProcessor(SamzaResource resource) {
    synchronized (lock) {
      Container nmManagedContainer = allocatedResources.get(resource);
      String containerId = resource.getContainerId();
      String host = resource.getHost();

      // Started by this attempt: the NM client owns the container's lifecycle.
      if (nmManagedContainer != null) {
        log.info("Stopping Container ID: {} on host: {}", containerId, host);
        this.nmClientAsync.stopContainerAsync(nmManagedContainer.getId(), nmManagedContainer.getNodeId());
        return;
      }

      // Not started by this attempt: look for a running processor inherited from a previous attempt.
      String owningProcessorId = getRunningProcessorId(containerId);
      YarnContainer inheritedContainer = state.runningProcessors.get(owningProcessorId);
      if (inheritedContainer == null) {
        log.info("No container with Container ID: {} exists. Ignoring the stop request", containerId);
        return;
      }

      log.info("Stopping container from previous attempt with Container ID: {} on host: {}",
          containerId, host);
      amClient.releaseAssignedContainer(inheritedContainer.id());
    }
  }

  /**
   * Finds the Samza processor ID whose running {@link YarnContainer} has the given Yarn container ID
   * (for example, {@code containerId_app_12345}).
   *
   * <p>This is a linear scan over {@code state.runningProcessors}. Each entry's {@code id().toString()} is compared
   * with {@code containerId}, and the first match in the map's iteration order wins.
   *
   * @param containerId  the Yarn container ID, in its {@code toString()} form.
   * @return  the matching Samza processor ID, or {@code INVALID_PROCESSOR_ID} if no running processor uses it.
   */
  //TODO: Get rid of the YarnContainer object and just use Container in state.runningProcessors hashmap.
  //In that case, this scan will turn into a lookup. This change will require changes/testing in the UI files because
  //those UI stub templates operate on the YarnContainer object.
  private String getRunningProcessorId(String containerId) {
    return state.runningProcessors.entrySet().stream()
        .filter(processorEntry -> processorEntry.getValue().id().toString().equals(containerId))
        .map(Map.Entry::getKey)
        .findFirst()
        .orElse(INVALID_PROCESSOR_ID);
  }


  /**
   * Makes a best-effort attempt to cancel a previously submitted resource request.
   *
   * <p>While holding {@code lock}, the matching {@link AMRMClient.ContainerRequest} is dropped from
   * {@code requestsMap} and withdrawn from the AM client, so the two updates happen together. If no matching
   * request is tracked, the call only logs that it was already cancelled. Yarn may already have acted on the
   * request, so callbacks must still be prepared to receive an allocation for it after this returns.
   *
   * @param request the request to be cancelled
   */
  @Override
  public void cancelResourceRequest(SamzaResourceRequest request) {
    String requestedProcessorId = request.getProcessorId();
    String requestedHost = request.getPreferredHost();
    String samzaRequestId = request.getRequestId();
    log.info("Cancelling resource request for Processor ID: {} on host: {} with Request ID: {}",
        requestedProcessorId, requestedHost, samzaRequestId);

    synchronized (lock) {
      AMRMClient.ContainerRequest pendingYarnRequest = requestsMap.get(request);
      if (pendingYarnRequest != null) {
        // Forget the request locally, then withdraw it from Yarn, both under the same lock.
        requestsMap.remove(request);
        amClient.removeContainerRequest(pendingYarnRequest);
      } else {
        log.info("Resource request for Processor ID: {} on host: {} with Request ID: {} already cancelled.",
            requestedProcessorId, requestedHost, samzaRequestId);
      }
    }
  }


  /**
   * Stops the YarnContainerManager and all its sub-components.
   * Stop should NOT be called from multiple threads.
   * TODO: fix this to make stop idempotent?.
   *
   * <p>Teardown order: {@code lifecycle.onShutdown(status)}, then the AM client, the NM client, the
   * SamzaYarnAppMasterService ({@code service.onShutdown()}) and the metrics. Afterwards, the job's staging
   * directory is cleaned up unless {@code status} is {@code UNDEFINED}.
   *
   * <p>NOTE(review): no step is guarded by try/catch, so an exception from one step skips every later step,
   * including the staging-directory cleanup.
   *
   * @param status the application status handed to {@code lifecycle.onShutdown}; it also decides whether the
   *               staging directory is cleaned up.
   */
  @Override
  public void stop(SamzaApplicationState.SamzaAppStatus status) {
    log.info("Stopping the AM client on shutdown request.");
    // The lifecycle is notified before the AM client itself is stopped.
    lifecycle.onShutdown(status);
    amClient.stop();
    log.info("Stopping the NM client on shutdown request.");
    nmClientAsync.stop();
    log.info("Stopping the SamzaYarnAppMasterService service on shutdown request.");
    service.onShutdown();
    log.info("Stopping SamzaAppMasterMetrics on shutdown request.");
    metrics.stop();

    // NOTE(review): presumably UNDEFINED means the application is not finishing for good (e.g. it may be
    // re-attempted), so its staging directory is kept. Confirm against SamzaAppStatus semantics.
    if (status != SamzaApplicationState.SamzaAppStatus.UNDEFINED) {
      cleanupStagingDir();
    }
  }

  /**
   * Cleans up the job's staging directory, if one is configured via {@code yarnConfig}.
   *
   * <p>The actual cleanup is delegated to {@code YarnJobUtil.cleanupStagingDir}. If the {@link FileSystem} for
   * {@code yarnConfiguration} cannot be obtained, the {@link IOException} is logged and swallowed, and nothing
   * is cleaned up. Does nothing when no staging directory is configured.
   */
  private void cleanupStagingDir() {
    String stagingDirectory = yarnConfig.getYarnJobStagingDirectory();
    if (stagingDirectory == null) {
      // No staging directory configured: nothing to clean up.
      return;
    }

    JobContext jobContext = new JobContext();
    jobContext.setAppStagingDir(new Path(stagingDirectory));

    FileSystem fileSystem;
    try {
      fileSystem = FileSystem.get(yarnConfiguration);
    } catch (IOException e) {
      log.error("Unable to clean up file system.", e);
      return;
    }

    if (fileSystem != null) {
      YarnJobUtil.cleanupStagingDir(jobContext, fileSystem);
    }
  }

  /**
   * Callback invoked from Yarn when containers complete. This translates the Yarn callbacks into Samza-specific
   * ones.
   *
   * <p>For every completed container, a {@link SamzaResourceStatus} is built from its ID, diagnostics and exit
   * status. If the container was backing a running processor, that processor is removed from
   * {@code state.runningProcessors}. A non-{@code SUCCESS} exit status is then also recorded in
   * {@code state.failedContainersStatus}. All translated statuses are delivered to
   * {@code clusterManagerCallback.onResourcesCompleted} as one batch.
   *
   * @param statuses the container statuses reported by Yarn.
   */
  @Override
  public void onContainersCompleted(List<ContainerStatus> statuses) {
    List<SamzaResourceStatus> samzaResourceStatuses = new ArrayList<>();

    for (ContainerStatus status : statuses) {
      log.info("Got completion notification for Container ID: {} with status: {} and state: {}. Diagnostics information: {}.",
          status.getContainerId(), status.getExitStatus(), status.getState(), status.getDiagnostics());

      String containerId = status.getContainerId().toString();
      samzaResourceStatuses.add(new SamzaResourceStatus(containerId, status.getDiagnostics(), status.getExitStatus()));

      String completedProcessorID = getRunningProcessorId(containerId);
      log.info("Completed Container ID: {} had Processor ID: {}", status.getContainerId(), completedProcessorID);

      // Remove the processor from the running set. Using remove()'s return value checks and removes in one
      // call, avoiding the window between a separate containsKey() and remove() on this shared map.
      // If the container failed with a non-zero exit code, also record it as a failed container.
      if (!completedProcessorID.equals(INVALID_PROCESSOR_ID)
          && state.runningProcessors.remove(completedProcessorID) != null) {
        log.info("Removing Processor ID: {} from YarnClusterResourceManager running processors.", completedProcessorID);
        if (status.getExitStatus() != ContainerExitStatus.SUCCESS) {
          state.failedContainersStatus.put(containerId, status);
        }
      }
    }
    clusterManagerCallback.onResourcesCompleted(samzaResourceStatuses);
  }

  /**
   * Callback invoked from Yarn when containers are allocated. This translates the yarn callbacks into Samza
   * specific ones.
   * @param containers the list of {@link Container} returned by Yarn.
   */
  @Override
  public void onContainersAllocated(List<Container> containers) {
    List<SamzaResource> resources = new ArrayList<SamzaResource>();
    for (Container container : containers) {
      log.info("Got allocation notification for Container ID: {} on host: {}", container.getId(),
          container.getNodeId().getHost());
      String containerId = container.getId().toString();
      String host = container.getNodeId().getHost();
      int memory = container.getResource().getMemory();
      int numCores = container.getResource().getVirtualCores();

      SamzaResource resource = new SamzaResource(numCores, memory, host, containerId);
      allocatedResources.put(resource, container);
      resources.add(resource);
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



