samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java [645:709]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  private void startContainer(Path packagePath,
                              Container container,
                              Map<String, String> env,
                              final String cmd) throws IOException {
    LocalResource packageResource = Records.newRecord(LocalResource.class);
    URL packageUrl = ConverterUtils.getYarnUrlFromPath(packagePath);
    FileStatus fileStatus;
    fileStatus = packagePath.getFileSystem(yarnConfiguration).getFileStatus(packagePath);
    packageResource.setResource(packageUrl);
    log.debug("Set package resource in YarnContainerRunner for {}", packageUrl);
    packageResource.setSize(fileStatus.getLen());
    packageResource.setTimestamp(fileStatus.getModificationTime());
    packageResource.setType(LocalResourceType.ARCHIVE);
    packageResource.setVisibility(LocalResourceVisibility.APPLICATION);

    ByteBuffer allTokens;
    // copy tokens to start the container
    Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
    DataOutputBuffer dob = new DataOutputBuffer();
    credentials.writeTokenStorageToStream(dob);

    // now remove the AM->RM token so that containers cannot access it
    Iterator iter = credentials.getAllTokens().iterator();
    while (iter.hasNext()) {
      TokenIdentifier token = ((org.apache.hadoop.security.token.Token) iter.next()).decodeIdentifier();
      if (token != null && token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
        iter.remove();
      }
    }
    allTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());

    Map<String, LocalResource> localResourceMap = new HashMap<>();
    localResourceMap.put("__package", packageResource);

    // include the resources from the universal resource configurations
    LocalizerResourceMapper resourceMapper = new LocalizerResourceMapper(new LocalizerResourceConfig(config), yarnConfiguration);
    localResourceMap.putAll(resourceMapper.getResourceMap());

    ContainerLaunchContext context = Records.newRecord(ContainerLaunchContext.class);
    context.setEnvironment(env);
    context.setTokens(allTokens.duplicate());
    context.setCommands(new ArrayList<String>() {
      {
        add(cmd);
      }
    });
    context.setLocalResources(localResourceMap);

    if (UserGroupInformation.isSecurityEnabled()) {
      Map<ApplicationAccessType, String> acls = yarnConfig.getYarnApplicationAcls();
      if (!acls.isEmpty()) {
        context.setApplicationACLs(acls);
      }
    }

    log.debug("Setting localResourceMap to {}", localResourceMap);
    log.debug("Setting context to {}", context);

    StartContainerRequest startContainerRequest = Records.newRecord(StartContainerRequest.class);
    startContainerRequest.setContainerLaunchContext(context);

    log.info("Making an async start request for Container ID: {} on host: {} with local resource map: {} and context: {}",
        container.getId(), container.getNodeHttpAddress(), localResourceMap.toString(), context);
    nmClientAsync.startContainerAsync(container, context);
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



samza-yarn3/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java [643:707]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  private void startContainer(Path packagePath,
                              Container container,
                              Map<String, String> env,
                              final String cmd) throws IOException {
    LocalResource packageResource = Records.newRecord(LocalResource.class);
    URL packageUrl = ConverterUtils.getYarnUrlFromPath(packagePath);
    FileStatus fileStatus;
    fileStatus = packagePath.getFileSystem(yarnConfiguration).getFileStatus(packagePath);
    packageResource.setResource(packageUrl);
    log.debug("Set package resource in YarnContainerRunner for {}", packageUrl);
    packageResource.setSize(fileStatus.getLen());
    packageResource.setTimestamp(fileStatus.getModificationTime());
    packageResource.setType(LocalResourceType.ARCHIVE);
    packageResource.setVisibility(LocalResourceVisibility.APPLICATION);

    ByteBuffer allTokens;
    // copy tokens to start the container
    Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
    DataOutputBuffer dob = new DataOutputBuffer();
    credentials.writeTokenStorageToStream(dob);

    // now remove the AM->RM token so that containers cannot access it
    Iterator iter = credentials.getAllTokens().iterator();
    while (iter.hasNext()) {
      TokenIdentifier token = ((org.apache.hadoop.security.token.Token) iter.next()).decodeIdentifier();
      if (token != null && token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
        iter.remove();
      }
    }
    allTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());

    Map<String, LocalResource> localResourceMap = new HashMap<>();
    localResourceMap.put("__package", packageResource);

    // include the resources from the universal resource configurations
    LocalizerResourceMapper resourceMapper = new LocalizerResourceMapper(new LocalizerResourceConfig(config), yarnConfiguration);
    localResourceMap.putAll(resourceMapper.getResourceMap());

    ContainerLaunchContext context = Records.newRecord(ContainerLaunchContext.class);
    context.setEnvironment(env);
    context.setTokens(allTokens.duplicate());
    context.setCommands(new ArrayList<String>() {
      {
        add(cmd);
      }
    });
    context.setLocalResources(localResourceMap);

    if (UserGroupInformation.isSecurityEnabled()) {
      Map<ApplicationAccessType, String> acls = yarnConfig.getYarnApplicationAcls();
      if (!acls.isEmpty()) {
        context.setApplicationACLs(acls);
      }
    }

    log.debug("Setting localResourceMap to {}", localResourceMap);
    log.debug("Setting context to {}", context);

    StartContainerRequest startContainerRequest = Records.newRecord(StartContainerRequest.class);
    startContainerRequest.setContainerLaunchContext(context);

    log.info("Making an async start request for Container ID: {} on host: {} with local resource map: {} and context: {}",
        container.getId(), container.getNodeHttpAddress(), localResourceMap.toString(), context);
    nmClientAsync.startContainerAsync(container, context);
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



