samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java [241:281]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  public void requestResources(SamzaResourceRequest resourceRequest) {
    String processorId = resourceRequest.getProcessorId();
    String requestId = resourceRequest.getRequestId();
    String preferredHost = resourceRequest.getPreferredHost();
    String[] racks = resourceRequest.getFaultDomains().stream().map(FaultDomain::getId).toArray(String[]::new);
    int memoryMb = resourceRequest.getMemoryMB();
    int cpuCores = resourceRequest.getNumCores();
    Resource capability = Resource.newInstance(memoryMb, cpuCores);
    String nodeLabelsExpression = yarnConfig.getContainerLabel();

    AMRMClient.ContainerRequest issuedRequest;

    /*
     * Yarn enforces these two checks:
     *   1. ANY_HOST requests should always be made with relax-locality = true
     *   2. A request with relax-locality = false should not be in the same priority as another with relax-locality = true
     *
     * Since the Samza AM makes preferred-host requests with relax-locality = false, it follows that ANY_HOST requests
     * should specify a different priority-level. We can safely set priority of preferred-host requests to be higher than
     * any-host requests since data-locality is critical.
     */
    if (preferredHost.equals("ANY_HOST")) {
      Priority priority = Priority.newInstance(ANY_HOST_PRIORITY);
      boolean relaxLocality = true;
      log.info("Requesting resources for Processor ID: {} on nodes: {} on racks: {} with capability: {}, priority: {}, relaxLocality: {}, nodeLabelsExpression: {}",
          processorId, null, Arrays.toString(racks), capability, priority, relaxLocality, nodeLabelsExpression);
      issuedRequest = new AMRMClient.ContainerRequest(capability, null, null, priority, relaxLocality, nodeLabelsExpression);
    } else {
      String[] nodes = {preferredHost};
      Priority priority = Priority.newInstance(PREFERRED_HOST_PRIORITY);
      boolean relaxLocality = false;
      log.info("Requesting resources for Processor ID: {} on nodes: {} on racks: {} with capability: {}, priority: {}, relaxLocality: {}, nodeLabelsExpression: {}",
          processorId, Arrays.toString(nodes), Arrays.toString(racks), capability, priority, relaxLocality, nodeLabelsExpression);
      issuedRequest = new AMRMClient.ContainerRequest(capability, nodes, racks, priority, relaxLocality, nodeLabelsExpression);
    }
    // ensure that updating the state and making the request are done atomically.
    synchronized (lock) {
      requestsMap.put(resourceRequest, issuedRequest);
      amClient.addContainerRequest(issuedRequest);
    }
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



samza-yarn3/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java [241:281]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  public void requestResources(SamzaResourceRequest resourceRequest) {
    String processorId = resourceRequest.getProcessorId();
    String requestId = resourceRequest.getRequestId();
    String preferredHost = resourceRequest.getPreferredHost();
    String[] racks = resourceRequest.getFaultDomains().stream().map(FaultDomain::getId).toArray(String[]::new);
    int memoryMb = resourceRequest.getMemoryMB();
    int cpuCores = resourceRequest.getNumCores();
    Resource capability = Resource.newInstance(memoryMb, cpuCores);
    String nodeLabelsExpression = yarnConfig.getContainerLabel();

    AMRMClient.ContainerRequest issuedRequest;

    /*
     * Yarn enforces these two checks:
     *   1. ANY_HOST requests should always be made with relax-locality = true
     *   2. A request with relax-locality = false should not be in the same priority as another with relax-locality = true
     *
     * Since the Samza AM makes preferred-host requests with relax-locality = false, it follows that ANY_HOST requests
     * should specify a different priority-level. We can safely set priority of preferred-host requests to be higher than
     * any-host requests since data-locality is critical.
     */
    if (preferredHost.equals("ANY_HOST")) {
      Priority priority = Priority.newInstance(ANY_HOST_PRIORITY);
      boolean relaxLocality = true;
      log.info("Requesting resources for Processor ID: {} on nodes: {} on racks: {} with capability: {}, priority: {}, relaxLocality: {}, nodeLabelsExpression: {}",
          processorId, null, Arrays.toString(racks), capability, priority, relaxLocality, nodeLabelsExpression);
      issuedRequest = new AMRMClient.ContainerRequest(capability, null, null, priority, relaxLocality, nodeLabelsExpression);
    } else {
      String[] nodes = {preferredHost};
      Priority priority = Priority.newInstance(PREFERRED_HOST_PRIORITY);
      boolean relaxLocality = false;
      log.info("Requesting resources for Processor ID: {} on nodes: {} on racks: {} with capability: {}, priority: {}, relaxLocality: {}, nodeLabelsExpression: {}",
          processorId, Arrays.toString(nodes), Arrays.toString(racks), capability, priority, relaxLocality, nodeLabelsExpression);
      issuedRequest = new AMRMClient.ContainerRequest(capability, nodes, racks, priority, relaxLocality, nodeLabelsExpression);
    }
    // ensure that updating the state and making the request are done atomically.
    synchronized (lock) {
      requestsMap.put(resourceRequest, issuedRequest);
      amClient.addContainerRequest(issuedRequest);
    }
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



