in client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java [96:170]
public SortBasedShuffleWriter(
    ShuffleDependency<K, V, C> dep,
    int numMappers,
    TaskContext taskContext,
    CelebornConf conf,
    ShuffleClient client,
    ExecutorService executorService,
    SendBufferPool sendBufferPool)
    throws IOException {
  this.mapId = taskContext.partitionId();
  this.dep = dep;
  this.shuffleId = dep.shuffleId();
  SerializerInstance serializer = dep.serializer().newInstance();
  this.partitioner = dep.partitioner();
  this.writeMetrics = taskContext.taskMetrics().shuffleWriteMetrics();
  this.taskContext = taskContext;
  this.numMappers = numMappers;
  this.numPartitions = dep.partitioner().numPartitions();
  this.shuffleClient = client;
  unsafeRowFastWrite = conf.clientPushUnsafeRowFastWrite();
  serBuffer = new OpenByteArrayOutputStream(DEFAULT_INITIAL_SER_BUFFER_SIZE);
  serOutputStream = serializer.serializeStream(serBuffer);

  // Per-partition byte counters; LongAdder because pusher threads update them concurrently.
  this.mapStatusLengths = new LongAdder[numPartitions];
  for (int i = 0; i < numPartitions; i++) {
    mapStatusLengths[i] = new LongAdder();
  }
  this.mapStatusRecords = new long[numPartitions];
  tmpRecords = new long[numPartitions];

  pushBufferMaxSize = conf.clientPushBufferMaxSize();
  pipelined = conf.clientPushSortPipelineEnabled();

  if (pipelined) {
    // Pipelined mode: build multiple pushers so one can push asynchronously
    // (via executorService) while another keeps accepting records; each pusher
    // gets half of the sort memory threshold.
    for (int i = 0; i < pushers.length; i++) {
      pushers[i] =
          new SortBasedPusher(
              taskContext.taskMemoryManager(),
              shuffleClient,
              shuffleId,
              mapId,
              taskContext.attemptNumber(),
              taskContext.taskAttemptId(),
              numMappers,
              numPartitions,
              conf,
              writeMetrics::incBytesWritten,
              mapStatusLengths,
              conf.clientPushSortMemoryThreshold() / 2,
              globalPushLock,
              executorService,
              sendBufferPool);
    }
    currentPusher = pushers[0];
  } else {
    // Non-pipelined mode: a single pusher owns the full memory threshold and
    // pushes synchronously, so no executor is supplied.
    currentPusher =
        new SortBasedPusher(
            taskContext.taskMemoryManager(),
            shuffleClient,
            shuffleId,
            mapId,
            taskContext.attemptNumber(),
            taskContext.taskAttemptId(),
            numMappers,
            numPartitions,
            conf,
            writeMetrics::incBytesWritten,
            mapStatusLengths,
            conf.clientPushSortMemoryThreshold(),
            globalPushLock,
            null,
            sendBufferPool);
  }
}
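
// A minimal sketch (not part of this file) of how the writer could rotate
// between the pipelined pushers set up above: once the active pusher's half
// of the memory threshold fills, kick off its asynchronous push and continue
// writing on the other pusher so sorting and pushing overlap. The method and
// the `triggerPush` call are illustrative assumptions, not the actual API.
private void switchToNextPusherSketch() throws IOException {
  currentPusher.triggerPush(); // hypothetical: hand the full buffer to the executor
  currentPusher = (currentPusher == pushers[0]) ? pushers[1] : pushers[0];
}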