samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java [335:360]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  public void stopStreamProcessor(SamzaResource resource) {
    // Stops the container backing the given resource. Exactly one of three paths is taken, all while
    // holding `lock`:
    //   - the resource is tracked in allocatedResources: ask the NodeManager client to stop it;
    //   - otherwise, a matching entry exists in state.runningProcessors (a container carried over from
    //     a previous attempt and managed by the AM due to AM-HA): release it through the AM client;
    //   - otherwise: nothing is known about the container, so the request is logged and ignored.
    synchronized (lock) {
      final Container allocated = allocatedResources.get(resource);
      final String id = resource.getContainerId();
      final String host = resource.getHost();

      // Container was instantiated as part of the NMClient lifecycle: stop it asynchronously.
      if (allocated != null) {
        log.info("Stopping Container ID: {} on host: {}", id, host);
        this.nmClientAsync.stopContainerAsync(allocated.getId(), allocated.getNodeId());
        return;
      }

      // Not in the allocation book keeping; look it up among the running processors instead.
      final YarnContainer previousAttemptContainer = state.runningProcessors.get(getRunningProcessorId(id));
      if (previousAttemptContainer == null) {
        log.info("No container with Container ID: {} exists. Ignoring the stop request", id);
        return;
      }

      // Container from a previous attempt: releasing the assigned container is how it gets stopped.
      log.info("Stopping container from previous attempt with Container ID: {} on host: {}",
          id, host);
      amClient.releaseAssignedContainer(previousAttemptContainer.id());
    }
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



samza-yarn3/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java [334:359]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  public void stopStreamProcessor(SamzaResource resource) {
    // Stops the container backing the given resource. Exactly one of three paths is taken, all while
    // holding `lock`:
    //   - the resource is tracked in allocatedResources: ask the NodeManager client to stop it;
    //   - otherwise, a matching entry exists in state.runningProcessors (a container carried over from
    //     a previous attempt and managed by the AM due to AM-HA): release it through the AM client;
    //   - otherwise: nothing is known about the container, so the request is logged and ignored.
    synchronized (lock) {
      final Container allocated = allocatedResources.get(resource);
      final String id = resource.getContainerId();
      final String host = resource.getHost();

      // Container was instantiated as part of the NMClient lifecycle: stop it asynchronously.
      if (allocated != null) {
        log.info("Stopping Container ID: {} on host: {}", id, host);
        this.nmClientAsync.stopContainerAsync(allocated.getId(), allocated.getNodeId());
        return;
      }

      // Not in the allocation book keeping; look it up among the running processors instead.
      final YarnContainer previousAttemptContainer = state.runningProcessors.get(getRunningProcessorId(id));
      if (previousAttemptContainer == null) {
        log.info("No container with Container ID: {} exists. Ignoring the stop request", id);
        return;
      }

      // Container from a previous attempt: releasing the assigned container is how it gets stopped.
      log.info("Stopping container from previous attempt with Container ID: {} on host: {}",
          id, host);
      amClient.releaseAssignedContainer(previousAttemptContainer.id());
    }
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



