public RssShuffle()

in client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssShuffle.java [103:213]


  /**
   * Sets up the shuffle for a single input: reads the intermediate-data codec and IFile
   * read-ahead settings from {@code conf}, then wires together the {@code MergeManager}, the
   * {@code RssShuffleScheduler}, the input event handler and a one-thread daemon executor.
   *
   * <p>Note that {@code this} is handed to both the merge manager and the scheduler before the
   * constructor has finished.
   *
   * @param inputContext context of the input; supplies the source vertex name and the counters
   * @param conf configuration the compression and read-ahead settings are read from
   * @param numInputs number of inputs, forwarded to the scheduler and reported in the log
   * @param initialMemoryAvailable memory budget forwarded to the {@code MergeManager}
   * @param shuffleId shuffle identifier forwarded to the {@code RssShuffleScheduler}
   * @param applicationAttemptId application attempt forwarded to the {@code RssShuffleScheduler}
   * @throws IOException propagated from the setup calls made here
   */
  public RssShuffle(
      InputContext inputContext,
      Configuration conf,
      int numInputs,
      long initialMemoryAvailable,
      int shuffleId,
      ApplicationAttemptId applicationAttemptId)
      throws IOException {
    this.inputContext = inputContext;
    this.conf = conf;
    this.srcNameTrimmed = TezUtilsInternal.cleanVertexName(inputContext.getSourceVertexName());

    // Codec: only instantiated when the intermediate input is compressed.
    if (!ConfigUtils.isIntermediateInputCompressed(conf)) {
      codec = null;
    } else {
      Class<? extends CompressionCodec> compressorClass =
          ConfigUtils.getIntermediateInputCompressorClass(conf, DefaultCodec.class);
      codec = ReflectionUtils.newInstance(compressorClass, conf);
      // HADOOP-12191 workaround: asking for the decompressor type up front avoids the native
      // initialization synchronization race.
      codec.getDecompressorType();
    }

    // IFile read-ahead: the length is only looked up when read-ahead is switched on.
    this.ifileReadAhead =
        conf.getBoolean(
            TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD,
            TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT);
    this.ifileReadAheadLength =
        this.ifileReadAhead
            ? conf.getInt(
                TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES,
                TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES_DEFAULT)
            : 0;

    Combiner inputCombiner = TezRuntimeUtils.instantiateCombiner(conf, inputContext);

    FileSystem localFileSystem = FileSystem.getLocal(this.conf);
    LocalDirAllocator dirAllocator = new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS);

    // TODO TEZ Get rid of Map / Reduce references.
    TezCounter spilledRecords = inputContext.getCounters().findCounter(TaskCounter.SPILLED_RECORDS);
    TezCounter combineInputRecords =
        inputContext.getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS);
    TezCounter mergedMapOutputs =
        inputContext.getCounters().findCounter(TaskCounter.MERGED_MAP_OUTPUTS);

    String codecLabel = "None";
    if (codec != null) {
      codecLabel = codec.getClass().getName();
    }
    LOG.info(
        srcNameTrimmed
            + ": Shuffle assigned with "
            + numInputs
            + " inputs, codec: "
            + codecLabel
            + ", ifileReadAhead: "
            + ifileReadAhead);

    startTime = System.currentTimeMillis();

    // The merge manager receives this shuffle instance along with the codec/read-ahead settings.
    merger =
        new MergeManager(
            this.conf,
            localFileSystem,
            dirAllocator,
            inputContext,
            inputCombiner,
            spilledRecords,
            combineInputRecords,
            mergedMapOutputs,
            this,
            initialMemoryAvailable,
            codec,
            ifileReadAhead,
            ifileReadAheadLength);

    // The merger is deliberately passed twice, matching the scheduler's constructor arguments.
    rssScheduler =
        new RssShuffleScheduler(
            this.inputContext,
            this.conf,
            numInputs,
            this,
            merger,
            merger,
            startTime,
            codec,
            ifileReadAhead,
            ifileReadAheadLength,
            srcNameTrimmed,
            shuffleId,
            applicationAttemptId);

    this.mergePhaseTime = inputContext.getCounters().findCounter(TaskCounter.MERGE_PHASE_TIME);
    this.shufflePhaseTime = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_PHASE_TIME);

    boolean tezShuffleHandler = ShuffleUtils.isTezShuffleHandler(conf);
    eventHandler =
        new ShuffleInputEventHandlerOrderedGrouped(inputContext, rssScheduler, tezShuffleHandler);

    // A single daemon thread, named after the source vertex, backs the listening executor.
    ExecutorService singleThreadPool =
        Executors.newFixedThreadPool(
            1,
            new ThreadFactoryBuilder()
                .setDaemon(true)
                .setNameFormat("ShuffleAndMergeRunner {" + srcNameTrimmed + "}")
                .build());
    executor = MoreExecutors.listeningDecorator(singleThreadPool);

    rssRunShuffleCallable = new RssRunShuffleCallable();
  }