public Shuffle()

in tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java [112:221]


  /**
   * Sets up the shuffle machinery for a single input.
   *
   * <p>This constructor resolves the HTTP shuffle connection parameters, client metrics, the job
   * token secret, the optional intermediate-input compression codec and the IFile read-ahead
   * settings. It then builds the {@link ShuffleScheduler}, the {@link ShuffleInputEventHandler}
   * and the {@link MergeManager}. It also prepares a single-threaded daemon executor plus the
   * {@code RunShuffleCallable} that will drive shuffle and merge. The callable is only created
   * here; it is not submitted by this constructor.
   *
   * @param inputContext context of the input being shuffled. Supplies names, counters and the
   *     shuffle-handler service metadata.
   * @param conf runtime configuration to read shuffle, compression and read-ahead settings from
   * @param numInputs number of inputs to shuffle. It is passed to the scheduler and also caps the
   *     number of fetchers.
   * @param initialMemoryAvailable memory budget handed to the {@link MergeManager}
   * @throws IOException if one of the underlying setup calls fails, e.g. resolving the current
   *     user or the local file system
   */
  public Shuffle(TezInputContext inputContext, Configuration conf, int numInputs,
      long initialMemoryAvailable) throws IOException {
    this.inputContext = inputContext;
    this.conf = conf;
    this.httpConnectionParams =
        ShuffleUtils.constructHttpShuffleConnectionParams(conf);
    this.metrics = new ShuffleClientMetrics(inputContext.getDAGName(),
        inputContext.getTaskVertexName(), inputContext.getTaskIndex(),
        this.conf, UserGroupInformation.getCurrentUser().getShortUserName());

    this.srcNameTrimmed = TezUtils.cleanVertexName(inputContext.getSourceVertexName());

    this.jobTokenSecret = ShuffleUtils
        .getJobTokenSecretFromTokenBytes(inputContext
            .getServiceConsumerMetaData(TezConfiguration.TEZ_SHUFFLE_HANDLER_SERVICE_ID));

    // A codec is only instantiated when intermediate input compression is enabled;
    // DefaultCodec is the fallback class if none is configured.
    if (ConfigUtils.isIntermediateInputCompressed(conf)) {
      Class<? extends CompressionCodec> codecClass =
          ConfigUtils.getIntermediateInputCompressorClass(conf, DefaultCodec.class);
      codec = ReflectionUtils.newInstance(codecClass, conf);
    } else {
      codec = null;
    }

    this.ifileReadAhead = conf.getBoolean(
        TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD,
        TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT);
    // The read-ahead length is only read from config when read-ahead is on; otherwise 0.
    if (this.ifileReadAhead) {
      this.ifileReadAheadLength = conf.getInt(
          TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES,
          TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES_DEFAULT);
    } else {
      this.ifileReadAheadLength = 0;
    }

    Combiner combiner = TezRuntimeUtils.instantiateCombiner(conf, inputContext);

    FileSystem localFS = FileSystem.getLocal(this.conf);
    LocalDirAllocator localDirAllocator =
        new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS);

    // TODO TEZ Get rid of Map / Reduce references.
    TezCounter shuffledInputsCounter =
        inputContext.getCounters().findCounter(TaskCounter.NUM_SHUFFLED_INPUTS);
    TezCounter reduceShuffleBytes =
        inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES);
    TezCounter reduceDataSizeDecompressed = inputContext.getCounters().findCounter(
        TaskCounter.SHUFFLE_BYTES_DECOMPRESSED);
    TezCounter failedShuffleCounter =
        inputContext.getCounters().findCounter(TaskCounter.NUM_FAILED_SHUFFLE_INPUTS);
    TezCounter spilledRecordsCounter =
        inputContext.getCounters().findCounter(TaskCounter.SPILLED_RECORDS);
    TezCounter reduceCombineInputCounter =
        inputContext.getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS);
    TezCounter mergedMapOutputsCounter =
        inputContext.getCounters().findCounter(TaskCounter.MERGED_MAP_OUTPUTS);
    TezCounter bytesShuffledToDisk = inputContext.getCounters().findCounter(
        TaskCounter.SHUFFLE_BYTES_TO_DISK);
    TezCounter bytesShuffledToMem = inputContext.getCounters().findCounter(
        TaskCounter.SHUFFLE_BYTES_TO_MEM);

    // Each field gets its own ", " separator so the values do not run together in the log.
    LOG.info("Shuffle assigned with " + numInputs + " inputs" + ", codec: "
        + (codec == null ? "None" : codec.getClass().getName())
        + ", ifileReadAhead: " + ifileReadAhead);

    boolean sslShuffle = conf.getBoolean(TezJobConfig.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL,
        TezJobConfig.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL_DEFAULT);
    scheduler = new ShuffleScheduler(
        this.inputContext,
        this.conf,
        numInputs,
        this,
        shuffledInputsCounter,
        reduceShuffleBytes,
        reduceDataSizeDecompressed,
        failedShuffleCounter,
        bytesShuffledToDisk,
        bytesShuffledToMem);
    eventHandler = new ShuffleInputEventHandler(
        inputContext,
        scheduler,
        sslShuffle);
    merger = new MergeManager(
        this.conf,
        localFS,
        localDirAllocator,
        inputContext,
        combiner,
        spilledRecordsCounter,
        reduceCombineInputCounter,
        mergedMapOutputsCounter,
        this,
        initialMemoryAvailable,
        codec,
        ifileReadAhead,
        ifileReadAheadLength);

    // One daemon thread, named after the trimmed source vertex name, runs shuffle-and-merge.
    ExecutorService rawExecutor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder()
        .setDaemon(true).setNameFormat("ShuffleAndMergeRunner [" + srcNameTrimmed + "]").build());

    int configuredNumFetchers =
        conf.getInt(
            TezJobConfig.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES,
            TezJobConfig.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES_DEFAULT);
    // Never start more fetchers than there are inputs to fetch.
    numFetchers = Math.min(configuredNumFetchers, numInputs);
    LOG.info("Num fetchers being started: " + numFetchers);
    fetchers = Lists.newArrayListWithCapacity(numFetchers);

    executor = MoreExecutors.listeningDecorator(rawExecutor);
    // Created here only; this constructor does not submit it to the executor.
    runShuffleCallable = new RunShuffleCallable();
  }