public RssShuffleManager()

in client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssShuffleManager.java [220:401]


  /**
   * Creates a shuffle manager for an unordered input that fetches its data through the remote
   * shuffle service.
   *
   * <p>Besides delegating to the superclass constructor, this resolves the task counters the
   * manager updates, reads the shuffle-related runtime configuration, and creates (but does not
   * submit work to) the fetcher and scheduler executors.
   *
   * @param inputContext context of the owning input; source of counters, the source vertex name,
   *     the local host name and the shuffle service provider metadata
   * @param conf runtime configuration
   * @param numInputs number of physical inputs; also an upper bound on the number of fetchers
   * @param bufferSize IFile buffer size
   * @param ifileReadAheadEnabled whether IFile read-ahead is enabled
   * @param ifileReadAheadLength IFile read-ahead length
   * @param codec compression codec for fetched data, or {@code null} when data is uncompressed
   * @param inputAllocator allocator used for fetched inputs
   * @param shuffleId identifier of this shuffle (presumably the RSS shuffle id — TODO confirm)
   * @param applicationAttemptId application attempt that owns this shuffle
   * @throws IOException if the superclass constructor or {@code FileSystem.getLocal} fails
   */
  public RssShuffleManager(
      InputContext inputContext,
      Configuration conf,
      int numInputs,
      int bufferSize,
      boolean ifileReadAheadEnabled,
      int ifileReadAheadLength,
      CompressionCodec codec,
      FetchedInputAllocator inputAllocator,
      int shuffleId,
      ApplicationAttemptId applicationAttemptId)
      throws IOException {
    super(
        inputContext,
        conf,
        numInputs,
        bufferSize,
        ifileReadAheadEnabled,
        ifileReadAheadLength,
        codec,
        inputAllocator);
    this.inputContext = inputContext;
    this.conf = conf;
    this.numInputs = numInputs;
    this.shuffleId = shuffleId;
    this.applicationAttemptId = applicationAttemptId;

    // Task counters updated while shuffling.
    this.shuffledInputsCounter =
        inputContext.getCounters().findCounter(TaskCounter.NUM_SHUFFLED_INPUTS);
    this.failedShufflesCounter =
        inputContext.getCounters().findCounter(TaskCounter.NUM_FAILED_SHUFFLE_INPUTS);
    this.bytesShuffledCounter = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES);
    this.decompressedDataSizeCounter =
        inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_DECOMPRESSED);
    this.bytesShuffledToDiskCounter =
        inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_TO_DISK);
    this.bytesShuffledToMemCounter =
        inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_TO_MEM);
    this.bytesShuffledDirectDiskCounter =
        inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_DISK_DIRECT);

    this.ifileBufferSize = bufferSize;
    this.ifileReadAhead = ifileReadAheadEnabled;
    this.ifileReadAheadLength = ifileReadAheadLength;
    this.codec = codec;
    this.inputManager = inputAllocator;
    this.localDiskFetchEnabled =
        conf.getBoolean(
            TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH,
            TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH_DEFAULT);
    this.sharedFetchEnabled =
        conf.getBoolean(
            TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_SHARED_FETCH,
            TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_SHARED_FETCH_DEFAULT);
    this.verifyDiskChecksum =
        conf.getBoolean(
            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_VERIFY_DISK_CHECKSUM,
            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_VERIFY_DISK_CHECKSUM_DEFAULT);
    // NOTE(review): hard-coded to 1 ms instead of being read from configuration; confirm intent.
    this.maxTimeToWaitForReportMillis = 1;

    this.shufflePhaseTime = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_PHASE_TIME);
    this.firstEventReceived =
        inputContext.getCounters().findCounter(TaskCounter.FIRST_EVENT_RECEIVED);
    this.lastEventReceived =
        inputContext.getCounters().findCounter(TaskCounter.LAST_EVENT_RECEIVED);
    this.compositeFetch = ShuffleUtils.isTezShuffleHandler(conf);

    this.srcNameTrimmed = TezUtilsInternal.cleanVertexName(inputContext.getSourceVertexName());

    completedInputSet = new BitSet(numInputs);
    // In case of pipelined shuffle, it is possible to get multiple FetchedInput per attempt. We do
    // not know upfront the number of spills from source, so this deque is created without a bound.
    completedInputs = new LinkedBlockingDeque<>();
    knownSrcHosts = new ConcurrentHashMap<>();
    pendingHosts = new LinkedBlockingQueue<>();
    obsoletedInputs = Collections.newSetFromMap(new ConcurrentHashMap<>());
    rssRunningFetchers = Collections.newSetFromMap(new ConcurrentHashMap<>());

    int maxConfiguredFetchers =
        conf.getInt(
            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES,
            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES_DEFAULT);
    // Never run more fetchers than there are inputs, but keep at least one: a fixed thread pool of
    // size <= 0 is rejected with IllegalArgumentException (numInputs == 0, or a non-positive
    // parallel-copies setting).
    this.numFetchers = Math.max(1, Math.min(maxConfiguredFetchers, numInputs));

    this.fetcherExecutor =
        MoreExecutors.listeningDecorator(
            createRssFetcherRawExecutor(conf, inputContext, this.numFetchers, srcNameTrimmed));
    this.schedulerExecutor =
        MoreExecutors.listeningDecorator(createRssSchedulerRawExecutor(srcNameTrimmed));
    this.rssSchedulerCallable = new RssRunShuffleCallable(conf);

    this.startTime = System.currentTimeMillis();
    this.lastProgressTime = startTime;

    this.asyncHttp =
        conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_USE_ASYNC_HTTP, false);
    httpConnectionParams = ShuffleUtils.getHttpConnectionParams(conf);

    this.localFs = (RawLocalFileSystem) FileSystem.getLocal(conf).getRaw();

    this.localDirAllocator = new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS);

    // Iterables.toArray never returns null, so the array can be sorted unconditionally.
    this.localDisks =
        Iterables.toArray(localDirAllocator.getAllLocalPathsToRead(".", conf), Path.class);
    Arrays.sort(this.localDisks);
    this.localhostName = inputContext.getExecutionContext().getHostName();

    String auxiliaryService =
        conf.get(
            TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
            TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
    final ByteBuffer shuffleMetaData = inputContext.getServiceProviderMetaData(auxiliaryService);
    this.shufflePort = ShuffleUtils.deserializeShuffleProviderMetaData(shuffleMetaData);

    // Setting to very high val can lead to Http 400 error. Cap it to 75; every attempt id would be
    // approximately 48 bytes; 48 * 75 = 3600 which should give some room for other info in URL.
    this.maxTaskOutputAtOnce =
        Math.max(
            1,
            Math.min(
                75,
                conf.getInt(
                    TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE,
                    TezRuntimeConfiguration
                        .TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE_DEFAULT)));

    shuffleInfoEventsMap = new ConcurrentHashMap<>();

    LOG.info(
        srcNameTrimmed
            + ": numInputs="
            + numInputs
            + ", compressionCodec="
            + (codec == null ? "NoCompressionCodec" : codec.getClass().getName())
            + ", numFetchers="
            + numFetchers
            + ", ifileBufferSize="
            + ifileBufferSize
            + ", ifileReadAheadEnabled="
            + ifileReadAhead
            + ", ifileReadAheadLength="
            + ifileReadAheadLength
            + ", "
            + "localDiskFetchEnabled="
            + localDiskFetchEnabled
            + ", "
            + "sharedFetchEnabled="
            + sharedFetchEnabled
            + ", "
            + httpConnectionParams.toString()
            + ", maxTaskOutputAtOnce="
            + maxTaskOutputAtOnce);
  }

  /**
   * Creates the raw executor that runs this input's fetchers.
   *
   * <p>When {@code TEZ_RUNTIME_SHUFFLE_FETCHER_USE_SHARED_POOL} is enabled, the executor comes from
   * {@code inputContext.createTezFrameworkExecutorService}; otherwise a dedicated fixed-size pool of
   * daemon threads is created. Both variants name threads {@code "Fetcher_B {<src>} #<n>"}.
   *
   * @param conf runtime configuration
   * @param inputContext context used to obtain the framework executor
   * @param numFetchers pool size; expected to be at least 1
   * @param srcNameTrimmed cleaned source vertex name used in thread names
   * @return the undecorated fetcher executor
   */
  private static ExecutorService createRssFetcherRawExecutor(
      Configuration conf, InputContext inputContext, int numFetchers, String srcNameTrimmed) {
    String nameFormat = "Fetcher_B {" + srcNameTrimmed + "} #%d";
    boolean useSharedPool =
        conf.getBoolean(
            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCHER_USE_SHARED_POOL,
            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCHER_USE_SHARED_POOL_DEFAULT);
    if (useSharedPool) {
      return inputContext.createTezFrameworkExecutorService(numFetchers, nameFormat);
    }
    return Executors.newFixedThreadPool(
        numFetchers, new ThreadFactoryBuilder().setDaemon(true).setNameFormat(nameFormat).build());
  }

  /**
   * Creates the single-thread daemon pool that backs {@code schedulerExecutor}.
   *
   * @param srcNameTrimmed cleaned source vertex name used in the thread name
   * @return the undecorated scheduler executor
   */
  private static ExecutorService createRssSchedulerRawExecutor(String srcNameTrimmed) {
    return Executors.newFixedThreadPool(
        1,
        new ThreadFactoryBuilder()
            .setDaemon(true)
            .setNameFormat("ShuffleRunner {" + srcNameTrimmed + "}")
            .build());
  }