public SortBasedShuffleWriter()

in client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java [96:170]


  public SortBasedShuffleWriter(
      ShuffleDependency<K, V, C> dep,
      int numMappers,
      TaskContext taskContext,
      CelebornConf conf,
      ShuffleClient client,
      ShuffleWriteMetricsReporter metrics,
      ExecutorService executorService,
      SendBufferPool sendBufferPool)
      throws IOException {
    this.mapId = taskContext.partitionId();
    this.dep = dep;
    this.shuffleId = dep.shuffleId();
    SerializerInstance serializer = dep.serializer().newInstance();
    this.partitioner = dep.partitioner();
    this.writeMetrics = metrics;
    this.taskContext = taskContext;
    this.numMappers = numMappers;
    this.numPartitions = dep.partitioner().numPartitions();
    this.shuffleClient = client;
    unsafeRowFastWrite = conf.clientPushUnsafeRowFastWrite();

    serBuffer = new OpenByteArrayOutputStream(DEFAULT_INITIAL_SER_BUFFER_SIZE);
    serOutputStream = serializer.serializeStream(serBuffer);

    this.mapStatusLengths = new LongAdder[numPartitions];
    for (int i = 0; i < numPartitions; i++) {
      this.mapStatusLengths[i] = new LongAdder();
    }
    tmpRecords = new long[numPartitions];

    pushBufferMaxSize = conf.clientPushBufferMaxSize();
    pipelined = conf.clientPushSortPipelineEnabled();

    if (pipelined) {
      for (int i = 0; i < pushers.length; i++) {
        pushers[i] =
            new SortBasedPusher(
                taskContext.taskMemoryManager(),
                shuffleClient,
                shuffleId,
                mapId,
                taskContext.attemptNumber(),
                taskContext.taskAttemptId(),
                numMappers,
                numPartitions,
                conf,
                writeMetrics::incBytesWritten,
                mapStatusLengths,
                conf.clientPushSortMemoryThreshold() / 2,
                sharedPushLock,
                executorService,
                sendBufferPool);
      }
      currentPusher = pushers[0];
    } else {
      currentPusher =
          new SortBasedPusher(
              taskContext.taskMemoryManager(),
              shuffleClient,
              shuffleId,
              mapId,
              taskContext.attemptNumber(),
              taskContext.taskAttemptId(),
              numMappers,
              numPartitions,
              conf,
              writeMetrics::incBytesWritten,
              mapStatusLengths,
              conf.clientPushSortMemoryThreshold(),
              sharedPushLock,
              null,
              sendBufferPool);
    }
  }