public RssSimpleFetchedInputAllocator()

in client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssSimpleFetchedInputAllocator.java [81:181]


  /**
   * Creates an allocator, deriving its in-memory limits from {@code conf} and, when remote spill
   * is enabled, opening the remote filesystem used for spilling.
   *
   * @param srcNameTrimmed source name; forwarded to the superclass and used as the log prefix
   * @param uniqueIdentifier identifier forwarded to the superclass and to {@code
   *     TezTaskOutputFiles}
   * @param dagID DAG id forwarded to the superclass and to {@code TezTaskOutputFiles}
   * @param conf configuration from which buffer percentages and remote spill settings are read
   * @param maxTaskAvailableMemory fallback (capped at {@code Integer.MAX_VALUE}) for the task
   *     memory setting when {@code Constants.TEZ_RUNTIME_TASK_MEMORY} is not configured
   * @param memoryAvailable upper bound for the memory limit assigned to this allocator
   * @param appAttemptId application attempt id, stored on this instance
   * @throws RssException if a configured percentage is out of range, if remote spill is enabled
   *     without a spill path, or if the remote filesystem cannot be initialized
   */
  public RssSimpleFetchedInputAllocator(
      String srcNameTrimmed,
      String uniqueIdentifier,
      int dagID,
      Configuration conf,
      long maxTaskAvailableMemory,
      long memoryAvailable,
      String appAttemptId) {
    super(srcNameTrimmed, uniqueIdentifier, dagID, conf, maxTaskAvailableMemory, memoryAvailable);
    this.srcNameTrimmed = srcNameTrimmed;
    this.conf = conf;
    this.maxAvailableTaskMemory = maxTaskAvailableMemory;
    this.initialMemoryAvailable = memoryAvailable;
    this.uniqueIdentifier = uniqueIdentifier;
    this.appAttemptId = appAttemptId;

    this.fileNameAllocator = new TezTaskOutputFiles(conf, uniqueIdentifier, dagID);
    this.localDirAllocator = new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS);

    // Fraction of the task memory that may be requested for fetched inputs; must be in [0, 1].
    final float maxInMemCopyUse =
        conf.getFloat(
            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT,
            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT_DEFAULT);
    if (maxInMemCopyUse > 1.0 || maxInMemCopyUse < 0.0) {
      throw new RssException(
          "Invalid value for "
              + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT
              + ": "
              + maxInMemCopyUse);
    }

    // Requested memory = task memory (configured, else the capped constructor argument) scaled
    // by the fetch buffer percentage. The product is evaluated in float arithmetic.
    long memReq =
        (long)
            (conf.getLong(
                    Constants.TEZ_RUNTIME_TASK_MEMORY,
                    Math.min(maxAvailableTaskMemory, Integer.MAX_VALUE))
                * maxInMemCopyUse);

    // Never assign more than what was made available to this allocator.
    this.memoryLimit = Math.min(memReq, this.initialMemoryAvailable);

    // Fraction of memoryLimit that a single shuffle may occupy; must be in (0, 1].
    final float singleShuffleMemoryLimitPercent =
        conf.getFloat(
            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT,
            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT_DEFAULT);
    if (singleShuffleMemoryLimitPercent <= 0.0f || singleShuffleMemoryLimitPercent > 1.0f) {
      throw new RssException(
          "Invalid value for "
              + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT
              + ": "
              + singleShuffleMemoryLimitPercent);
    }
    this.maxSingleShuffleLimit =
        (long) Math.min((memoryLimit * singleShuffleMemoryLimitPercent), Integer.MAX_VALUE);

    this.remoteSpillEnable = conf.getBoolean(RssTezConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED, false);
    if (this.remoteSpillEnable) {
      this.remoteSpillBasePath = conf.get(RSS_REMOTE_SPILL_STORAGE_PATH);
      if (StringUtils.isBlank(this.remoteSpillBasePath)) {
        throw new RssException("You must set remote spill path!");
      }

      // Build the remote configuration as a copy of the task configuration overlaid with the
      // parsed remote storage settings, so this.conf itself is not modified.
      String remoteStorageConf = this.conf.get(RssTezConfig.RSS_REMOTE_STORAGE_CONF);
      Map<String, String> remoteStorageConfMap =
          RemoteStorageInfo.parseRemoteStorageConf(remoteStorageConf);
      Configuration remoteConf = new Configuration(this.conf);
      for (Map.Entry<String, String> entry : remoteStorageConfMap.entrySet()) {
        remoteConf.set(entry.getKey(), entry.getValue());
      }

      // Construct the remote filesystem with the configured replication and write retries.
      int replication =
          this.conf.getInt(
              RssTezConfig.RSS_REDUCE_REMOTE_SPILL_REPLICATION,
              RssTezConfig.RSS_REDUCE_REMOTE_SPILL_REPLICATION_DEFAULT);
      int retries =
          this.conf.getInt(
              RssTezConfig.RSS_REDUCE_REMOTE_SPILL_RETRIES,
              RssTezConfig.RSS_REDUCE_REMOTE_SPILL_RETRIES_DEFAULT);
      try {
        remoteConf.setInt("dfs.replication", replication);
        remoteConf.setInt("dfs.client.block.write.retries", retries);
        remoteFS =
            HadoopFilesystemProvider.getFilesystem(new Path(this.remoteSpillBasePath), remoteConf);
      } catch (Exception e) {
        // Chain the original exception so the root cause of the failure is not lost.
        throw new RssException("Cannot init remoteFS on path:" + this.remoteSpillBasePath, e);
      }
    }

    LOG.info(
        srcNameTrimmed
            + ": "
            + "RequestedMemory="
            + memReq
            + ", AssignedMemory="
            + this.memoryLimit
            + ", maxSingleShuffleLimit="
            + this.maxSingleShuffleLimit);
  }