samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java [314:333]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  public void launchStreamProcessor(SamzaResource resource, CommandBuilder builder) {
    // Launches the stream processor described by `builder` on the container backing `resource`.
    // The processor ID is read from the builder's environment (ShellCommandConfig.ENV_CONTAINER_ID).
    // If `resource` has no entry in allocatedResources the request is logged and dropped.
    // Any Throwable raised while launching is logged and reported through
    // clusterManagerCallback.onStreamProcessorLaunchFailure rather than propagated to the caller.
    String processorId = builder.buildEnvironment().get(ShellCommandConfig.ENV_CONTAINER_ID);
    String containerId = resource.getContainerId();
    String host = resource.getHost();
    log.info("Starting Processor ID: {} on Container ID: {} on host: {}", processorId, containerId, host);
    // The lookup and the launch happen under the same lock so they are seen as one step by
    // other code synchronizing on `lock`.
    synchronized (lock) {
      try {
        Container container = allocatedResources.get(resource);
        if (container == null) {
          log.info("Container ID: {} on host: {} was already allocated / released.", containerId, host);
          return;
        }

        runProcessor(processorId, container, builder);
      } catch (Throwable t) {
        // A launch failure is an error condition: log it at ERROR so it (and the stack trace
        // carried by the trailing Throwable argument) is not filtered out with routine INFO output.
        log.error("Error starting Processor ID: {} on Container ID: {} on host: {}", processorId, containerId, host, t);
        clusterManagerCallback.onStreamProcessorLaunchFailure(resource, t);
      }
    }
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



samza-yarn3/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java [313:332]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  public void launchStreamProcessor(SamzaResource resource, CommandBuilder builder) {
    // Launches the stream processor described by `builder` on the container backing `resource`.
    // The processor ID is read from the builder's environment (ShellCommandConfig.ENV_CONTAINER_ID).
    // If `resource` has no entry in allocatedResources the request is logged and dropped.
    // Any Throwable raised while launching is logged and reported through
    // clusterManagerCallback.onStreamProcessorLaunchFailure rather than propagated to the caller.
    String processorId = builder.buildEnvironment().get(ShellCommandConfig.ENV_CONTAINER_ID);
    String containerId = resource.getContainerId();
    String host = resource.getHost();
    log.info("Starting Processor ID: {} on Container ID: {} on host: {}", processorId, containerId, host);
    // The lookup and the launch happen under the same lock so they are seen as one step by
    // other code synchronizing on `lock`.
    synchronized (lock) {
      try {
        Container container = allocatedResources.get(resource);
        if (container == null) {
          log.info("Container ID: {} on host: {} was already allocated / released.", containerId, host);
          return;
        }

        runProcessor(processorId, container, builder);
      } catch (Throwable t) {
        // A launch failure is an error condition: log it at ERROR so it (and the stack trace
        // carried by the trailing Throwable argument) is not filtered out with routine INFO output.
        log.error("Error starting Processor ID: {} on Container ID: {} on host: {}", processorId, containerId, host, t);
        clusterManagerCallback.onStreamProcessorLaunchFailure(resource, t);
      }
    }
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



