samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java [210:235]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  public void start() {
    if (!started.compareAndSet(false, true)) {
      log.info("Attempting to start an already started YarnClusterResourceManager");
      return;
    }
    metrics.start();
    service.onInit();
    log.info("Starting YarnClusterResourceManager.");
    amClient.init(yarnConfiguration);
    amClient.start();
    nmClientAsync.init(yarnConfiguration);
    nmClientAsync.start();
    Set<ContainerId> previousAttemptsContainers = lifecycle.onInit();
    metrics.setContainersFromPreviousAttempts(previousAttemptsContainers.size());

    if (new JobConfig(config).getApplicationMasterHighAvailabilityEnabled()) {
      log.info("Received running containers from previous attempt. Invoking launch success for them.");
      previousAttemptsContainers.forEach(this::handleOnContainerStarted);
    }

    if (lifecycle.shouldShutdown()) {
      clusterManagerCallback.onError(new SamzaException("Invalid resource request."));
    }

    log.info("Finished starting YarnClusterResourceManager");
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



samza-yarn3/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java [210:235]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  public void start() {
    if (!started.compareAndSet(false, true)) {
      log.info("Attempting to start an already started YarnClusterResourceManager");
      return;
    }
    metrics.start();
    service.onInit();
    log.info("Starting YarnClusterResourceManager.");
    amClient.init(yarnConfiguration);
    amClient.start();
    nmClientAsync.init(yarnConfiguration);
    nmClientAsync.start();
    Set<ContainerId> previousAttemptsContainers = lifecycle.onInit();
    metrics.setContainersFromPreviousAttempts(previousAttemptsContainers.size());

    if (new JobConfig(config).getApplicationMasterHighAvailabilityEnabled()) {
      log.info("Received running containers from previous attempt. Invoking launch success for them.");
      previousAttemptsContainers.forEach(this::handleOnContainerStarted);
    }

    if (lifecycle.shouldShutdown()) {
      clusterManagerCallback.onError(new SamzaException("Invalid resource request."));
    }

    log.info("Finished starting YarnClusterResourceManager");
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



