public static void main()

in client-tez/tez/src/main/java/org/apache/tez/dag/app/CelebornDagAppMaster.java [328:465]


  public static void main(String[] args) {
    try {

      boolean sessionModeCliOption = false;
      for (int i = 0; i < args.length; i++) {
        if (args[i].startsWith("-D")) {
          String[] property = args[i].split("=");
          if (property.length < 2) {
            System.setProperty(property[0].substring(2), "");
          } else {
            System.setProperty(property[0].substring(2), property[1]);
          }
        } else if (args[i].contains("--session") || args[i].contains("-s")) {
          sessionModeCliOption = true;
        }
      }

      Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
      final String pid = System.getenv().get("JVM_PID");
      String containerIdStr = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name());
      String nodeHostString = System.getenv(ApplicationConstants.Environment.NM_HOST.name());
      String nodePortString = System.getenv(ApplicationConstants.Environment.NM_PORT.name());
      String nodeHttpPortString =
          System.getenv(ApplicationConstants.Environment.NM_HTTP_PORT.name());
      String appSubmitTimeStr = System.getenv(ApplicationConstants.APP_SUBMIT_TIME_ENV);
      String clientVersion = System.getenv(TezConstants.TEZ_CLIENT_VERSION_ENV);
      if (clientVersion == null) {
        clientVersion = VersionInfo.UNKNOWN;
      }

      validateInputParam(appSubmitTimeStr, ApplicationConstants.APP_SUBMIT_TIME_ENV);

      ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
      ApplicationAttemptId applicationAttemptId = containerId.getApplicationAttemptId();

      long appSubmitTime = Long.parseLong(appSubmitTimeStr);

      String jobUserName = System.getenv(ApplicationConstants.Environment.USER.name());

      Logger.info(
          "Creating CelebornDAGAppMaster for "
              + "applicationId={}"
              + ", attemptNum={}"
              + ", AMContainerId={}"
              + ", jvmPid={}"
              + ", userFromEnv={}"
              + ", cliSessionOption={}"
              + ", pwd={}"
              + ", localDirs={}"
              + ", logDirs={}",
          applicationAttemptId.getApplicationId(),
          applicationAttemptId.getAttemptId(),
          containerId,
          pid,
          jobUserName,
          sessionModeCliOption,
          System.getenv(ApplicationConstants.Environment.PWD.name()),
          System.getenv(ApplicationConstants.Environment.LOCAL_DIRS.name()),
          System.getenv(ApplicationConstants.Environment.LOG_DIRS.name()));

      Configuration conf = new Configuration(new YarnConfiguration());

      DAGProtos.ConfigurationProto confProto =
          TezUtilsInternal.readUserSpecifiedTezConfiguration(
              System.getenv(ApplicationConstants.Environment.PWD.name()));
      TezUtilsInternal.addUserSpecifiedTezConfiguration(conf, confProto.getConfKeyValuesList());

      DAGProtos.AMPluginDescriptorProto amPluginDescriptorProto = null;
      if (confProto.hasAmPluginDescriptor()) {
        amPluginDescriptorProto = confProto.getAmPluginDescriptor();
      }

      // disable tez slow start
      conf.setFloat(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION, 1.0f);
      conf.setFloat(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION, 1.0f);
      // disable transfer shuffle from event
      conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_ENABLED, false);
      conf.setBoolean(
          TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_SUPPORT_IN_MEM_FILE, false);
      // disable pipelined shuffle
      conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, false);
      // disable reschedule task on unhealthy nodes because shuffle data are stored in Celeborn
      conf.setBoolean(TEZ_AM_NODE_UNHEALTHY_RESCHEDULE_TASKS, false);

      // support set celeborn master endpoints from env
      String masterEndpointsKey =
          CelebornTezUtils.TEZ_PREFIX + CelebornConf.MASTER_ENDPOINTS().key();
      String masterEndpointsVal = conf.get(masterEndpointsKey);
      if (masterEndpointsVal == null || masterEndpointsVal.isEmpty()) {
        Logger.info(
            "MRAppMaster sets {} via environment variable {}.",
            masterEndpointsKey,
            MASTER_ENDPOINTS_ENV);
        conf.set(masterEndpointsKey, CelebornTezUtils.ensureGetSysEnv(MASTER_ENDPOINTS_ENV));
      }

      UserGroupInformation.setConfiguration(conf);
      Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();

      TezUtilsInternal.setSecurityUtilConfigration(Logger, conf);

      CelebornDagAppMaster appMaster =
          new CelebornDagAppMaster(
              applicationAttemptId,
              containerId,
              nodeHostString,
              Integer.parseInt(nodePortString),
              Integer.parseInt(nodeHttpPortString),
              new SystemClock(),
              appSubmitTime,
              sessionModeCliOption,
              System.getenv(ApplicationConstants.Environment.PWD.name()),
              TezCommonUtils.getTrimmedStrings(
                  System.getenv(ApplicationConstants.Environment.LOCAL_DIRS.name())),
              TezCommonUtils.getTrimmedStrings(
                  System.getenv(ApplicationConstants.Environment.LOG_DIRS.name())),
              clientVersion,
              credentials,
              jobUserName,
              amPluginDescriptorProto);
      ShutdownHookManager.get()
          .addShutdownHook(new DAGAppMasterShutdownHook(appMaster), SHUTDOWN_HOOK_PRIORITY);

      // log the system properties
      if (Logger.isInfoEnabled()) {
        String systemPropsToLog = TezCommonUtils.getSystemPropertiesToLog(conf);
        if (systemPropsToLog != null) {
          Logger.info(systemPropsToLog);
        }
      }

      initAndStartAppMaster(appMaster, conf);

    } catch (Throwable t) {
      Logger.error("Error starting DAGAppMaster", t);
      System.exit(1);
    }
  }