in client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java [147:219]
/**
 * Sorts all records currently buffered in {@code inMemSorter} and pushes them to remote
 * shuffle workers, grouped by partition, then frees the memory that held them.
 *
 * <p>Records are drained from the sorter's sorted iterator (presumably ordered by partition
 * id — which the partition-boundary flush below relies on; TODO confirm) and copied into a
 * single reusable byte buffer of {@code pushBufferMaxSize}. The buffer is flushed in two
 * situations: synchronously via {@code shuffleClient.mergeData} whenever the partition id
 * changes, and asynchronously via {@code dataPusher.addTask} whenever the next record would
 * not fit in the buffer (and once more at the end for any trailing bytes).
 *
 * @return the number of bytes of page memory freed by {@code freeMemory()}
 * @throws IOException if pushing data to the shuffle service fails
 */
public long pushData() throws IOException {
// pushData should be synchronized between pushers
synchronized (sharedPushLock) {
final ShuffleInMemorySorter.ShuffleSorterIterator sortedRecords =
inMemSorter.getSortedIterator();
// Reusable staging buffer; both mergeData and addTask consume [0, offSet) of it.
byte[] dataBuf = new byte[pushBufferMaxSize];
int offSet = 0;
// -1 is a sentinel meaning "no partition seen yet"; real partition ids are >= 0.
int currentPartition = -1;
while (sortedRecords.hasNext()) {
sortedRecords.loadNext();
// When partitions were shuffled for write, map the shuffled id back to the
// original partition id before pushing (NOTE(review): presumably
// inversedShuffledPartitions is the inverse permutation — confirm against the
// code that populates shuffledPartitions).
final int partition =
shuffledPartitions != null
? inversedShuffledPartitions[sortedRecords.packedRecordPointer.getPartitionId()]
: sortedRecords.packedRecordPointer.getPartitionId();
if (partition != currentPartition) {
if (currentPartition == -1) {
// First record overall: just start accumulating for this partition.
currentPartition = partition;
} else {
// Partition boundary: synchronously merge the bytes accumulated for the
// previous partition, record the pushed length, and reset the buffer.
int bytesWritten =
shuffleClient.mergeData(
shuffleId,
mapId,
attemptNumber,
currentPartition,
dataBuf,
0,
offSet,
numMappers,
numPartitions);
mapStatusLengths[currentPartition].add(bytesWritten);
afterPush.accept(bytesWritten);
currentPartition = partition;
offSet = 0;
}
}
// Locate the record in task memory: each record is stored as a UAO_SIZE length
// prefix followed by the payload bytes.
final long recordPointer = sortedRecords.packedRecordPointer.getRecordPointer();
final Object recordPage = taskMemoryManager.getPage(recordPointer);
final long recordOffsetInPage = taskMemoryManager.getOffsetInPage(recordPointer);
int recordSize = UnsafeAlignedOffset.getSize(recordPage, recordOffsetInPage);
if (offSet + recordSize > dataBuf.length) {
// Buffer full: hand the filled prefix to the async pusher and start over.
// NOTE(review): this path does not update mapStatusLengths/afterPush here —
// presumably DataPusher does that accounting itself; verify. Also assumes a
// single record never exceeds pushBufferMaxSize (otherwise the copy below
// would overflow dataBuf) — TODO confirm this is enforced at insert time.
try {
dataPusher.addTask(partition, dataBuf, offSet);
} catch (InterruptedException e) {
// Interruption during push means the task is being killed; propagate as a
// task-kill exception rather than swallowing it.
TaskInterruptedHelper.throwTaskKillException();
}
offSet = 0;
}
// Skip the length prefix and copy the payload into the staging buffer.
long recordReadPosition = recordOffsetInPage + UAO_SIZE;
Platform.copyMemory(
recordPage,
recordReadPosition,
dataBuf,
Platform.BYTE_ARRAY_OFFSET + offSet,
recordSize);
offSet += recordSize;
}
// Flush any bytes remaining for the last partition.
if (offSet > 0) {
try {
dataPusher.addTask(currentPartition, dataBuf, offSet);
} catch (InterruptedException e) {
TaskInterruptedHelper.throwTaskKillException();
}
}
// All records have been copied out; release the pages and the sorter's array.
long freedBytes = freeMemory();
inMemSorter.freeMemory();
return freedBytes;
}
}