public long pushData()

in client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java [147:219]


  public long pushData() throws IOException {
    // pushData should be synchronized between pushers
    synchronized (sharedPushLock) {
      final ShuffleInMemorySorter.ShuffleSorterIterator sortedRecords =
          inMemSorter.getSortedIterator();

      byte[] dataBuf = new byte[pushBufferMaxSize];
      int offSet = 0;
      int currentPartition = -1;
      while (sortedRecords.hasNext()) {
        sortedRecords.loadNext();
        final int partition =
            shuffledPartitions != null
                ? inversedShuffledPartitions[sortedRecords.packedRecordPointer.getPartitionId()]
                : sortedRecords.packedRecordPointer.getPartitionId();
        if (partition != currentPartition) {
          if (currentPartition == -1) {
            currentPartition = partition;
          } else {
            int bytesWritten =
                shuffleClient.mergeData(
                    shuffleId,
                    mapId,
                    attemptNumber,
                    currentPartition,
                    dataBuf,
                    0,
                    offSet,
                    numMappers,
                    numPartitions);
            mapStatusLengths[currentPartition].add(bytesWritten);
            afterPush.accept(bytesWritten);
            currentPartition = partition;
            offSet = 0;
          }
        }
        final long recordPointer = sortedRecords.packedRecordPointer.getRecordPointer();
        final Object recordPage = taskMemoryManager.getPage(recordPointer);
        final long recordOffsetInPage = taskMemoryManager.getOffsetInPage(recordPointer);
        int recordSize = UnsafeAlignedOffset.getSize(recordPage, recordOffsetInPage);

        if (offSet + recordSize > dataBuf.length) {
          try {
            dataPusher.addTask(partition, dataBuf, offSet);
          } catch (InterruptedException e) {
            TaskInterruptedHelper.throwTaskKillException();
          }
          offSet = 0;
        }

        long recordReadPosition = recordOffsetInPage + UAO_SIZE;
        Platform.copyMemory(
            recordPage,
            recordReadPosition,
            dataBuf,
            Platform.BYTE_ARRAY_OFFSET + offSet,
            recordSize);
        offSet += recordSize;
      }
      if (offSet > 0) {
        try {
          dataPusher.addTask(currentPartition, dataBuf, offSet);
        } catch (InterruptedException e) {
          TaskInterruptedHelper.throwTaskKillException();
        }
      }

      long freedBytes = freeMemory();
      inMemSorter.freeMemory();

      return freedBytes;
    }
  }