samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java [514:774]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    clusterManagerCallback.onResourcesAvailable(resources);
  }

  //The below methods are specific to the Yarn AMRM Client. We currently don't handle scenarios where there are
  //nodes being updated. We always return 0 when asked for progress by Yarn.
  /**
   * Callback from the Yarn AMRM client signalling a shutdown request. The application is stopped with a
   * {@code FAILED} status.
   */
  @Override
  public void onShutdownRequest() {
    SamzaApplicationState.SamzaAppStatus finalStatus = SamzaApplicationState.SamzaAppStatus.FAILED;
    stop(finalStatus);
  }

  /**
   * Callback from the Yarn AMRM client reporting nodes whose state changed. Node updates are not handled yet.
   *
   * @param updatedNodes reports for the changed nodes (ignored)
   */
  @Override
  public void onNodesUpdated(List<NodeReport> updatedNodes) {
    // Intentionally a no-op: node update handling is not implemented.
  }

  /**
   * Reports application progress to Yarn. Progress tracking is not implemented.
   *
   * @return always zero
   */
  @Override
  public float getProgress() {
    final float untrackedProgress = 0f;
    return untrackedProgress;
  }

  /**
   * Callback invoked when the Yarn client reports an error. The error is logged and then handed to the
   * {@link org.apache.samza.clustermanager.ClusterResourceManager.Callback} instance, which decides how to react.
   *
   * @param e the error reported by the Yarn client
   */
  @Override
  public void onError(Throwable e) {
    final Throwable yarnError = e;
    log.error("Exception in the Yarn callback", yarnError);
    this.clusterManagerCallback.onError(yarnError);
  }

  /**
   * Yarn NodeManager client callback for a successfully started container; delegates to
   * {@link #handleOnContainerStarted(ContainerId)}.
   *
   * @param containerId the id of the started container
   * @param allServiceResponse auxiliary service responses (ignored)
   */
  @Override
  public void onContainerStarted(ContainerId containerId, Map<String, ByteBuffer> allServiceResponse) {
    this.handleOnContainerStarted(containerId);
  }

  /**
   * Logs a container status notification together with the Samza processor running on that container, if known.
   *
   * @param containerId the container the status refers to
   * @param containerStatus the reported status
   */
  @Override
  public void onContainerStatusReceived(ContainerId containerId, ContainerStatus containerStatus) {
    String runningProcessorId = getRunningProcessorId(containerId.toString());
    log.info("Got status notification for Container ID: {} for Processor ID: {}. Status: {}",
        containerId, runningProcessorId, containerStatus.getState());
  }

  /**
   * Logs a container stop notification together with the Samza processor that was running on it, if known.
   *
   * @param containerId the id of the stopped container
   */
  @Override
  public void onContainerStopped(ContainerId containerId) {
    String runningProcessorId = getRunningProcessorId(containerId.toString());
    log.info("Got stop notification for Container ID: {} for Processor ID: {}",
        containerId, runningProcessorId);
  }

  /**
   * Yarn NodeManager client callback for a container that could not be started. If a pending processor is
   * associated with the container, it is removed from the pending set and the launch failure is reported to the
   * cluster manager callback; otherwise the notification is logged and ignored.
   *
   * @param containerId the container that failed to start
   * @param t the cause of the failure
   */
  @Override
  public void onStartContainerError(ContainerId containerId, Throwable t) {
    String processorId = getPendingProcessorId(containerId);
    // The id lookup and the removal are separate steps; if the entry was removed in between, remove() returns
    // null. Treat that like an unknown container instead of dereferencing null below.
    YarnContainer container = processorId == null ? null : state.pendingProcessors.remove(processorId);

    if (container != null) {
      log.info("Got start error notification for Container ID: {} for Processor ID: {} ", containerId, processorId, t);
      SamzaResource resource = new SamzaResource(container.resource().getVirtualCores(),
          container.resource().getMemory(), container.nodeId().getHost(), containerId.toString());
      clusterManagerCallback.onStreamProcessorLaunchFailure(resource, new ProcessorLaunchException(t));
    } else {
      log.warn("Did not find the pending Processor ID for the start error notification for Container ID: {}. " +
          "Ignoring notification", containerId);
    }
  }

  /**
   * Logs a failure to fetch a container's status, together with the Samza processor running on it, if known.
   *
   * @param containerId the container whose status could not be fetched
   * @param t the cause of the failure
   */
  @Override
  public void onGetContainerStatusError(ContainerId containerId, Throwable t) {
    String runningProcessorId = getRunningProcessorId(containerId.toString());
    log.info("Got status error notification for Container ID: {} for Processor ID: {}",
        containerId, runningProcessorId, t);
  }

  /**
   * Yarn NodeManager client callback for a container that could not be stopped. If a running processor is
   * associated with the container, the stop failure is reported to the cluster manager callback; otherwise the
   * notification is logged and ignored.
   *
   * @param containerId the container that could not be stopped
   * @param t the cause of the failure
   */
  @Override
  public void onStopContainerError(ContainerId containerId, Throwable t) {
    String processorId = getRunningProcessorId(containerId.toString());
    // The id lookup and the map read are separate steps; if the processor left the running set in between,
    // get() returns null. Treat that like an unknown container instead of dereferencing null below.
    YarnContainer container = processorId == null ? null : state.runningProcessors.get(processorId);

    if (container != null) {
      log.info("Got stop error notification for Container ID: {} for Processor ID: {}", containerId, processorId, t);
      SamzaResource resource = new SamzaResource(container.resource().getVirtualCores(),
          container.resource().getMemory(), container.nodeId().getHost(), containerId.toString());
      clusterManagerCallback.onStreamProcessorStopFailure(resource, t);
    } else {
      log.warn("Did not find the running Processor ID for the stop error notification for Container ID: {}. " +
          "Ignoring notification", containerId);
    }
  }

  /**
   * Checks whether an allocated resource has been held long enough that Yarn may expire the allocation.
   * The limit is the ResourceManager container allocation expiry interval, as configured in
   * {@code yarnConfiguration} (falling back to the Yarn default), minus 30 seconds to account for clock skew.
   *
   * @param resource the allocated resource to check
   * @return true if the resource was allocated longer ago than the adjusted expiry interval
   */
  @Override
  public boolean isResourceExpired(SamzaResource resource) {
    // Use the configured interval instead of always assuming the Yarn default.
    long allocationExpiryMs = yarnConfiguration.getLong(YarnConfiguration.RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS,
        YarnConfiguration.DEFAULT_RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS);
    // Time from which resource was allocated > Yarn Expiry Timeout - 30 sec (to account for clock skew)
    Duration yarnAllocatedResourceExpiry = Duration.ofMillis(allocationExpiryMs).minus(Duration.ofSeconds(30));
    return System.currentTimeMillis() - resource.getTimestamp() > yarnAllocatedResourceExpiry.toMillis();
  }

  /**
   * Runs a process as specified by the command builder on the container.
   *
   * <p>The processor is recorded in {@code state.pendingProcessors} before the asynchronous start request is made,
   * so that the start callbacks can resolve it. If preparing or submitting the start request throws, that entry is
   * removed again before the exception is rethrown, so a launch that was never submitted does not linger as pending.
   *
   * @param processorId id of the samza processor to run (passed as a command line parameter to the process)
   * @param container the yarn container to run the processor on.
   * @param cmdBuilder the command builder that encapsulates the command, and the context
   * @throws IOException on IO exceptions running the container
   */
  public void runProcessor(String processorId, Container container, CommandBuilder cmdBuilder) throws IOException {
    String containerIdStr = ConverterUtils.toString(container.getId());
    String cmdPath = "./__package/";
    cmdBuilder.setCommandPath(cmdPath);
    String command = cmdBuilder.buildCommand();

    Map<String, String> env = getEscapedEnvironmentVariablesMap(cmdBuilder);
    env.put(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID, Util.envVarEscape(container.getId().toString()));

    Path packagePath = new Path(yarnConfig.getPackagePath());
    String formattedCommand =
        getFormattedCommand(ApplicationConstants.LOG_DIR_EXPANSION_VAR, command, ApplicationConstants.STDOUT,
            ApplicationConstants.STDERR);

    log.info("Running Processor ID: {} on Container ID: {} on host: {} using command: {} and env: {} and package path: {}",
        processorId, containerIdStr, container.getNodeHttpAddress(), formattedCommand, env, packagePath);
    // Must be registered before the async start request, since the start callbacks look it up.
    YarnContainer pendingContainer = new YarnContainer(container);
    state.pendingProcessors.put(processorId, pendingContainer);

    try {
      startContainer(packagePath, container, env, formattedCommand);
    } catch (IOException | RuntimeException e) {
      // startContainer hands the request to the NM client only as its final step, so a failure here means no start
      // callback will clean this entry up. Remove it only if it is still the entry registered above.
      state.pendingProcessors.remove(processorId, pendingContainer);
      throw e;
    }

    log.info("Made start request for Processor ID: {} on Container ID: {} on host: {} (http://{}/node/containerlogs/{}).",
        processorId, containerIdStr, container.getNodeId().getHost(), container.getNodeHttpAddress(), containerIdStr);
  }

  /**
   * Runs a command as a process on the container. All binaries needed by the physical process are packaged in the URL
   * specified by packagePath.
   *
   * @param packagePath path of the job package; it is localized into the container as the {@code __package} archive
   * @param container the allocated Yarn container to start
   * @param env environment variables for the launched process
   * @param cmd the shell command to run in the container
   * @throws IOException if the package file status or the current user's credentials cannot be read or serialized
   */
  private void startContainer(Path packagePath,
                              Container container,
                              Map<String, String> env,
                              final String cmd) throws IOException {
    LocalResource packageResource = Records.newRecord(LocalResource.class);
    URL packageUrl = ConverterUtils.getYarnUrlFromPath(packagePath);
    FileStatus fileStatus = packagePath.getFileSystem(yarnConfiguration).getFileStatus(packagePath);
    packageResource.setResource(packageUrl);
    log.debug("Set package resource in YarnContainerRunner for {}", packageUrl);
    packageResource.setSize(fileStatus.getLen());
    packageResource.setTimestamp(fileStatus.getModificationTime());
    packageResource.setType(LocalResourceType.ARCHIVE);
    packageResource.setVisibility(LocalResourceVisibility.APPLICATION);

    // Copy the tokens to start the container, but first remove the AM->RM token so that containers cannot access it.
    // The credentials must be serialized only AFTER the removal: serializing first shipped the AMRM token to every
    // container even though it was removed afterwards.
    Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
    Iterator<org.apache.hadoop.security.token.Token<? extends TokenIdentifier>> tokens =
        credentials.getAllTokens().iterator();
    while (tokens.hasNext()) {
      // Token.getKind() reads the kind directly, without decoding the identifier.
      if (AMRMTokenIdentifier.KIND_NAME.equals(tokens.next().getKind())) {
        tokens.remove();
      }
    }
    DataOutputBuffer dob = new DataOutputBuffer();
    credentials.writeTokenStorageToStream(dob);
    ByteBuffer allTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());

    Map<String, LocalResource> localResourceMap = new HashMap<>();
    localResourceMap.put("__package", packageResource);

    // include the resources from the universal resource configurations
    LocalizerResourceMapper resourceMapper = new LocalizerResourceMapper(new LocalizerResourceConfig(config), yarnConfiguration);
    localResourceMap.putAll(resourceMapper.getResourceMap());

    List<String> commands = new ArrayList<>();
    commands.add(cmd);

    ContainerLaunchContext context = Records.newRecord(ContainerLaunchContext.class);
    context.setEnvironment(env);
    context.setTokens(allTokens.duplicate());
    context.setCommands(commands);
    context.setLocalResources(localResourceMap);

    if (UserGroupInformation.isSecurityEnabled()) {
      Map<ApplicationAccessType, String> acls = yarnConfig.getYarnApplicationAcls();
      if (!acls.isEmpty()) {
        context.setApplicationACLs(acls);
      }
    }

    log.debug("Setting localResourceMap to {}", localResourceMap);
    log.debug("Setting context to {}", context);

    log.info("Making an async start request for Container ID: {} on host: {} with local resource map: {} and context: {}",
        container.getId(), container.getNodeHttpAddress(), localResourceMap.toString(), context);
    nmClientAsync.startContainerAsync(container, context);
  }

  /**
   * Builds the process environment from the given {@link CommandBuilder}. Every value is passed through
   * {@code Util.envVarEscape}; variable names are copied unchanged.
   *
   * @param cmdBuilder source of the raw environment variables
   * @return a new mutable map from variable name to escaped value
   */
  private Map<String, String> getEscapedEnvironmentVariablesMap(CommandBuilder cmdBuilder) {
    Map<String, String> rawEnvironment = cmdBuilder.buildEnvironment();
    Map<String, String> escapedEnvironment = new HashMap<>();
    rawEnvironment.forEach((name, value) -> escapedEnvironment.put(name, Util.envVarEscape(value)));
    return escapedEnvironment;
  }


  /**
   * Wraps a command in a shell line that exports {@code SAMZA_LOG_DIR}, links the log directory as {@code logs}
   * in the working directory, and execs the command with stdout/stderr redirected into that directory.
   *
   * @param logDirExpansionVar the log-directory expansion variable (the caller passes
   *                           {@code ApplicationConstants.LOG_DIR_EXPANSION_VAR})
   * @param command the command to exec
   * @param stdOut file name for standard output, relative to {@code logs/}
   * @param stdErr file name for standard error, relative to {@code logs/}
   * @return the complete shell command line
   */
  private String getFormattedCommand(String logDirExpansionVar, String command, String stdOut, String stdErr) {
    final String template = "export SAMZA_LOG_DIR=%s && ln -sfn %s logs && exec %s 1>logs/%s 2>logs/%s";
    return String.format(template, logDirExpansionVar, logDirExpansionVar, command, stdOut, stdErr);
  }

  /**
   * Returns the id of the Samza processor that is pending launch on the provided Yarn {@link ContainerId}.
   *
   * @param containerId the Yarn ContainerId
   * @return the id of the pending Samza processor on that container, or {@code null} if none is pending on it
   */
  private String getPendingProcessorId(ContainerId containerId) {
    // Iterate entries instead of keys + get(): one traversal, no second lookup per key.
    for (Map.Entry<String, YarnContainer> entry : state.pendingProcessors.entrySet()) {
      YarnContainer yarnContainer = entry.getValue();
      if (yarnContainer != null && yarnContainer.id().equals(containerId)) {
        return entry.getKey();
      }
    }
    return null;
  }

  /**
   * Handles the container-started callback for a Yarn container: moves the matching processor from the
   * YarnAppState's pendingProcessors to its runningProcessors and invokes the cluster manager callback's
   * stream processor launch success. Notifications for containers without a pending processor are logged and ignored.
   *
   * @param containerId yarn container id which has started
   */
  private void handleOnContainerStarted(ContainerId containerId) {
    String processorId = getPendingProcessorId(containerId);
    // The id lookup and the removal are separate steps; if the entry was removed in between, remove() returns
    // null and there is nothing to move. Treat that like an unknown container instead of dereferencing null.
    final YarnContainer container = processorId == null ? null : state.pendingProcessors.remove(processorId);
    if (container != null) {
      log.info("Got start notification for Container ID: {} for Processor ID: {}", containerId, processorId);
      // 1. Move the processor from pending to running state
      state.runningProcessors.put(processorId, container);

      // 2. Invoke the success callback.
      SamzaResource resource = new SamzaResource(container.resource().getVirtualCores(),
          container.resource().getMemory(), container.nodeId().getHost(), containerId.toString());
      clusterManagerCallback.onStreamProcessorLaunchSuccess(resource);
    } else {
      log.warn("Did not find the Processor ID for the start notification for Container ID: {}. " +
          "Ignoring notification.", containerId);
    }
  }

  @VisibleForTesting
  ConcurrentHashMap<SamzaResource, Container> getAllocatedResources() {
    return allocatedResources;
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



samza-yarn3/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java [512:772]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    clusterManagerCallback.onResourcesAvailable(resources);
  }

  //The below methods are specific to the Yarn AMRM Client. We currently don't handle scenarios where there are
  //nodes being updated. We always return 0 when asked for progress by Yarn.
  /**
   * Callback from the Yarn AMRM client signalling a shutdown request. The application is stopped with a
   * {@code FAILED} status.
   */
  @Override
  public void onShutdownRequest() {
    SamzaApplicationState.SamzaAppStatus finalStatus = SamzaApplicationState.SamzaAppStatus.FAILED;
    stop(finalStatus);
  }

  /**
   * Callback from the Yarn AMRM client reporting nodes whose state changed. Node updates are not handled yet.
   *
   * @param updatedNodes reports for the changed nodes (ignored)
   */
  @Override
  public void onNodesUpdated(List<NodeReport> updatedNodes) {
    // Intentionally a no-op: node update handling is not implemented.
  }

  /**
   * Reports application progress to Yarn. Progress tracking is not implemented.
   *
   * @return always zero
   */
  @Override
  public float getProgress() {
    final float untrackedProgress = 0f;
    return untrackedProgress;
  }

  /**
   * Callback invoked when the Yarn client reports an error. The error is logged and then handed to the
   * {@link org.apache.samza.clustermanager.ClusterResourceManager.Callback} instance, which decides how to react.
   *
   * @param e the error reported by the Yarn client
   */
  @Override
  public void onError(Throwable e) {
    final Throwable yarnError = e;
    log.error("Exception in the Yarn callback", yarnError);
    this.clusterManagerCallback.onError(yarnError);
  }

  /**
   * Yarn NodeManager client callback for a successfully started container; delegates to
   * {@link #handleOnContainerStarted(ContainerId)}.
   *
   * @param containerId the id of the started container
   * @param allServiceResponse auxiliary service responses (ignored)
   */
  @Override
  public void onContainerStarted(ContainerId containerId, Map<String, ByteBuffer> allServiceResponse) {
    this.handleOnContainerStarted(containerId);
  }

  /**
   * Logs a container status notification together with the Samza processor running on that container, if known.
   *
   * @param containerId the container the status refers to
   * @param containerStatus the reported status
   */
  @Override
  public void onContainerStatusReceived(ContainerId containerId, ContainerStatus containerStatus) {
    String runningProcessorId = getRunningProcessorId(containerId.toString());
    log.info("Got status notification for Container ID: {} for Processor ID: {}. Status: {}",
        containerId, runningProcessorId, containerStatus.getState());
  }

  /**
   * Logs a container stop notification together with the Samza processor that was running on it, if known.
   *
   * @param containerId the id of the stopped container
   */
  @Override
  public void onContainerStopped(ContainerId containerId) {
    String runningProcessorId = getRunningProcessorId(containerId.toString());
    log.info("Got stop notification for Container ID: {} for Processor ID: {}",
        containerId, runningProcessorId);
  }

  /**
   * Yarn NodeManager client callback for a container that could not be started. If a pending processor is
   * associated with the container, it is removed from the pending set and the launch failure is reported to the
   * cluster manager callback; otherwise the notification is logged and ignored.
   *
   * @param containerId the container that failed to start
   * @param t the cause of the failure
   */
  @Override
  public void onStartContainerError(ContainerId containerId, Throwable t) {
    String processorId = getPendingProcessorId(containerId);
    // The id lookup and the removal are separate steps; if the entry was removed in between, remove() returns
    // null. Treat that like an unknown container instead of dereferencing null below.
    YarnContainer container = processorId == null ? null : state.pendingProcessors.remove(processorId);

    if (container != null) {
      log.info("Got start error notification for Container ID: {} for Processor ID: {} ", containerId, processorId, t);
      SamzaResource resource = new SamzaResource(container.resource().getVirtualCores(),
          container.resource().getMemory(), container.nodeId().getHost(), containerId.toString());
      clusterManagerCallback.onStreamProcessorLaunchFailure(resource, new ProcessorLaunchException(t));
    } else {
      log.warn("Did not find the pending Processor ID for the start error notification for Container ID: {}. " +
          "Ignoring notification", containerId);
    }
  }

  /**
   * Logs a failure to fetch a container's status, together with the Samza processor running on it, if known.
   *
   * @param containerId the container whose status could not be fetched
   * @param t the cause of the failure
   */
  @Override
  public void onGetContainerStatusError(ContainerId containerId, Throwable t) {
    String runningProcessorId = getRunningProcessorId(containerId.toString());
    log.info("Got status error notification for Container ID: {} for Processor ID: {}",
        containerId, runningProcessorId, t);
  }

  /**
   * Yarn NodeManager client callback for a container that could not be stopped. If a running processor is
   * associated with the container, the stop failure is reported to the cluster manager callback; otherwise the
   * notification is logged and ignored.
   *
   * @param containerId the container that could not be stopped
   * @param t the cause of the failure
   */
  @Override
  public void onStopContainerError(ContainerId containerId, Throwable t) {
    String processorId = getRunningProcessorId(containerId.toString());
    // The id lookup and the map read are separate steps; if the processor left the running set in between,
    // get() returns null. Treat that like an unknown container instead of dereferencing null below.
    YarnContainer container = processorId == null ? null : state.runningProcessors.get(processorId);

    if (container != null) {
      log.info("Got stop error notification for Container ID: {} for Processor ID: {}", containerId, processorId, t);
      SamzaResource resource = new SamzaResource(container.resource().getVirtualCores(),
          container.resource().getMemory(), container.nodeId().getHost(), containerId.toString());
      clusterManagerCallback.onStreamProcessorStopFailure(resource, t);
    } else {
      log.warn("Did not find the running Processor ID for the stop error notification for Container ID: {}. " +
          "Ignoring notification", containerId);
    }
  }

  /**
   * Checks whether an allocated resource has been held long enough that Yarn may expire the allocation.
   * The limit is the ResourceManager container allocation expiry interval, as configured in
   * {@code yarnConfiguration} (falling back to the Yarn default), minus 30 seconds to account for clock skew.
   *
   * @param resource the allocated resource to check
   * @return true if the resource was allocated longer ago than the adjusted expiry interval
   */
  @Override
  public boolean isResourceExpired(SamzaResource resource) {
    // Use the configured interval instead of always assuming the Yarn default.
    long allocationExpiryMs = yarnConfiguration.getLong(YarnConfiguration.RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS,
        YarnConfiguration.DEFAULT_RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS);
    // Time from which resource was allocated > Yarn Expiry Timeout - 30 sec (to account for clock skew)
    Duration yarnAllocatedResourceExpiry = Duration.ofMillis(allocationExpiryMs).minus(Duration.ofSeconds(30));
    return System.currentTimeMillis() - resource.getTimestamp() > yarnAllocatedResourceExpiry.toMillis();
  }

  /**
   * Runs a process as specified by the command builder on the container.
   *
   * <p>The processor is recorded in {@code state.pendingProcessors} before the asynchronous start request is made,
   * so that the start callbacks can resolve it. If preparing or submitting the start request throws, that entry is
   * removed again before the exception is rethrown, so a launch that was never submitted does not linger as pending.
   *
   * @param processorId id of the samza processor to run (passed as a command line parameter to the process)
   * @param container the yarn container to run the processor on.
   * @param cmdBuilder the command builder that encapsulates the command, and the context
   * @throws IOException on IO exceptions running the container
   */
  public void runProcessor(String processorId, Container container, CommandBuilder cmdBuilder) throws IOException {
    String containerIdStr = ConverterUtils.toString(container.getId());
    String cmdPath = "./__package/";
    cmdBuilder.setCommandPath(cmdPath);
    String command = cmdBuilder.buildCommand();

    Map<String, String> env = getEscapedEnvironmentVariablesMap(cmdBuilder);
    env.put(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID, Util.envVarEscape(container.getId().toString()));

    Path packagePath = new Path(yarnConfig.getPackagePath());
    String formattedCommand =
        getFormattedCommand(ApplicationConstants.LOG_DIR_EXPANSION_VAR, command, ApplicationConstants.STDOUT,
            ApplicationConstants.STDERR);

    log.info("Running Processor ID: {} on Container ID: {} on host: {} using command: {} and env: {} and package path: {}",
        processorId, containerIdStr, container.getNodeHttpAddress(), formattedCommand, env, packagePath);
    // Must be registered before the async start request, since the start callbacks look it up.
    YarnContainer pendingContainer = new YarnContainer(container);
    state.pendingProcessors.put(processorId, pendingContainer);

    try {
      startContainer(packagePath, container, env, formattedCommand);
    } catch (IOException | RuntimeException e) {
      // startContainer hands the request to the NM client only as its final step, so a failure here means no start
      // callback will clean this entry up. Remove it only if it is still the entry registered above.
      state.pendingProcessors.remove(processorId, pendingContainer);
      throw e;
    }

    log.info("Made start request for Processor ID: {} on Container ID: {} on host: {} (http://{}/node/containerlogs/{}).",
        processorId, containerIdStr, container.getNodeId().getHost(), container.getNodeHttpAddress(), containerIdStr);
  }

  /**
   * Runs a command as a process on the container. All binaries needed by the physical process are packaged in the URL
   * specified by packagePath.
   *
   * @param packagePath path of the job package; it is localized into the container as the {@code __package} archive
   * @param container the allocated Yarn container to start
   * @param env environment variables for the launched process
   * @param cmd the shell command to run in the container
   * @throws IOException if the package file status or the current user's credentials cannot be read or serialized
   */
  private void startContainer(Path packagePath,
                              Container container,
                              Map<String, String> env,
                              final String cmd) throws IOException {
    LocalResource packageResource = Records.newRecord(LocalResource.class);
    URL packageUrl = ConverterUtils.getYarnUrlFromPath(packagePath);
    FileStatus fileStatus = packagePath.getFileSystem(yarnConfiguration).getFileStatus(packagePath);
    packageResource.setResource(packageUrl);
    log.debug("Set package resource in YarnContainerRunner for {}", packageUrl);
    packageResource.setSize(fileStatus.getLen());
    packageResource.setTimestamp(fileStatus.getModificationTime());
    packageResource.setType(LocalResourceType.ARCHIVE);
    packageResource.setVisibility(LocalResourceVisibility.APPLICATION);

    // Copy the tokens to start the container, but first remove the AM->RM token so that containers cannot access it.
    // The credentials must be serialized only AFTER the removal: serializing first shipped the AMRM token to every
    // container even though it was removed afterwards.
    Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
    Iterator<org.apache.hadoop.security.token.Token<? extends TokenIdentifier>> tokens =
        credentials.getAllTokens().iterator();
    while (tokens.hasNext()) {
      // Token.getKind() reads the kind directly, without decoding the identifier.
      if (AMRMTokenIdentifier.KIND_NAME.equals(tokens.next().getKind())) {
        tokens.remove();
      }
    }
    DataOutputBuffer dob = new DataOutputBuffer();
    credentials.writeTokenStorageToStream(dob);
    ByteBuffer allTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());

    Map<String, LocalResource> localResourceMap = new HashMap<>();
    localResourceMap.put("__package", packageResource);

    // include the resources from the universal resource configurations
    LocalizerResourceMapper resourceMapper = new LocalizerResourceMapper(new LocalizerResourceConfig(config), yarnConfiguration);
    localResourceMap.putAll(resourceMapper.getResourceMap());

    List<String> commands = new ArrayList<>();
    commands.add(cmd);

    ContainerLaunchContext context = Records.newRecord(ContainerLaunchContext.class);
    context.setEnvironment(env);
    context.setTokens(allTokens.duplicate());
    context.setCommands(commands);
    context.setLocalResources(localResourceMap);

    if (UserGroupInformation.isSecurityEnabled()) {
      Map<ApplicationAccessType, String> acls = yarnConfig.getYarnApplicationAcls();
      if (!acls.isEmpty()) {
        context.setApplicationACLs(acls);
      }
    }

    log.debug("Setting localResourceMap to {}", localResourceMap);
    log.debug("Setting context to {}", context);

    log.info("Making an async start request for Container ID: {} on host: {} with local resource map: {} and context: {}",
        container.getId(), container.getNodeHttpAddress(), localResourceMap.toString(), context);
    nmClientAsync.startContainerAsync(container, context);
  }

  /**
   * Builds the process environment from the given {@link CommandBuilder}. Every value is passed through
   * {@code Util.envVarEscape}; variable names are copied unchanged.
   *
   * @param cmdBuilder source of the raw environment variables
   * @return a new mutable map from variable name to escaped value
   */
  private Map<String, String> getEscapedEnvironmentVariablesMap(CommandBuilder cmdBuilder) {
    Map<String, String> rawEnvironment = cmdBuilder.buildEnvironment();
    Map<String, String> escapedEnvironment = new HashMap<>();
    rawEnvironment.forEach((name, value) -> escapedEnvironment.put(name, Util.envVarEscape(value)));
    return escapedEnvironment;
  }


  /**
   * Wraps a command in a shell line that exports {@code SAMZA_LOG_DIR}, links the log directory as {@code logs}
   * in the working directory, and execs the command with stdout/stderr redirected into that directory.
   *
   * @param logDirExpansionVar the log-directory expansion variable (the caller passes
   *                           {@code ApplicationConstants.LOG_DIR_EXPANSION_VAR})
   * @param command the command to exec
   * @param stdOut file name for standard output, relative to {@code logs/}
   * @param stdErr file name for standard error, relative to {@code logs/}
   * @return the complete shell command line
   */
  private String getFormattedCommand(String logDirExpansionVar, String command, String stdOut, String stdErr) {
    final String template = "export SAMZA_LOG_DIR=%s && ln -sfn %s logs && exec %s 1>logs/%s 2>logs/%s";
    return String.format(template, logDirExpansionVar, logDirExpansionVar, command, stdOut, stdErr);
  }

  /**
   * Returns the id of the Samza processor that is pending launch on the provided Yarn {@link ContainerId}.
   *
   * @param containerId the Yarn ContainerId
   * @return the id of the pending Samza processor on that container, or {@code null} if none is pending on it
   */
  private String getPendingProcessorId(ContainerId containerId) {
    // Iterate entries instead of keys + get(): one traversal, no second lookup per key.
    for (Map.Entry<String, YarnContainer> entry : state.pendingProcessors.entrySet()) {
      YarnContainer yarnContainer = entry.getValue();
      if (yarnContainer != null && yarnContainer.id().equals(containerId)) {
        return entry.getKey();
      }
    }
    return null;
  }

  /**
   * Handles the container-started callback for a Yarn container: moves the matching processor from the
   * YarnAppState's pendingProcessors to its runningProcessors and invokes the cluster manager callback's
   * stream processor launch success. Notifications for containers without a pending processor are logged and ignored.
   *
   * @param containerId yarn container id which has started
   */
  private void handleOnContainerStarted(ContainerId containerId) {
    String processorId = getPendingProcessorId(containerId);
    // The id lookup and the removal are separate steps; if the entry was removed in between, remove() returns
    // null and there is nothing to move. Treat that like an unknown container instead of dereferencing null.
    final YarnContainer container = processorId == null ? null : state.pendingProcessors.remove(processorId);
    if (container != null) {
      log.info("Got start notification for Container ID: {} for Processor ID: {}", containerId, processorId);
      // 1. Move the processor from pending to running state
      state.runningProcessors.put(processorId, container);

      // 2. Invoke the success callback.
      SamzaResource resource = new SamzaResource(container.resource().getVirtualCores(),
          container.resource().getMemory(), container.nodeId().getHost(), containerId.toString());
      clusterManagerCallback.onStreamProcessorLaunchSuccess(resource);
    } else {
      log.warn("Did not find the Processor ID for the start notification for Container ID: {}. " +
          "Ignoring notification.", containerId);
    }
  }

  @VisibleForTesting
  ConcurrentHashMap<SamzaResource, Container> getAllocatedResources() {
    return allocatedResources;
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



