public static void main()

in client-mr/core/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java [135:426]


  /**
   * Entry point of the RSS-aware MapReduce ApplicationMaster.
   *
   * <p>When the job has reduce tasks, the remote shuffle service (RSS) is prepared first:
   *
   * <ul>
   *   <li>the configured coordinators are registered;
   *   <li>an RSS-only extra configuration is built, optionally enriched with client conf fetched
   *       from the coordinator;
   *   <li>shuffle server assignments are requested and the shuffle is registered on every assigned
   *       server (retried according to the assignment retry settings);
   *   <li>application info is registered and a periodic heartbeat is started;
   *   <li>the assignments are stored in the extra configuration and persisted with {@code
   *       writeExtraConf};
   *   <li>reduce slow start and AM job recovery are disabled.
   * </ul>
   *
   * <p>Afterwards the AppMaster is created from the container environment variables and started.
   * Any failure during that startup phase is logged and the process is terminated with status 1.
   *
   * @param args command-line arguments (not read by this method)
   * @throws RssException if the job dir is not configured, shuffle registration fails, or no
   *     shuffle server is assigned
   * @throws IllegalArgumentException if remote spill is enabled without remote storage, or the
   *     remote-spill attempt increment is negative
   */
  public static void main(String[] args) {

    JobConf conf = new JobConf(new YarnConfiguration());
    conf.addResource(new Path(MRJobConfig.JOB_CONF_FILE));

    ShuffleWriteClient shuffleWriteClient = null;
    int numReduceTasks = conf.getInt(MRJobConfig.NUM_REDUCES, 0);
    if (numReduceTasks > 0) {
      // The job dir is required later on (writeExtraConf receives this conf). Check it before any
      // remote registration so a broken job fails fast instead of after shuffles were registered
      // and heartbeats were started.
      if (conf.get(MRJobConfig.MAPREDUCE_JOB_DIR) == null) {
        throw new RssException("jobDir is empty");
      }

      String coordinators = conf.get(RssMRConfig.RSS_COORDINATOR_QUORUM);

      ShuffleWriteClient client = RssMRUtils.createShuffleClient(conf);
      shuffleWriteClient = client;

      LOG.info("Registering coordinators {}", coordinators);
      client.registerCoordinators(coordinators);

      // set loadDefaults to false, rss_conf.xml should only contain conf of RSS,
      // Hadoop conf is not necessary.
      Configuration extraConf = new JobConf(false);
      extraConf.clear();

      RssMRUtils.applyClientConf(extraConf, conf);

      // Dynamic client conf also controls whether remote storage is fetched from the coordinator.
      boolean dynamicConfEnabled =
          conf.getBoolean(
              RssMRConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED,
              RssMRConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED_DEFAULT_VALUE);

      // fetch client conf and apply them if necessary
      if (dynamicConfEnabled) {
        Map<String, String> clusterClientConf =
            client.fetchClientConf(
                conf.getInt(
                    RssMRConfig.RSS_ACCESS_TIMEOUT_MS,
                    RssMRConfig.RSS_ACCESS_TIMEOUT_MS_DEFAULT_VALUE));
        RssMRUtils.applyDynamicClientConf(extraConf, clusterClientConf);
      }

      // Configured server assignment tags, plus the shuffle server version tag and client type.
      Set<String> assignmentTags =
          parseRssAssignmentTags(conf.get(RssMRConfig.RSS_CLIENT_ASSIGNMENT_TAGS, ""));
      assignmentTags.add(Constants.SHUFFLE_SERVER_VERSION);
      String clientType =
          extraConf.get(RssMRConfig.RSS_CLIENT_TYPE, RssMRConfig.RSS_CLIENT_TYPE_DEFAULT_VALUE);
      ClientUtils.validateClientType(clientType);
      assignmentTags.add(clientType);

      String storageType = RssMRUtils.getString(extraConf, RssMRConfig.RSS_STORAGE_TYPE);
      boolean testMode = RssMRUtils.getBoolean(extraConf, RssMRConfig.RSS_TEST_MODE_ENABLE, false);
      ClientUtils.validateTestModeConf(testMode, storageType);
      ApplicationAttemptId applicationAttemptId = RssMRUtils.getApplicationAttemptId();
      String appId = applicationAttemptId.toString();
      RemoteStorageInfo defaultRemoteStorage =
          new RemoteStorageInfo(extraConf.get(RssMRConfig.RSS_REMOTE_STORAGE_PATH, ""));
      RemoteStorageInfo remoteStorage =
          ClientUtils.fetchRemoteStorage(
              appId, defaultRemoteStorage, dynamicConfEnabled, storageType, client);
      // set the remote storage with actual value
      extraConf.set(RssMRConfig.RSS_REMOTE_STORAGE_PATH, remoteStorage.getPath());
      extraConf.set(RssMRConfig.RSS_REMOTE_STORAGE_CONF, remoteStorage.getConfString());
      RssMRUtils.validateRssClientConf(extraConf);

      // When containers have disk with very limited space, reduce is allowed to spill data to hdfs
      if (conf.getBoolean(
          RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED,
          RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED_DEFAULT)) {
        increaseReduceAttemptsForRemoteSpill(conf, remoteStorage);
      }

      int requiredAssignmentShuffleServersNum = RssMRUtils.getRequiredShuffleServerNumber(conf);
      // retryInterval must bigger than `rss.server.heartbeat.interval`, or maybe it will return the
      // same result
      long retryInterval =
          conf.getLong(
              RssMRConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL,
              RssMRConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL_DEFAULT_VALUE);
      int retryTimes =
          conf.getInt(
              RssMRConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES,
              RssMRConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES_DEFAULT_VALUE);
      boolean remoteMergeEnable =
          conf.getBoolean(RSS_REMOTE_MERGE_ENABLE, RSS_REMOTE_MERGE_ENABLE_DEFAULT);
      ShuffleAssignmentsInfo response;
      try {
        response =
            RetryUtils.retry(
                () -> {
                  ShuffleAssignmentsInfo shuffleAssignments =
                      client.getShuffleAssignments(
                          appId,
                          0,
                          numReduceTasks,
                          1,
                          Sets.newHashSet(assignmentTags),
                          requiredAssignmentShuffleServersNum,
                          -1);

                  Map<ShuffleServerInfo, List<PartitionRange>> serverToPartitionRanges =
                      shuffleAssignments.getServerToPartitionRanges();

                  if (serverToPartitionRanges == null || serverToPartitionRanges.isEmpty()) {
                    return null;
                  }
                  // Identical for every server: resolve once per attempt instead of per server.
                  // Kept inside the retried lambda so failures are still wrapped below.
                  int maxConcurrencyPerPartitionToWrite =
                      RssMRConfig.toRssConf(conf).get(MAX_CONCURRENCY_PER_PARTITION_TO_WRITE);
                  MergeContext mergeContext =
                      remoteMergeEnable ? buildRssMergeContext(conf) : null;

                  LOG.info("Start to register shuffle");
                  long start = System.currentTimeMillis();
                  serverToPartitionRanges.forEach(
                      (server, partitionRanges) ->
                          client.registerShuffle(
                              server,
                              appId,
                              0,
                              partitionRanges,
                              remoteStorage,
                              ShuffleDataDistributionType.NORMAL,
                              maxConcurrencyPerPartitionToWrite,
                              0,
                              mergeContext));
                  LOG.info(
                      "Finish register shuffle with {} ms", System.currentTimeMillis() - start);
                  return shuffleAssignments;
                },
                retryInterval,
                retryTimes);
      } catch (Throwable throwable) {
        throw new RssException("registerShuffle failed!", throwable);
      }

      // Returning here would leave the AppMaster unstarted without any diagnostic; fail loudly.
      if (response == null) {
        throw new RssException(
            "registerShuffle failed! No shuffle server was assigned for app " + appId);
      }
      long heartbeatInterval =
          conf.getLong(
              RssMRConfig.RSS_HEARTBEAT_INTERVAL, RssMRConfig.RSS_HEARTBEAT_INTERVAL_DEFAULT_VALUE);
      long heartbeatTimeout =
          conf.getLong(RssMRConfig.RSS_HEARTBEAT_TIMEOUT, heartbeatInterval / 2);
      client.registerApplicationInfo(appId, heartbeatTimeout, "user");
      startRssAppHeartbeat(client, appId, heartbeatTimeout, heartbeatInterval);

      // write shuffle worker assignments to submit work directory
      // format is as below:
      // mapreduce.rss.assignment.partition.1:server1,server2
      // mapreduce.rss.assignment.partition.2:server3,server4
      // ...
      putRssAssignments(extraConf, response);

      writeExtraConf(conf, extraConf);

      // close slow start
      conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 1.0f);
      LOG.warn("close slow start, because RSS does not support it yet");

      // MapReduce doesn't set setKeepContainersAcrossApplicationAttempts in AppContext, so no
      // container is shared between attempts. RSS doesn't support shared containers between
      // attempts either.
      conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, false);
      LOG.warn("close recovery enable, because RSS doesn't support it yet");
    }
    try {
      setMainStartedTrue();
      Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
      String containerIdStr = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name());
      validateInputParam(containerIdStr, ApplicationConstants.Environment.CONTAINER_ID.name());
      String nodeHostString = System.getenv(ApplicationConstants.Environment.NM_HOST.name());
      validateInputParam(nodeHostString, ApplicationConstants.Environment.NM_HOST.name());
      String nodePortString = System.getenv(ApplicationConstants.Environment.NM_PORT.name());
      validateInputParam(nodePortString, ApplicationConstants.Environment.NM_PORT.name());
      String nodeHttpPortString =
          System.getenv(ApplicationConstants.Environment.NM_HTTP_PORT.name());
      validateInputParam(nodeHttpPortString, ApplicationConstants.Environment.NM_HTTP_PORT.name());
      String appSubmitTimeStr = System.getenv("APP_SUBMIT_TIME_ENV");
      validateInputParam(appSubmitTimeStr, "APP_SUBMIT_TIME_ENV");
      ContainerId containerId = ContainerId.fromString(containerIdStr);
      ApplicationAttemptId applicationAttemptId = containerId.getApplicationAttemptId();
      if (applicationAttemptId != null) {
        CallerContext.setCurrent(
            (new CallerContext.Builder("mr_appmaster_" + applicationAttemptId.toString())).build());
      }

      long appSubmitTime = Long.parseLong(appSubmitTimeStr);
      RssMRAppMaster appMaster =
          new RssMRAppMaster(
              applicationAttemptId,
              containerId,
              nodeHostString,
              Integer.parseInt(nodePortString),
              Integer.parseInt(nodeHttpPortString),
              appSubmitTime,
              shuffleWriteClient);
      ShutdownHookManager.get().addShutdownHook(new RssMRAppMasterShutdownHook(appMaster), 30);
      MRWebAppUtil.initialize(conf);
      String systemPropsToLog = MRApps.getSystemPropertiesToLog(conf);
      if (systemPropsToLog != null) {
        LOG.info(systemPropsToLog);
      }
      String jobUserName = System.getenv(ApplicationConstants.Environment.USER.name());
      conf.set("mapreduce.job.user.name", jobUserName);
      initAndStartAppMaster(appMaster, conf, jobUserName);
    } catch (Throwable t) {
      LOG.error("Error starting MRAppMaster", t);
      ExitUtil.terminate(1, t);
    }
  }

  /**
   * Splits a comma-separated list of assignment tags, trimming each tag and dropping empty ones.
   *
   * @param rawTags configured tag list; may be {@code null} or empty
   * @return a new mutable set of tags (empty when nothing is configured)
   */
  private static Set<String> parseRssAssignmentTags(String rawTags) {
    Set<String> tags = new HashSet<>();
    if (StringUtils.isNotEmpty(rawTags)) {
      for (String tag : Arrays.asList(rawTags.split(","))) {
        String trimmed = tag.trim();
        if (!trimmed.isEmpty()) {
          tags.add(trimmed);
        }
      }
    }
    return tags;
  }

  /**
   * Validates that remote storage is available for remote spill and raises the reduce max attempts
   * in {@code conf} by the configured remote-spill increment.
   *
   * @throws IllegalArgumentException if {@code remoteStorage} is empty or the increment is negative
   */
  private static void increaseReduceAttemptsForRemoteSpill(
      JobConf conf, RemoteStorageInfo remoteStorage) {
    if (remoteStorage.isEmpty()) {
      throw new IllegalArgumentException(
          "Remote spill only supports "
              + StorageType.MEMORY_LOCALFILE_HDFS.name()
              + " mode with "
              + remoteStorage);
    }

    // When remote spill is enabled, reduce task is more easy to crash.
    // We allow more attempts to avoid recomputing job.
    int originalAttempts = conf.getInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, 4);
    int inc =
        conf.getInt(
            RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ATTEMPT_INC,
            RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ATTEMPT_INC_DEFAULT);
    if (inc < 0) {
      throw new IllegalArgumentException(
          RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ATTEMPT_INC + " cannot be negative");
    }
    conf.setInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, originalAttempts + inc);
  }

  /**
   * Builds the merge context passed to {@code registerShuffle} when remote merge is enabled, from
   * the job's map output key/value classes, output key comparator and merge settings.
   */
  private static MergeContext buildRssMergeContext(JobConf conf) {
    return MergeContext.newBuilder()
        .setKeyClass(conf.getMapOutputKeyClass().getName())
        .setValueClass(conf.getMapOutputValueClass().getName())
        .setComparatorClass(conf.getOutputKeyComparator().getClass().getName())
        .setMergedBlockSize(
            conf.getInt(
                RssMRConfig.RSS_MERGED_BLOCK_SZIE, RssMRConfig.RSS_MERGED_BLOCK_SZIE_DEFAULT))
        .setMergeClassLoader(conf.get(RSS_REMOTE_MERGE_CLASS_LOADER, ""))
        .build();
  }

  /**
   * Schedules {@code client.sendAppHeartbeat} on a single daemon thread, first after {@code
   * heartbeatInterval / 2} ms and then every {@code heartbeatInterval} ms. Heartbeat failures are
   * logged and do not stop the schedule.
   */
  private static void startRssAppHeartbeat(
      ShuffleWriteClient client, String appId, long heartbeatTimeout, long heartbeatInterval) {
    // Daemon thread so the heartbeat alone never keeps the JVM alive.
    ThreadFactory daemonThreadFactory =
        r -> {
          Thread t = Executors.defaultThreadFactory().newThread(r);
          t.setDaemon(true);
          return t;
        };
    ScheduledExecutorService scheduledExecutorService =
        Executors.newSingleThreadScheduledExecutor(daemonThreadFactory);
    scheduledExecutorService.scheduleAtFixedRate(
        () -> {
          try {
            client.sendAppHeartbeat(appId, heartbeatTimeout);
            LOG.info("Finish send heartbeat to coordinator and servers");
          } catch (Exception e) {
            LOG.warn("Fail to send heartbeat to coordinator and servers", e);
          }
        },
        heartbeatInterval / 2,
        heartbeatInterval,
        TimeUnit.MILLISECONDS);
  }

  /**
   * Stores each partition's assigned servers in {@code extraConf} under {@code
   * RSS_ASSIGNMENT_PREFIX + partition}. The value is a comma-separated list of {@code
   * host:grpcPort}, or {@code host:grpcPort:nettyPort} when the netty port is positive.
   */
  private static void putRssAssignments(Configuration extraConf, ShuffleAssignmentsInfo response) {
    response
        .getPartitionToServers()
        .entrySet()
        .forEach(
            entry -> {
              List<String> servers = Lists.newArrayList();
              for (ShuffleServerInfo server : entry.getValue()) {
                if (server.getNettyPort() > 0) {
                  servers.add(
                      server.getHost()
                          + ":"
                          + server.getGrpcPort()
                          + ":"
                          + server.getNettyPort());
                } else {
                  servers.add(server.getHost() + ":" + server.getGrpcPort());
                }
              }
              extraConf.set(
                  RssMRConfig.RSS_ASSIGNMENT_PREFIX + entry.getKey(),
                  StringUtils.join(servers, ","));
            });
  }