public void init()

in client-mr/core/src/main/java/org/apache/hadoop/mapred/RssMapOutputCollector.java [59:178]


  /**
   * Prepares this collector for one map task.
   *
   * <p>The method:
   *
   * <ul>
   *   <li>captures the reporter and the map-output key/value classes;
   *   <li>validates the sort buffer settings;
   *   <li>resolves the RSS client options;
   *   <li>creates the shuffle client and the {@code SortWriteBufferManager} stored in this
   *       collector's fields.
   * </ul>
   *
   * <p>Each RSS client option is looked up through {@code RssMRUtils}. It receives both the RSS
   * client configuration loaded from {@code RssMRConfig.RSS_CONF_FILE} and the job
   * configuration, together with a default from {@code RssMRConfig}. Which source wins is decided
   * inside {@code RssMRUtils} and is not visible here.
   *
   * @param context collector context supplying the job configuration, the reporter and the map
   *     task
   * @throws IOException if {@code JobContext.IO_SORT_MB} lies outside [0, 2047], or if the sort
   *     memory use threshold is not within (0, 1]
   * @throws ClassNotFoundException part of the declared signature
   * @throws RssException if no storage type is configured
   */
  public void init(Context context) throws IOException, ClassNotFoundException {
    JobConf jobConf = context.getJobConf();
    reporter = context.getReporter();
    keyClass = (Class<K>) jobConf.getMapOutputKeyClass();
    valClass = (Class<V>) jobConf.getMapOutputValueClass();

    // The sort size in MiB must fit in the low 11 bits, i.e. lie in [0, 2047].
    int sortMb = jobConf.getInt(JobContext.IO_SORT_MB, 100);
    if (sortMb < 0 || sortMb > 0x7FF) {
      throw new IOException("Invalid \"" + JobContext.IO_SORT_MB + "\": " + sortMb);
    }
    partitions = jobConf.getNumReduceTasks();
    MapTask mapTask = context.getMapTask();

    JobConf rssClientConf = new JobConf(RssMRConfig.RSS_CONF_FILE);

    double sortThreshold =
        RssMRUtils.getDouble(
            rssClientConf,
            jobConf,
            RssMRConfig.RSS_CLIENT_SORT_MEMORY_USE_THRESHOLD,
            RssMRConfig.RSS_CLIENT_DEFAULT_SORT_MEMORY_USE_THRESHOLD);
    // Only (0, 1] is accepted. NaN and -0.0 both fail the "> 0" test and are rejected.
    boolean sortThresholdInRange = sortThreshold > 0 && sortThreshold <= 1.0;
    if (!sortThresholdInRange) {
      throw new IOException("Invalid  sort memory use threshold : " + sortThreshold);
    }

    int batch =
        RssMRUtils.getInt(
            rssClientConf,
            jobConf,
            RssMRConfig.RSS_CLIENT_BATCH_TRIGGER_NUM,
            RssMRConfig.RSS_CLIENT_DEFAULT_BATCH_TRIGGER_NUM);
    RawComparator<K> comparator = jobConf.getOutputKeyComparator();
    double memoryThreshold =
        RssMRUtils.getDouble(
            rssClientConf,
            jobConf,
            RssMRConfig.RSS_CLIENT_MEMORY_THRESHOLD,
            RssMRConfig.RSS_CLIENT_DEFAULT_MEMORY_THRESHOLD);

    // The application attempt supplies both the app id string and the attempt number that is
    // folded into the numeric task attempt id.
    ApplicationAttemptId appAttemptId = RssMRUtils.getApplicationAttemptId();
    String appId = appAttemptId.toString();
    long taskAttemptId =
        RssMRUtils.convertTaskAttemptIdToLong(mapTask.getTaskID(), appAttemptId.getAttemptId());

    double sendThreshold =
        RssMRUtils.getDouble(
            rssClientConf,
            jobConf,
            RssMRConfig.RSS_CLIENT_SEND_THRESHOLD,
            RssMRConfig.RSS_CLIENT_DEFAULT_SEND_THRESHOLD);
    long sendCheckInterval =
        RssMRUtils.getLong(
            rssClientConf,
            jobConf,
            RssMRConfig.RSS_CLIENT_SEND_CHECK_INTERVAL_MS,
            RssMRConfig.RSS_CLIENT_SEND_CHECK_INTERVAL_MS_DEFAULT_VALUE);
    long sendCheckTimeout =
        RssMRUtils.getLong(
            rssClientConf,
            jobConf,
            RssMRConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS,
            RssMRConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS_DEFAULT_VALUE);
    int bitmapSplitNum =
        RssMRUtils.getInt(
            rssClientConf,
            jobConf,
            RssMRConfig.RSS_CLIENT_BITMAP_NUM,
            RssMRConfig.RSS_CLIENT_DEFAULT_BITMAP_NUM);
    int numMaps = jobConf.getNumMapTasks();

    // Unlike the other options, the storage type has no default and is mandatory.
    String storageType =
        RssMRUtils.getString(rssClientConf, jobConf, RssMRConfig.RSS_STORAGE_TYPE);
    if (StringUtils.isEmpty(storageType)) {
      throw new RssException("storage type mustn't be empty");
    }

    Map<Integer, List<ShuffleServerInfo>> partitionToServers =
        createAssignmentMap(rssClientConf);
    SerializationFactory serializationFactory = new SerializationFactory(jobConf);

    long maxSegmentSize =
        RssMRUtils.getLong(
            rssClientConf,
            jobConf,
            RssMRConfig.RSS_CLIENT_MAX_SEGMENT_SIZE,
            RssMRConfig.RSS_CLIENT_DEFAULT_MAX_SEGMENT_SIZE);
    int sendThreadNum =
        RssMRUtils.getInt(
            rssClientConf,
            jobConf,
            RssMRConfig.RSS_CLIENT_SEND_THREAD_NUM,
            RssMRConfig.RSS_CLIENT_DEFAULT_SEND_THREAD_NUM);
    long maxBufferSize =
        RssMRUtils.getLong(
            rssClientConf,
            jobConf,
            RssMRConfig.RSS_WRITER_BUFFER_SIZE,
            RssMRConfig.RSS_WRITER_BUFFER_SIZE_DEFAULT_VALUE);

    shuffleClient = RssMRUtils.createShuffleClient(jobConf);

    // Byte budget handed to the buffer manager: the configured sort size scaled by the threshold.
    long sortBufferBytes = (long) (ByteUnit.MiB.toBytes(sortMb) * sortThreshold);
    bufferManager =
        new SortWriteBufferManager(
            sortBufferBytes,
            taskAttemptId,
            batch,
            serializationFactory.getSerializer(keyClass),
            serializationFactory.getSerializer(valClass),
            comparator,
            memoryThreshold,
            appId,
            shuffleClient,
            sendCheckInterval,
            sendCheckTimeout,
            partitionToServers,
            successBlockIds,
            failedBlockIds,
            reporter.getCounter(TaskCounter.MAP_OUTPUT_BYTES),
            reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS),
            bitmapSplitNum,
            maxSegmentSize,
            numMaps,
            isMemoryShuffleEnabled(storageType),
            sendThreadNum,
            sendThreshold,
            maxBufferSize,
            RssMRConfig.toRssConf(rssClientConf));
  }