samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java [82:300]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
public class YarnClusterResourceManager extends ClusterResourceManager implements AMRMClientAsync.CallbackHandler, NMClientAsync.CallbackHandler {

  // YARN request priorities used by requestResources(). Preferred-host requests (relax-locality = false) are issued at
  // PREFERRED_HOST_PRIORITY and ANY_HOST requests (relax-locality = true) at ANY_HOST_PRIORITY. The two values must
  // differ because YARN does not accept both relax-locality settings at the same priority level.
  private static final int PREFERRED_HOST_PRIORITY = 0;
  private static final int ANY_HOST_PRIORITY = 1;

  // NOTE(review): not referenced in the portion of this file reviewed here; presumably a sentinel processor id - confirm.
  private static final String INVALID_PROCESSOR_ID = "-1";

  /**
   * The AMClient instance to request resources from yarn.
   */
  private final AMRMClientAsync<AMRMClient.ContainerRequest> amClient;

  /**
   * Configuration and state specific to Yarn.
   */
  private final YarnConfiguration yarnConfiguration;
  private final YarnAppState state;

  /**
   * SamzaYarnAppMasterLifecycle is responsible for registering, unregistering the AM client.
   */
  private final SamzaYarnAppMasterLifecycle lifecycle;

  /**
   * SamzaYarnAppMasterService is responsible for hosting an AM web UI. This picks up data from both
   * SamzaApplicationState and YarnAppState.
   */
  private final SamzaYarnAppMasterService service;

  // Typed view over the Samza job config for yarn.* settings (poll interval, container node label).
  private final YarnConfig yarnConfig;

  /**
   * State variables to map Yarn specific callbacks into Samza specific callbacks.
   * allocatedResources: the YARN Container backing each Samza resource; entries are removed in releaseResources().
   * requestsMap: the YARN ContainerRequest issued for each Samza request; populated in requestResources().
   */
  private final ConcurrentHashMap<SamzaResource, Container> allocatedResources = new ConcurrentHashMap<>();
  private final ConcurrentHashMap<SamzaResourceRequest, AMRMClient.ContainerRequest> requestsMap = new ConcurrentHashMap<>();

  private final SamzaAppMasterMetrics metrics;

  // Flipped once by start() so that initialization runs at most once.
  private final AtomicBoolean started = new AtomicBoolean(false);
  // Makes updates to requestsMap / allocatedResources atomic with the matching amClient call.
  private final Object lock = new Object();
  private final NMClientAsync nmClientAsync;

  private static final Logger log = LoggerFactory.getLogger(YarnClusterResourceManager.class);
  private final Config config;

  /**
   * Builds a cluster resource manager from already-constructed collaborators instead of deriving them from the YARN
   * environment. Presumably intended for tests (TODO confirm). Only {@link YarnConfig} is created here, from
   * {@code config}.
   */
  YarnClusterResourceManager(AMRMClientAsync amClientAsync, NMClientAsync nmClientAsync, Callback callback,
      YarnAppState yarnAppState, SamzaYarnAppMasterLifecycle lifecycle, SamzaYarnAppMasterService service,
      SamzaAppMasterMetrics metrics, YarnConfiguration yarnConfiguration, Config config) {
    super(callback);
    // Injected YARN clients and AM components.
    this.amClient = amClientAsync;
    this.nmClientAsync = nmClientAsync;
    this.state = yarnAppState;
    this.lifecycle = lifecycle;
    this.service = service;
    this.metrics = metrics;
    // Configuration views.
    this.config = config;
    this.yarnConfiguration = yarnConfiguration;
    this.yarnConfig = new YarnConfig(config);
  }

  /**
   * Creates a YarnClusterResourceManager for an AM running inside a YARN container.
   *
   * <p>Builds a {@link YarnConfiguration} in three steps:
   * <ul>
   *   <li>maps {@code fs.http.impl} to {@link HttpFileSystem};</li>
   *   <li>copies every {@code fs.<scheme>.impl} / {@code fs.<scheme>.impl.*} key from the job config into it;</li>
   *   <li>reads the AM's container id and NodeManager host/ports from the YARN-provided environment variables.</li>
   * </ul>
   * It then wires up the AM-RM client, app state, web UI service, AM lifecycle and NM client. The clients are only
   * created here; they are initialized and started in {@link #start()}.
   *
   * @param config to instantiate the cluster manager with
   * @param jobModelManager the job model manager; not referenced by this constructor
   * @param callback the callback to receive events from Yarn.
   * @param samzaAppState samza app state for display in the UI
   * @throws NumberFormatException if the NM_PORT or NM_HTTP_PORT environment variable is missing or not an integer
   */
  public YarnClusterResourceManager(Config config, JobModelManager jobModelManager,
      ClusterResourceManager.Callback callback, SamzaApplicationState samzaAppState) {
    super(callback);

    // Hadoop/YARN configuration: support http:// paths, then overlay the job's "fs.<scheme>.impl[.*]" settings.
    this.yarnConfiguration = new YarnConfiguration();
    this.yarnConfiguration.set("fs.http.impl", HttpFileSystem.class.getName());
    FileSystemImplConfig fsImplConfig = new FileSystemImplConfig(config);
    fsImplConfig.getSchemes().forEach(scheme ->
        fsImplConfig.getSchemeConfig(scheme).forEach((key, value) -> this.yarnConfiguration.set(key, value)));

    MetricsRegistryMap registry = new MetricsRegistryMap();
    this.metrics = new SamzaAppMasterMetrics(config, samzaAppState, registry);

    // Identity of the AM's own container and of its NodeManager, as exported by YARN into the environment.
    String containerIdStr = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.toString());
    ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
    String nodeHostString = System.getenv(ApplicationConstants.Environment.NM_HOST.toString());
    int nodePort = Integer.parseInt(System.getenv(ApplicationConstants.Environment.NM_PORT.toString()));
    int nodeHttpPort = Integer.parseInt(System.getenv(ApplicationConstants.Environment.NM_HTTP_PORT.toString()));

    this.yarnConfig = new YarnConfig(config);
    this.config = config;

    // Callbacks from the AM-RM client are delivered to this instance.
    this.amClient = AMRMClientAsync.createAMRMClientAsync(this.yarnConfig.getAMPollIntervalMs(), this);

    this.state = new YarnAppState(-1, containerId, nodeHostString, nodePort, nodeHttpPort);
    log.info("Initialized YarnAppState: {}", this.state);
    this.service = new SamzaYarnAppMasterService(config, samzaAppState, this.state, registry, this.yarnConfiguration);

    log.info("Container ID: {}, Nodehost:  {} , Nodeport : {} , NodeHttpport: {}", containerIdStr, nodeHostString, nodePort, nodeHttpPort);
    ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config);
    boolean amHighAvailabilityEnabled = new JobConfig(config).getApplicationMasterHighAvailabilityEnabled();
    this.lifecycle = new SamzaYarnAppMasterLifecycle(
        clusterManagerConfig.getContainerMemoryMb(),
        clusterManagerConfig.getNumCores(),
        samzaAppState,
        this.state,
        this.amClient,
        amHighAvailabilityEnabled);

    // Callbacks from the NM client are delivered to this instance.
    this.nmClientAsync = NMClientAsync.createNMClientAsync(this);
  }

  /**
   * Starts the YarnClusterResourceManager and its sub-systems, in this order:
   * <ol>
   *   <li>metrics;</li>
   *   <li>the AM web service;</li>
   *   <li>the AM-RM and NM clients;</li>
   *   <li>the AM lifecycle.</li>
   * </ol>
   * Only the first call does any work; later calls log a message and return immediately.
   *
   * <p>When AM high availability is enabled, containers returned by the lifecycle as still running from a previous
   * attempt are handled as successfully started. If the lifecycle then reports that the AM should shut down, the
   * callback's {@code onError} is invoked.
   */
  @Override
  public void start() {
    boolean firstStart = started.compareAndSet(false, true);
    if (!firstStart) {
      log.info("Attempting to start an already started YarnClusterResourceManager");
      return;
    }

    metrics.start();
    service.onInit();
    log.info("Starting YarnClusterResourceManager.");

    // Both YARN clients share the same Hadoop configuration.
    amClient.init(yarnConfiguration);
    amClient.start();
    nmClientAsync.init(yarnConfiguration);
    nmClientAsync.start();

    Set<ContainerId> previousAttemptsContainers = lifecycle.onInit();
    metrics.setContainersFromPreviousAttempts(previousAttemptsContainers.size());

    boolean highAvailabilityEnabled = new JobConfig(config).getApplicationMasterHighAvailabilityEnabled();
    if (highAvailabilityEnabled) {
      log.info("Received running containers from previous attempt. Invoking launch success for them.");
      for (ContainerId previousContainer : previousAttemptsContainers) {
        handleOnContainerStarted(previousContainer);
      }
    }

    if (lifecycle.shouldShutdown()) {
      clusterManagerCallback.onError(new SamzaException("Invalid resource request."));
    }

    log.info("Finished starting YarnClusterResourceManager");
  }

  /**
   * Request resources for running container processes.
   *
   * <p>Translates the Samza request into a YARN {@link AMRMClient.ContainerRequest} and submits it to the AM-RM client.
   * <ul>
   *   <li>A request whose preferred host is {@code "ANY_HOST"} is issued at {@code ANY_HOST_PRIORITY}, with
   *       relax-locality enabled and no node or rack constraints.</li>
   *   <li>Any other request is pinned to the preferred host and to the request's fault-domain racks. It is issued at
   *       {@code PREFERRED_HOST_PRIORITY} with relax-locality disabled.</li>
   * </ul>
   * The container node-label expression comes from the job's YARN config.
   *
   * @param resourceRequest the Samza resource request to translate and submit
   */
  @Override
  public void requestResources(SamzaResourceRequest resourceRequest) {
    String processorId = resourceRequest.getProcessorId();
    String preferredHost = resourceRequest.getPreferredHost();
    String[] racks = resourceRequest.getFaultDomains().stream().map(FaultDomain::getId).toArray(String[]::new);
    Resource capability = Resource.newInstance(resourceRequest.getMemoryMB(), resourceRequest.getNumCores());
    String nodeLabelsExpression = yarnConfig.getContainerLabel();

    /*
     * Yarn enforces these two checks:
     *   1. ANY_HOST requests should always be made with relax-locality = true
     *   2. A request with relax-locality = false should not be in the same priority as another with relax-locality = true
     *
     * Since the Samza AM makes preferred-host requests with relax-locality = false, it follows that ANY_HOST requests
     * should specify a different priority-level. We can safely set priority of preferred-host requests to be higher than
     * any-host requests since data-locality is critical.
     */
    boolean isAnyHost = preferredHost.equals("ANY_HOST");
    // ANY_HOST requests carry no node/rack constraints; preferred-host requests are pinned to the host and its racks.
    String[] nodes = isAnyHost ? null : new String[] {preferredHost};
    String[] requestRacks = isAnyHost ? null : racks;
    Priority priority = Priority.newInstance(isAnyHost ? ANY_HOST_PRIORITY : PREFERRED_HOST_PRIORITY);
    boolean relaxLocality = isAnyHost;

    // The requested fault-domain racks are logged even for ANY_HOST, where they are not forwarded to YARN.
    log.info("Requesting resources for Processor ID: {} on nodes: {} on racks: {} with capability: {}, priority: {}, relaxLocality: {}, nodeLabelsExpression: {}",
        processorId, Arrays.toString(nodes), Arrays.toString(racks), capability, priority, relaxLocality, nodeLabelsExpression);
    AMRMClient.ContainerRequest issuedRequest =
        new AMRMClient.ContainerRequest(capability, nodes, requestRacks, priority, relaxLocality, nodeLabelsExpression);

    // ensure that updating the state and making the request are done atomically.
    synchronized (lock) {
      requestsMap.put(resourceRequest, issuedRequest);
      amClient.addContainerRequest(issuedRequest);
    }
  }

  /**
   * Requests the YarnContainerManager to release a resource. If the app cannot use the resource or wants to give up
   * the resource, it can release them.
   *
   * @param resource to be released
   */
  @Override
  public void releaseResources(SamzaResource resource) {
    log.info("Releasing Container ID: {} on host: {}", resource.getContainerId(), resource.getHost());
    // ensure that updating state and removing the request are done atomically
    synchronized (lock) {
      Container container = allocatedResources.get(resource);
      if (container == null) {
        log.info("Container ID: {} on host: {} was already released.", resource.getContainerId(), resource.getHost());
        return;
      }
      amClient.releaseAssignedContainer(container.getId());
      allocatedResources.remove(resource);
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



samza-yarn3/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java [82:300]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
public class YarnClusterResourceManager extends ClusterResourceManager implements AMRMClientAsync.CallbackHandler, NMClientAsync.CallbackHandler {

  // YARN request priorities used by requestResources(). Preferred-host requests (relax-locality = false) are issued at
  // PREFERRED_HOST_PRIORITY and ANY_HOST requests (relax-locality = true) at ANY_HOST_PRIORITY. The two values must
  // differ because YARN does not accept both relax-locality settings at the same priority level.
  private static final int PREFERRED_HOST_PRIORITY = 0;
  private static final int ANY_HOST_PRIORITY = 1;

  // NOTE(review): not referenced in the portion of this file reviewed here; presumably a sentinel processor id - confirm.
  private static final String INVALID_PROCESSOR_ID = "-1";

  /**
   * The AMClient instance to request resources from yarn.
   */
  private final AMRMClientAsync<AMRMClient.ContainerRequest> amClient;

  /**
   * Configuration and state specific to Yarn.
   */
  private final YarnConfiguration yarnConfiguration;
  private final YarnAppState state;

  /**
   * SamzaYarnAppMasterLifecycle is responsible for registering, unregistering the AM client.
   */
  private final SamzaYarnAppMasterLifecycle lifecycle;

  /**
   * SamzaYarnAppMasterService is responsible for hosting an AM web UI. This picks up data from both
   * SamzaApplicationState and YarnAppState.
   */
  private final SamzaYarnAppMasterService service;

  // Typed view over the Samza job config for yarn.* settings (poll interval, container node label).
  private final YarnConfig yarnConfig;

  /**
   * State variables to map Yarn specific callbacks into Samza specific callbacks.
   * allocatedResources: the YARN Container backing each Samza resource; entries are removed in releaseResources().
   * requestsMap: the YARN ContainerRequest issued for each Samza request; populated in requestResources().
   */
  private final ConcurrentHashMap<SamzaResource, Container> allocatedResources = new ConcurrentHashMap<>();
  private final ConcurrentHashMap<SamzaResourceRequest, AMRMClient.ContainerRequest> requestsMap = new ConcurrentHashMap<>();

  private final SamzaAppMasterMetrics metrics;

  // Flipped once by start() so that initialization runs at most once.
  private final AtomicBoolean started = new AtomicBoolean(false);
  // Makes updates to requestsMap / allocatedResources atomic with the matching amClient call.
  private final Object lock = new Object();
  private final NMClientAsync nmClientAsync;

  private static final Logger log = LoggerFactory.getLogger(YarnClusterResourceManager.class);
  private final Config config;

  /**
   * Builds a cluster resource manager from already-constructed collaborators instead of deriving them from the YARN
   * environment. Presumably intended for tests (TODO confirm). Only {@link YarnConfig} is created here, from
   * {@code config}.
   */
  YarnClusterResourceManager(AMRMClientAsync amClientAsync, NMClientAsync nmClientAsync, Callback callback,
      YarnAppState yarnAppState, SamzaYarnAppMasterLifecycle lifecycle, SamzaYarnAppMasterService service,
      SamzaAppMasterMetrics metrics, YarnConfiguration yarnConfiguration, Config config) {
    super(callback);
    // Injected YARN clients and AM components.
    this.amClient = amClientAsync;
    this.nmClientAsync = nmClientAsync;
    this.state = yarnAppState;
    this.lifecycle = lifecycle;
    this.service = service;
    this.metrics = metrics;
    // Configuration views.
    this.config = config;
    this.yarnConfiguration = yarnConfiguration;
    this.yarnConfig = new YarnConfig(config);
  }

  /**
   * Creates a YarnClusterResourceManager for an AM running inside a YARN container.
   *
   * <p>Builds a {@link YarnConfiguration} in three steps:
   * <ul>
   *   <li>maps {@code fs.http.impl} to {@link HttpFileSystem};</li>
   *   <li>copies every {@code fs.<scheme>.impl} / {@code fs.<scheme>.impl.*} key from the job config into it;</li>
   *   <li>reads the AM's container id and NodeManager host/ports from the YARN-provided environment variables.</li>
   * </ul>
   * It then wires up the AM-RM client, app state, web UI service, AM lifecycle and NM client. The clients are only
   * created here; they are initialized and started in {@link #start()}.
   *
   * @param config to instantiate the cluster manager with
   * @param jobModelManager the job model manager; not referenced by this constructor
   * @param callback the callback to receive events from Yarn.
   * @param samzaAppState samza app state for display in the UI
   * @throws NumberFormatException if the NM_PORT or NM_HTTP_PORT environment variable is missing or not an integer
   */
  public YarnClusterResourceManager(Config config, JobModelManager jobModelManager,
      ClusterResourceManager.Callback callback, SamzaApplicationState samzaAppState) {
    super(callback);

    // Hadoop/YARN configuration: support http:// paths, then overlay the job's "fs.<scheme>.impl[.*]" settings.
    this.yarnConfiguration = new YarnConfiguration();
    this.yarnConfiguration.set("fs.http.impl", HttpFileSystem.class.getName());
    FileSystemImplConfig fsImplConfig = new FileSystemImplConfig(config);
    fsImplConfig.getSchemes().forEach(scheme ->
        fsImplConfig.getSchemeConfig(scheme).forEach((key, value) -> this.yarnConfiguration.set(key, value)));

    MetricsRegistryMap registry = new MetricsRegistryMap();
    this.metrics = new SamzaAppMasterMetrics(config, samzaAppState, registry);

    // Identity of the AM's own container and of its NodeManager, as exported by YARN into the environment.
    String containerIdStr = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.toString());
    ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
    String nodeHostString = System.getenv(ApplicationConstants.Environment.NM_HOST.toString());
    int nodePort = Integer.parseInt(System.getenv(ApplicationConstants.Environment.NM_PORT.toString()));
    int nodeHttpPort = Integer.parseInt(System.getenv(ApplicationConstants.Environment.NM_HTTP_PORT.toString()));

    this.yarnConfig = new YarnConfig(config);
    this.config = config;

    // Callbacks from the AM-RM client are delivered to this instance.
    this.amClient = AMRMClientAsync.createAMRMClientAsync(this.yarnConfig.getAMPollIntervalMs(), this);

    this.state = new YarnAppState(-1, containerId, nodeHostString, nodePort, nodeHttpPort);
    log.info("Initialized YarnAppState: {}", this.state);
    this.service = new SamzaYarnAppMasterService(config, samzaAppState, this.state, registry, this.yarnConfiguration);

    log.info("Container ID: {}, Nodehost:  {} , Nodeport : {} , NodeHttpport: {}", containerIdStr, nodeHostString, nodePort, nodeHttpPort);
    ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config);
    boolean amHighAvailabilityEnabled = new JobConfig(config).getApplicationMasterHighAvailabilityEnabled();
    this.lifecycle = new SamzaYarnAppMasterLifecycle(
        clusterManagerConfig.getContainerMemoryMb(),
        clusterManagerConfig.getNumCores(),
        samzaAppState,
        this.state,
        this.amClient,
        amHighAvailabilityEnabled);

    // Callbacks from the NM client are delivered to this instance.
    this.nmClientAsync = NMClientAsync.createNMClientAsync(this);
  }

  /**
   * Starts the YarnClusterResourceManager and its sub-systems, in this order:
   * <ol>
   *   <li>metrics;</li>
   *   <li>the AM web service;</li>
   *   <li>the AM-RM and NM clients;</li>
   *   <li>the AM lifecycle.</li>
   * </ol>
   * Only the first call does any work; later calls log a message and return immediately.
   *
   * <p>When AM high availability is enabled, containers returned by the lifecycle as still running from a previous
   * attempt are handled as successfully started. If the lifecycle then reports that the AM should shut down, the
   * callback's {@code onError} is invoked.
   */
  @Override
  public void start() {
    boolean firstStart = started.compareAndSet(false, true);
    if (!firstStart) {
      log.info("Attempting to start an already started YarnClusterResourceManager");
      return;
    }

    metrics.start();
    service.onInit();
    log.info("Starting YarnClusterResourceManager.");

    // Both YARN clients share the same Hadoop configuration.
    amClient.init(yarnConfiguration);
    amClient.start();
    nmClientAsync.init(yarnConfiguration);
    nmClientAsync.start();

    Set<ContainerId> previousAttemptsContainers = lifecycle.onInit();
    metrics.setContainersFromPreviousAttempts(previousAttemptsContainers.size());

    boolean highAvailabilityEnabled = new JobConfig(config).getApplicationMasterHighAvailabilityEnabled();
    if (highAvailabilityEnabled) {
      log.info("Received running containers from previous attempt. Invoking launch success for them.");
      for (ContainerId previousContainer : previousAttemptsContainers) {
        handleOnContainerStarted(previousContainer);
      }
    }

    if (lifecycle.shouldShutdown()) {
      clusterManagerCallback.onError(new SamzaException("Invalid resource request."));
    }

    log.info("Finished starting YarnClusterResourceManager");
  }

  /**
   * Request resources for running container processes.
   *
   * <p>Translates the Samza request into a YARN {@link AMRMClient.ContainerRequest} and submits it to the AM-RM client.
   * <ul>
   *   <li>A request whose preferred host is {@code "ANY_HOST"} is issued at {@code ANY_HOST_PRIORITY}, with
   *       relax-locality enabled and no node or rack constraints.</li>
   *   <li>Any other request is pinned to the preferred host and to the request's fault-domain racks. It is issued at
   *       {@code PREFERRED_HOST_PRIORITY} with relax-locality disabled.</li>
   * </ul>
   * The container node-label expression comes from the job's YARN config.
   *
   * @param resourceRequest the Samza resource request to translate and submit
   */
  @Override
  public void requestResources(SamzaResourceRequest resourceRequest) {
    String processorId = resourceRequest.getProcessorId();
    String preferredHost = resourceRequest.getPreferredHost();
    String[] racks = resourceRequest.getFaultDomains().stream().map(FaultDomain::getId).toArray(String[]::new);
    Resource capability = Resource.newInstance(resourceRequest.getMemoryMB(), resourceRequest.getNumCores());
    String nodeLabelsExpression = yarnConfig.getContainerLabel();

    /*
     * Yarn enforces these two checks:
     *   1. ANY_HOST requests should always be made with relax-locality = true
     *   2. A request with relax-locality = false should not be in the same priority as another with relax-locality = true
     *
     * Since the Samza AM makes preferred-host requests with relax-locality = false, it follows that ANY_HOST requests
     * should specify a different priority-level. We can safely set priority of preferred-host requests to be higher than
     * any-host requests since data-locality is critical.
     */
    boolean isAnyHost = preferredHost.equals("ANY_HOST");
    // ANY_HOST requests carry no node/rack constraints; preferred-host requests are pinned to the host and its racks.
    String[] nodes = isAnyHost ? null : new String[] {preferredHost};
    String[] requestRacks = isAnyHost ? null : racks;
    Priority priority = Priority.newInstance(isAnyHost ? ANY_HOST_PRIORITY : PREFERRED_HOST_PRIORITY);
    boolean relaxLocality = isAnyHost;

    // The requested fault-domain racks are logged even for ANY_HOST, where they are not forwarded to YARN.
    log.info("Requesting resources for Processor ID: {} on nodes: {} on racks: {} with capability: {}, priority: {}, relaxLocality: {}, nodeLabelsExpression: {}",
        processorId, Arrays.toString(nodes), Arrays.toString(racks), capability, priority, relaxLocality, nodeLabelsExpression);
    AMRMClient.ContainerRequest issuedRequest =
        new AMRMClient.ContainerRequest(capability, nodes, requestRacks, priority, relaxLocality, nodeLabelsExpression);

    // ensure that updating the state and making the request are done atomically.
    synchronized (lock) {
      requestsMap.put(resourceRequest, issuedRequest);
      amClient.addContainerRequest(issuedRequest);
    }
  }

  /**
   * Requests the YarnContainerManager to release a resource. If the app cannot use the resource or wants to give up
   * the resource, it can release them.
   *
   * @param resource to be released
   */
  @Override
  public void releaseResources(SamzaResource resource) {
    log.info("Releasing Container ID: {} on host: {}", resource.getContainerId(), resource.getHost());
    // ensure that updating state and removing the request are done atomically
    synchronized (lock) {
      Container container = allocatedResources.get(resource);
      if (container == null) {
        log.info("Container ID: {} on host: {} was already released.", resource.getContainerId(), resource.getHost());
        return;
      }
      amClient.releaseAssignedContainer(container.getId());
      allocatedResources.remove(resource);
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



