samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java [150:203]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  /**
   * Builds the resource manager and all of the YARN-facing collaborators it owns.
   *
   * <p>In order, the constructor: prepares a {@link YarnConfiguration} (registering
   * {@link HttpFileSystem} under {@code fs.http.impl} and copying over the per-scheme
   * filesystem settings found in the job config), creates the application master metrics,
   * reads the container id and the NodeManager host/ports from the process environment,
   * and finally creates the AM-RM client, the {@link YarnAppState}, the app master service,
   * the app master lifecycle and the NM client.
   *
   * @param config the Samza job configuration
   * @param jobModelManager not referenced by this constructor body
   * @param callback handed to the superclass constructor
   * @param samzaAppState shared application state passed to the metrics, service and lifecycle objects
   */
  public YarnClusterResourceManager(Config config, JobModelManager jobModelManager,
      ClusterResourceManager.Callback callback, SamzaApplicationState samzaAppState) {
    super(callback);

    yarnConfiguration = new YarnConfiguration();
    yarnConfiguration.set("fs.http.impl", HttpFileSystem.class.getName());

    // Propagate the job's "fs.<scheme>.impl" and "fs.<scheme>.impl.*" entries into the YarnConfiguration.
    FileSystemImplConfig fileSystemConfig = new FileSystemImplConfig(config);
    fileSystemConfig.getSchemes().forEach(
      fsScheme -> fileSystemConfig.getSchemeConfig(fsScheme).forEach(yarnConfiguration::set)
    );

    MetricsRegistryMap metricsRegistry = new MetricsRegistryMap();
    metrics = new SamzaAppMasterMetrics(config, samzaAppState, metricsRegistry);

    // Values below come from the environment variables named by ApplicationConstants.Environment.
    String amContainerIdText = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.toString());
    ContainerId amContainerId = ConverterUtils.toContainerId(amContainerIdText);
    String nmHost = System.getenv(ApplicationConstants.Environment.NM_HOST.toString());
    String nmPortText = System.getenv(ApplicationConstants.Environment.NM_PORT.toString());
    String nmHttpPortText = System.getenv(ApplicationConstants.Environment.NM_HTTP_PORT.toString());

    int nmPort = Integer.parseInt(nmPortText);
    int nmHttpPort = Integer.parseInt(nmHttpPortText);

    this.yarnConfig = new YarnConfig(config);
    this.config = config;

    // Create the async AM-RM client with the configured AM poll interval; this instance is passed as its handler.
    this.amClient = AMRMClientAsync.createAMRMClientAsync(this.yarnConfig.getAMPollIntervalMs(), this);

    this.state = new YarnAppState(-1, amContainerId, nmHost, nmPort, nmHttpPort);
    log.info("Initialized YarnAppState: {}", state.toString());

    this.service = new SamzaYarnAppMasterService(config, samzaAppState, this.state, metricsRegistry, yarnConfiguration);
    log.info("Container ID: {}, Nodehost:  {} , Nodeport : {} , NodeHttpport: {}", amContainerIdText, nmHost, nmPort, nmHttpPort);

    // The lifecycle is given the container memory/core settings and the AM high-availability flag from the job config.
    ClusterManagerConfig clusterConfig = new ClusterManagerConfig(config);
    this.lifecycle = new SamzaYarnAppMasterLifecycle(
        clusterConfig.getContainerMemoryMb(),
        clusterConfig.getNumCores(),
        samzaAppState,
        state,
        amClient,
        new JobConfig(config).getApplicationMasterHighAvailabilityEnabled()
    );

    this.nmClientAsync = NMClientAsync.createNMClientAsync(this);
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



samza-yarn3/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java [150:203]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  /**
   * Builds the resource manager and all of the YARN-facing collaborators it owns.
   *
   * <p>In order, the constructor: prepares a {@link YarnConfiguration} (registering
   * {@link HttpFileSystem} under {@code fs.http.impl} and copying over the per-scheme
   * filesystem settings found in the job config), creates the application master metrics,
   * reads the container id and the NodeManager host/ports from the process environment,
   * and finally creates the AM-RM client, the {@link YarnAppState}, the app master service,
   * the app master lifecycle and the NM client.
   *
   * @param config the Samza job configuration
   * @param jobModelManager not referenced by this constructor body
   * @param callback handed to the superclass constructor
   * @param samzaAppState shared application state passed to the metrics, service and lifecycle objects
   */
  public YarnClusterResourceManager(Config config, JobModelManager jobModelManager,
      ClusterResourceManager.Callback callback, SamzaApplicationState samzaAppState) {
    super(callback);

    yarnConfiguration = new YarnConfiguration();
    yarnConfiguration.set("fs.http.impl", HttpFileSystem.class.getName());

    // Propagate the job's "fs.<scheme>.impl" and "fs.<scheme>.impl.*" entries into the YarnConfiguration.
    FileSystemImplConfig fileSystemConfig = new FileSystemImplConfig(config);
    fileSystemConfig.getSchemes().forEach(
      fsScheme -> fileSystemConfig.getSchemeConfig(fsScheme).forEach(yarnConfiguration::set)
    );

    MetricsRegistryMap metricsRegistry = new MetricsRegistryMap();
    metrics = new SamzaAppMasterMetrics(config, samzaAppState, metricsRegistry);

    // Values below come from the environment variables named by ApplicationConstants.Environment.
    String amContainerIdText = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.toString());
    ContainerId amContainerId = ConverterUtils.toContainerId(amContainerIdText);
    String nmHost = System.getenv(ApplicationConstants.Environment.NM_HOST.toString());
    String nmPortText = System.getenv(ApplicationConstants.Environment.NM_PORT.toString());
    String nmHttpPortText = System.getenv(ApplicationConstants.Environment.NM_HTTP_PORT.toString());

    int nmPort = Integer.parseInt(nmPortText);
    int nmHttpPort = Integer.parseInt(nmHttpPortText);

    this.yarnConfig = new YarnConfig(config);
    this.config = config;

    // Create the async AM-RM client with the configured AM poll interval; this instance is passed as its handler.
    this.amClient = AMRMClientAsync.createAMRMClientAsync(this.yarnConfig.getAMPollIntervalMs(), this);

    this.state = new YarnAppState(-1, amContainerId, nmHost, nmPort, nmHttpPort);
    log.info("Initialized YarnAppState: {}", state.toString());

    this.service = new SamzaYarnAppMasterService(config, samzaAppState, this.state, metricsRegistry, yarnConfiguration);
    log.info("Container ID: {}, Nodehost:  {} , Nodeport : {} , NodeHttpport: {}", amContainerIdText, nmHost, nmPort, nmHttpPort);

    // The lifecycle is given the container memory/core settings and the AM high-availability flag from the job config.
    ClusterManagerConfig clusterConfig = new ClusterManagerConfig(config);
    this.lifecycle = new SamzaYarnAppMasterLifecycle(
        clusterConfig.getContainerMemoryMb(),
        clusterConfig.getNumCores(),
        samzaAppState,
        state,
        amClient,
        new JobConfig(config).getApplicationMasterHighAvailabilityEnabled()
    );

    this.nmClientAsync = NMClientAsync.createNMClientAsync(this);
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



