public RssSorter()

in client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssSorter.java [55:172]


  public RssSorter(
      TezTaskAttemptID tezTaskAttemptID,
      OutputContext outputContext,
      Configuration conf,
      int numMaps,
      int numOutputs,
      long initialMemoryAvailable,
      int shuffleId,
      ApplicationAttemptId applicationAttemptId,
      Map<Integer, List<ShuffleServerInfo>> partitionToServers)
      throws IOException {
    super(outputContext, conf, numOutputs, initialMemoryAvailable);
    this.partitionToServers = partitionToServers;

    this.numRecordsPerPartition = new int[numOutputs];

    long sortmb =
        conf.getLong(
            RssTezConfig.RSS_RUNTIME_IO_SORT_MB, RssTezConfig.RSS_DEFAULT_RUNTIME_IO_SORT_MB);
    LOG.info("conf.sortmb is {}", sortmb);
    sortmb = this.availableMemoryMb;
    LOG.info("sortmb, availableMemoryMb is {}, {}", sortmb, availableMemoryMb);
    if ((sortmb & 0x7FF) != sortmb) {
      throw new IOException("Invalid \"" + RssTezConfig.RSS_RUNTIME_IO_SORT_MB + "\": " + sortmb);
    }
    double sortThreshold =
        conf.getDouble(
            RssTezConfig.RSS_CLIENT_SORT_MEMORY_USE_THRESHOLD,
            RssTezConfig.RSS_CLIENT_DEFAULT_SORT_MEMORY_USE_THRESHOLD);
    long taskAttemptId = RssTezUtils.convertTaskAttemptIdToLong(tezTaskAttemptID);

    long maxSegmentSize =
        conf.getLong(
            RssTezConfig.RSS_CLIENT_MAX_BUFFER_SIZE,
            RssTezConfig.RSS_CLIENT_DEFAULT_MAX_BUFFER_SIZE);
    long maxBufferSize =
        conf.getLong(
            RssTezConfig.RSS_WRITER_BUFFER_SIZE, RssTezConfig.RSS_DEFAULT_WRITER_BUFFER_SIZE);
    double memoryThreshold =
        conf.getDouble(
            RssTezConfig.RSS_CLIENT_MEMORY_THRESHOLD,
            RssTezConfig.RSS_CLIENT_DEFAULT_MEMORY_THRESHOLD);
    int sendThreadNum =
        conf.getInt(
            RssTezConfig.RSS_CLIENT_SEND_THREAD_NUM, RssTezConfig.RSS_CLIENT_DEFAULT_THREAD_NUM);
    double sendThreshold =
        conf.getDouble(
            RssTezConfig.RSS_CLIENT_SEND_THRESHOLD, RssTezConfig.RSS_CLIENT_DEFAULT_SEND_THRESHOLD);
    int batch =
        conf.getInt(
            RssTezConfig.RSS_CLIENT_BATCH_TRIGGER_NUM,
            RssTezConfig.RSS_CLIENT_DEFAULT_BATCH_TRIGGER_NUM);
    String storageType =
        conf.get(RssTezConfig.RSS_STORAGE_TYPE, RssTezConfig.RSS_DEFAULT_STORAGE_TYPE);
    if (StringUtils.isEmpty(storageType)) {
      throw new RssException("storage type mustn't be empty");
    }
    long sendCheckInterval =
        conf.getLong(
            RssTezConfig.RSS_CLIENT_SEND_CHECK_INTERVAL_MS,
            RssTezConfig.RSS_CLIENT_DEFAULT_SEND_CHECK_INTERVAL_MS);
    long sendCheckTimeout =
        conf.getLong(
            RssTezConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS,
            RssTezConfig.RSS_CLIENT_DEFAULT_SEND_CHECK_TIMEOUT_MS);
    int bitmapSplitNum =
        conf.getInt(RssTezConfig.RSS_CLIENT_BITMAP_NUM, RssTezConfig.RSS_CLIENT_DEFAULT_BITMAP_NUM);

    if (conf.get(RssTezConfig.HIVE_TEZ_LOG_LEVEL, RssTezConfig.DEFAULT_HIVE_TEZ_LOG_LEVEL)
        .equalsIgnoreCase(RssTezConfig.DEBUG_HIVE_TEZ_LOG_LEVEL)) {
      LOG.info("sortmb is {}", sortmb);
      LOG.info("sortThreshold is {}", sortThreshold);
      LOG.info("taskAttemptId is {}", taskAttemptId);
      LOG.info("maxSegmentSize is {}", maxSegmentSize);
      LOG.info("maxBufferSize is {}", maxBufferSize);
      LOG.info("memoryThreshold is {}", memoryThreshold);
      LOG.info("sendThreadNum is {}", sendThreadNum);
      LOG.info("sendThreshold is {}", sendThreshold);
      LOG.info("batch is {}", batch);
      LOG.info("storageType is {}", storageType);
      LOG.info("sendCheckInterval is {}", sendCheckInterval);
      LOG.info("sendCheckTimeout is {}", sendCheckTimeout);
      LOG.info("bitmapSplitNum is {}", bitmapSplitNum);
    }

    LOG.info("applicationAttemptId is {}", applicationAttemptId.toString());

    bufferManager =
        new WriteBufferManager(
            tezTaskAttemptID,
            (long) (ByteUnit.MiB.toBytes(sortmb) * sortThreshold),
            applicationAttemptId.toString(),
            taskAttemptId,
            successBlockIds,
            failedBlockIds,
            RssTezUtils.createShuffleClient(conf),
            comparator,
            maxSegmentSize,
            keySerializer,
            valSerializer,
            maxBufferSize,
            memoryThreshold,
            sendThreadNum,
            sendThreshold,
            batch,
            new RssConf(),
            partitionToServers,
            numMaps,
            isMemoryShuffleEnabled(storageType),
            sendCheckInterval,
            sendCheckTimeout,
            bitmapSplitNum,
            shuffleId,
            true,
            mapOutputByteCounter,
            mapOutputRecordCounter);
    LOG.info("Initialized WriteBufferManager.");
  }