client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java [201:231]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    while (records.hasNext()) {
      final Product2<Integer, UnsafeRow> record = records.next();
      final int partitionId = record._1();
      final UnsafeRow row = record._2();

      final int rowSize = row.getSizeInBytes();
      final int serializedRecordSize = 4 + rowSize;
      if (dataSize != null) {
        dataSize.add(serializedRecordSize);
      }

      if (serializedRecordSize > pushBufferMaxSize) {
        byte[] giantBuffer = new byte[serializedRecordSize];
        Platform.putInt(giantBuffer, Platform.BYTE_ARRAY_OFFSET, Integer.reverseBytes(rowSize));
        Platform.copyMemory(
            row.getBaseObject(),
            row.getBaseOffset(),
            giantBuffer,
            Platform.BYTE_ARRAY_OFFSET + 4,
            rowSize);
        pushGiantRecord(partitionId, giantBuffer, serializedRecordSize);
      } else {
        boolean success =
            currentPusher.insertRecord(
                row.getBaseObject(), row.getBaseOffset(), rowSize, partitionId, true);
        if (!success) {
          pushAndSwitch();
          success =
              currentPusher.insertRecord(
                  row.getBaseObject(), row.getBaseOffset(), rowSize, partitionId, true);
          if (!success) {
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java [250:281]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    while (records.hasNext()) {
      final Product2<Integer, UnsafeRow> record = records.next();
      final int partitionId = record._1();
      final UnsafeRow row = record._2();

      final int rowSize = row.getSizeInBytes();
      final int serializedRecordSize = 4 + rowSize;

      if (dataSize != null) {
        dataSize.add(serializedRecordSize);
      }

      if (serializedRecordSize > pushBufferMaxSize) {
        byte[] giantBuffer = new byte[serializedRecordSize];
        Platform.putInt(giantBuffer, Platform.BYTE_ARRAY_OFFSET, Integer.reverseBytes(rowSize));
        Platform.copyMemory(
            row.getBaseObject(),
            row.getBaseOffset(),
            giantBuffer,
            Platform.BYTE_ARRAY_OFFSET + 4,
            rowSize);
        pushGiantRecord(partitionId, giantBuffer, serializedRecordSize);
      } else {
        boolean success =
            currentPusher.insertRecord(
                row.getBaseObject(), row.getBaseOffset(), rowSize, partitionId, true);
        if (!success) {
          pushAndSwitch();
          success =
              currentPusher.insertRecord(
                  row.getBaseObject(), row.getBaseOffset(), rowSize, partitionId, true);
          if (!success) {
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



