public ShuffleScheduler()

in tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java [256:448]


  /**
   * Creates a scheduler for an ordered-grouped shuffle: reads all tunables from
   * {@code conf}, looks up the task counters, creates the fetcher executor and,
   * as the very last step, starts the referee.
   *
   * @param inputContext context used for the host name, service metadata, counters,
   *                     the source vertex name and (optionally) the shared executor
   * @param conf configuration every shuffle setting is read from
   * @param numberOfInputs number of inputs to fetch; sizes the completion tracking and
   *                       caps the number of fetchers and of failed unique fetches
   * @param exceptionReporter stored for later use by the scheduler
   * @param mergeManager stored for later use by the scheduler
   * @param allocator stored for later use by the scheduler
   * @param startTime stored as both the start time and the initial last-progress time
   * @param codec stored for later use by the scheduler
   * @param ifileReadAhead stored for later use by the scheduler
   * @param ifileReadAheadLength stored for later use by the scheduler
   * @param srcNameTrimmed stored, and embedded in the fetcher thread name format
   * @throws IOException if one of the initialization calls made here fails with one
   * @throws IllegalArgumentException if the configured min-failures-per-host,
   *         max-stall-time, min-required-progress or max-allowed-failed-fetch value is
   *         negative (assuming {@code Preconditions.checkArgument} has the usual
   *         Guava-style contract)
   */
  public ShuffleScheduler(InputContext inputContext,
                          Configuration conf,
                          int numberOfInputs,
                          ExceptionReporter exceptionReporter,
                          MergeManager mergeManager,
                          FetchedInputAllocatorOrderedGrouped allocator,
                          long startTime,
                          CompressionCodec codec,
                          boolean ifileReadAhead,
                          int ifileReadAheadLength,
                          String srcNameTrimmed) throws IOException {
    this.inputContext = inputContext;
    this.conf = conf;
    this.localFs = (RawLocalFileSystem) FileSystem.getLocal(conf).getRaw();
    this.exceptionReporter = exceptionReporter;
    this.allocator = allocator;
    this.mergeManager = mergeManager;
    this.numInputs = numberOfInputs;
    int abortFailureLimitConf = conf.getInt(TezRuntimeConfiguration
        .TEZ_RUNTIME_SHUFFLE_SOURCE_ATTEMPT_ABORT_LIMIT, TezRuntimeConfiguration
        .TEZ_RUNTIME_SHUFFLE_SOURCE_ATTEMPT_ABORT_LIMIT_DEFAULT);
    if (abortFailureLimitConf <= -1) {
      // Negative means "derive it": a tenth of the inputs, but never less than 15.
      abortFailureLimit = Math.max(15, numberOfInputs / 10);
    } else {
      //No upper cap, as user is setting this intentionally
      abortFailureLimit = abortFailureLimitConf;
    }
    remainingMaps = new AtomicInteger(numberOfInputs);
    finishedMaps = new BitSet(numberOfInputs);
    this.ifileReadAhead = ifileReadAhead;
    this.ifileReadAheadLength = ifileReadAheadLength;
    this.srcNameTrimmed = srcNameTrimmed;
    this.codec = codec;
    int configuredNumFetchers =
        conf.getInt(
            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES,
            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES_DEFAULT);
    // Never use more fetchers than there are inputs.
    // NOTE(review): with numberOfInputs == 0 this yields 0, which
    // Executors.newFixedThreadPool rejects; presumably callers never build a
    // scheduler without inputs -- confirm.
    numFetchers = Math.min(configuredNumFetchers, numInputs);

    localDiskFetchEnabled = conf.getBoolean(
        TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH,
        TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH_DEFAULT);

    this.minFailurePerHost = conf.getInt(
        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MIN_FAILURES_PER_HOST,
        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MIN_FAILURES_PER_HOST_DEFAULT);
    Preconditions.checkArgument(minFailurePerHost >= 0,
        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MIN_FAILURES_PER_HOST
            + "=" + minFailurePerHost + " should not be negative");

    // NOTE(review): unlike the other fractions below, this one is not range-checked.
    this.hostFailureFraction = conf.getFloat(TezRuntimeConfiguration
        .TEZ_RUNTIME_SHUFFLE_ACCEPTABLE_HOST_FETCH_FAILURE_FRACTION,
        TezRuntimeConfiguration
            .TEZ_RUNTIME_SHUFFLE_ACCEPTABLE_HOST_FETCH_FAILURE_FRACTION_DEFAULT);

    this.maxStallTimeFraction = conf.getFloat(
        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MAX_STALL_TIME_FRACTION,
        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MAX_STALL_TIME_FRACTION_DEFAULT);
    Preconditions.checkArgument(maxStallTimeFraction >= 0,
        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MAX_STALL_TIME_FRACTION
            + "=" + maxStallTimeFraction + " should not be negative");

    this.minReqProgressFraction = conf.getFloat(
        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MIN_REQUIRED_PROGRESS_FRACTION,
        TezRuntimeConfiguration
            .TEZ_RUNTIME_SHUFFLE_MIN_REQUIRED_PROGRESS_FRACTION_DEFAULT);
    Preconditions.checkArgument(minReqProgressFraction >= 0,
        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MIN_REQUIRED_PROGRESS_FRACTION
            + "=" + minReqProgressFraction + " should not be negative");

    this.maxAllowedFailedFetchFraction = conf.getFloat(
        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MAX_ALLOWED_FAILED_FETCH_ATTEMPT_FRACTION,
        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MAX_ALLOWED_FAILED_FETCH_ATTEMPT_FRACTION_DEFAULT);
    Preconditions.checkArgument(maxAllowedFailedFetchFraction >= 0,
        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MAX_ALLOWED_FAILED_FETCH_ATTEMPT_FRACTION
            + "=" + maxAllowedFailedFetchFraction + " should not be negative");

    this.checkFailedFetchSinceLastCompletion = conf.getBoolean
        (TezRuntimeConfiguration
            .TEZ_RUNTIME_SHUFFLE_FAILED_CHECK_SINCE_LAST_COMPLETION,
            TezRuntimeConfiguration
                .TEZ_RUNTIME_SHUFFLE_FAILED_CHECK_SINCE_LAST_COMPLETION_DEFAULT);

    this.localHostname = inputContext.getExecutionContext().getHostName();
    // The same auxiliary service id is used below to obtain the job token secret.
    String auxiliaryService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
        TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
    final ByteBuffer shuffleMetadata =
        inputContext.getServiceProviderMetaData(auxiliaryService);
    this.shufflePort = ShuffleUtils.deserializeShuffleProviderMetaData(shuffleMetadata);

    // Created here, but deliberately started only at the very end of the constructor.
    this.referee = new Referee();
    // Counters used by the ShuffleScheduler
    this.shuffledInputsCounter = inputContext.getCounters().findCounter(
        TaskCounter.NUM_SHUFFLED_INPUTS);
    this.reduceShuffleBytes = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES);
    this.reduceBytesDecompressed = inputContext.getCounters().findCounter(
        TaskCounter.SHUFFLE_BYTES_DECOMPRESSED);
    this.failedShuffleCounter = inputContext.getCounters().findCounter(
        TaskCounter.NUM_FAILED_SHUFFLE_INPUTS);
    this.bytesShuffledToDisk = inputContext.getCounters().findCounter(
        TaskCounter.SHUFFLE_BYTES_TO_DISK);
    this.bytesShuffledToDiskDirect =  inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_DISK_DIRECT);
    this.bytesShuffledToMem = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_TO_MEM);

    // Counters used by Fetchers
    ioErrsCounter = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
        ShuffleErrors.IO_ERROR.toString());
    wrongLengthErrsCounter = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
        ShuffleErrors.WRONG_LENGTH.toString());
    badIdErrsCounter = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
        ShuffleErrors.BAD_ID.toString());
    wrongMapErrsCounter = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
        ShuffleErrors.WRONG_MAP.toString());
    connectionErrsCounter = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
        ShuffleErrors.CONNECTION.toString());
    wrongReduceErrsCounter = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
        ShuffleErrors.WRONG_REDUCE.toString());

    this.startTime = startTime;
    this.lastProgressTime = startTime;

    this.sslShuffle = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL,
        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL_DEFAULT);
    this.asyncHttp = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_USE_ASYNC_HTTP, false);
    this.httpConnectionParams = ShuffleUtils.getHttpConnectionParams(conf);
    SecretKey jobTokenSecret = ShuffleUtils
        .getJobTokenSecretFromTokenBytes(inputContext
            .getServiceConsumerMetaData(auxiliaryService));
    this.jobTokenSecretManager = new JobTokenSecretManager(jobTokenSecret, conf);

    // Fetchers run either on an executor supplied by the framework or on a private
    // fixed-size pool of daemon threads; the thread name format is the same either way.
    final ExecutorService fetcherRawExecutor;
    if (conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCHER_USE_SHARED_POOL,
        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCHER_USE_SHARED_POOL_DEFAULT)) {
      fetcherRawExecutor = inputContext.createTezFrameworkExecutorService(numFetchers,
          "Fetcher_O {" + srcNameTrimmed + "} #%d");
    } else {
      fetcherRawExecutor = Executors.newFixedThreadPool(numFetchers, new ThreadFactoryBuilder()
          .setDaemon(true).setNameFormat("Fetcher_O {" + srcNameTrimmed + "} #%d").build());
    }
    this.fetcherExecutor = MoreExecutors.listeningDecorator(fetcherRawExecutor);

    // At most 5, and never more than the number of inputs.
    this.maxFailedUniqueFetches = Math.min(numberOfInputs, 5);
    this.maxFetchFailuresBeforeReporting =
        conf.getInt(
            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_FAILURES_LIMIT,
            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_FAILURES_LIMIT_DEFAULT);
    this.reportReadErrorImmediately =
        conf.getBoolean(
            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_NOTIFY_READERROR,
            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_NOTIFY_READERROR_DEFAULT);
    this.verifyDiskChecksum = conf.getBoolean(
        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_VERIFY_DISK_CHECKSUM,
        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_VERIFY_DISK_CHECKSUM_DEFAULT);

    /*
     * Setting to very high val can lead to Http 400 error. Cap it to 75; every attempt id would
     * be approximately 48 bytes; 48 * 75 = 3600 which should give some room for other info in URL.
     * The lower bound of 1 keeps a zero or negative setting from disabling fetches.
     */
    this.maxTaskOutputAtOnce = Math.max(1, Math.min(75, conf.getInt(
            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE,
            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE_DEFAULT)));

    this.skippedInputCounter = inputContext.getCounters().findCounter(TaskCounter.NUM_SKIPPED_INPUTS);
    this.firstEventReceived = inputContext.getCounters().findCounter(TaskCounter.FIRST_EVENT_RECEIVED);
    this.lastEventReceived = inputContext.getCounters().findCounter(TaskCounter.LAST_EVENT_RECEIVED);
    this.compositeFetch = ShuffleUtils.isTezShuffleHandler(conf);
    this.maxPenaltyTime = conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_HOST_PENALTY_TIME_LIMIT_MS,
        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_HOST_PENALTY_TIME_LIMIT_MS_DEFAULT);

    this.enableFetcherTestingErrors =
        conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_ENABLE_TESTING_ERRORS,
            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_ENABLE_TESTING_ERRORS_DEFAULT);

    pipelinedShuffleInfoEventsMap = Maps.newConcurrentMap();
    LOG.info("ShuffleScheduler running for sourceVertex: "
        + inputContext.getSourceVertexName() + " with configuration: "
        + "maxFetchFailuresBeforeReporting=" + maxFetchFailuresBeforeReporting
        + ", reportReadErrorImmediately=" + reportReadErrorImmediately
        + ", maxFailedUniqueFetches=" + maxFailedUniqueFetches
        + ", abortFailureLimit=" + abortFailureLimit
        + ", maxTaskOutputAtOnce=" + maxTaskOutputAtOnce
        + ", numFetchers=" + numFetchers
        + ", hostFailureFraction=" + hostFailureFraction
        + ", minFailurePerHost=" + minFailurePerHost
        + ", maxAllowedFailedFetchFraction=" + maxAllowedFailedFetchFraction
        + ", maxStallTimeFraction=" + maxStallTimeFraction
        + ", minReqProgressFraction=" + minReqProgressFraction
        + ", checkFailedFetchSinceLastCompletion=" + checkFailedFetchSinceLastCompletion
        + ", asyncHttp=" + asyncHttp
        + ", enableFetcherTestingErrors=" + enableFetcherTestingErrors
    );

    // Start the referee only once every field above has been assigned, so that
    // whatever it runs cannot observe a partially constructed scheduler and is not
    // left running if an earlier initialization step throws.
    referee.start();
  }