client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java [280:356]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  private void write0(scala.collection.Iterator iterator) throws IOException {
    // Raw iterator from the caller, narrowed to the expected record type.
    final scala.collection.Iterator<Product2<K, ?>> records = iterator;

    while (records.hasNext()) {
      final Product2<K, ?> record = records.next();
      final K key = record._1();
      final int partitionId = partitioner.getPartition(key);
      serBuffer.reset();
      serOutputStream.writeKey(key, OBJECT_CLASS_TAG);
      serOutputStream.writeValue(record._2(), OBJECT_CLASS_TAG);
      serOutputStream.flush();

      final int serializedRecordSize = serBuffer.size();
      assert (serializedRecordSize > 0);

      if (serializedRecordSize > pushBufferMaxSize) {
        // A record larger than one push buffer can never fit in the sort
        // buffer, so bypass it and push the serialized bytes directly.
        pushGiantRecord(partitionId, serBuffer.getBuf(), serializedRecordSize);
      } else {
        boolean success =
            currentPusher.insertRecord(
                serBuffer.getBuf(),
                Platform.BYTE_ARRAY_OFFSET,
                serializedRecordSize,
                partitionId,
                false);
        if (!success) {
          // The sort buffer is full: flush it (and, in pipelined mode,
          // switch to the spare pusher), then retry the insert once.
          pushAndSwitch();
          success =
              currentPusher.insertRecord(
                  serBuffer.getBuf(),
                  Platform.BYTE_ARRAY_OFFSET,
                  serializedRecordSize,
                  partitionId,
                  false);
          if (!success) {
            throw new IOException("Unable to push after switching pusher!");
          }
        }
      }
      tmpRecords[partitionId] += 1;
    }
  }

  private void pushGiantRecord(int partitionId, byte[] buffer, int numBytes) throws IOException {
    logger.debug("Push giant record, size {}.", Utils.bytesToString(numBytes));
    int bytesWritten =
        shuffleClient.pushData(
            shuffleId,
            mapId,
            taskContext.attemptNumber(),
            partitionId,
            buffer,
            0,
            numBytes,
            numMappers,
            numPartitions);
    // Track the pushed size for the MapStatus and the task's write metrics.
    mapStatusLengths[partitionId].add(bytesWritten);
    writeMetrics.incBytesWritten(bytesWritten);
  }

  private void close() throws IOException {
    if (pipelined) {
      logger.info(
          "Memory used {}", Utils.bytesToString((pushers[0].getUsed() + pushers[1].getUsed())));
    } else {
      logger.info("Memory used {}", Utils.bytesToString(currentPusher.getUsed()));
    }
    long pushStartTime = System.nanoTime();
    if (pipelined) {
      // Drain both pushers: wait for any in-flight asynchronous push to
      // finish, push whatever remains, then release the buffers.
      for (SortBasedPusher pusher : pushers) {
        pusher.waitPushFinish();
        pusher.pushData();
        pusher.close();
      }
    } else {
      currentPusher.pushData();
      currentPusher.close();
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
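
The retry path above calls pushAndSwitch(), which falls outside the captured
[280:356] range. A minimal sketch of what such a helper could look like,
inferred from the pipelined fields that close() uses (pushers[0], pushers[1],
waitPushFinish()); triggerPush() is an assumed asynchronous-push entry point,
not something the excerpt confirms:

  // Hedged sketch, not the captured source: free buffer space so the
  // failed insertRecord() can be retried once.
  private void pushAndSwitch() throws IOException {
    if (pipelined) {
      // Start pushing the full buffer asynchronously ...
      currentPusher.triggerPush(); // assumed API, not shown in the excerpt
      // ... then switch to the spare pusher, waiting for its previous
      // push (if any) to finish so its buffer is free for reuse.
      currentPusher = currentPusher == pushers[0] ? pushers[1] : pushers[0];
      currentPusher.waitPushFinish();
    } else {
      // Single-buffer mode: push synchronously and keep the same pusher.
      currentPusher.pushData();
    }
  }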



client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java [302:378]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  private void write0(scala.collection.Iterator iterator) throws IOException {
    // Raw iterator from the caller, narrowed to the expected record type.
    final scala.collection.Iterator<Product2<K, ?>> records = iterator;

    while (records.hasNext()) {
      final Product2<K, ?> record = records.next();
      final K key = record._1();
      final int partitionId = partitioner.getPartition(key);
      serBuffer.reset();
      serOutputStream.writeKey(key, OBJECT_CLASS_TAG);
      serOutputStream.writeValue(record._2(), OBJECT_CLASS_TAG);
      serOutputStream.flush();

      final int serializedRecordSize = serBuffer.size();
      assert (serializedRecordSize > 0);

      if (serializedRecordSize > pushBufferMaxSize) {
        // A record larger than one push buffer can never fit in the sort
        // buffer, so bypass it and push the serialized bytes directly.
        pushGiantRecord(partitionId, serBuffer.getBuf(), serializedRecordSize);
      } else {
        boolean success =
            currentPusher.insertRecord(
                serBuffer.getBuf(),
                Platform.BYTE_ARRAY_OFFSET,
                serializedRecordSize,
                partitionId,
                false);
        if (!success) {
          // The sort buffer is full: flush it (and, in pipelined mode,
          // switch to the spare pusher), then retry the insert once.
          pushAndSwitch();
          success =
              currentPusher.insertRecord(
                  serBuffer.getBuf(),
                  Platform.BYTE_ARRAY_OFFSET,
                  serializedRecordSize,
                  partitionId,
                  false);
          if (!success) {
            throw new IOException("Unable to push after switching pusher!");
          }
        }
      }
      tmpRecords[partitionId] += 1;
    }
  }

  private void pushGiantRecord(int partitionId, byte[] buffer, int numBytes) throws IOException {
    logger.debug("Push giant record, size {}.", Utils.bytesToString(numBytes));
    int bytesWritten =
        shuffleClient.pushData(
            shuffleId,
            mapId,
            taskContext.attemptNumber(),
            partitionId,
            buffer,
            0,
            numBytes,
            numMappers,
            numPartitions);
    // Track the pushed size for the MapStatus and the task's write metrics.
    mapStatusLengths[partitionId].add(bytesWritten);
    writeMetrics.incBytesWritten(bytesWritten);
  }

  private void close() throws IOException {
    if (pipelined) {
      logger.info(
          "Memory used {}", Utils.bytesToString((pushers[0].getUsed() + pushers[1].getUsed())));
    } else {
      logger.info("Memory used {}", Utils.bytesToString(currentPusher.getUsed()));
    }
    long pushStartTime = System.nanoTime();
    if (pipelined) {
      // Drain both pushers: wait for any in-flight asynchronous push to
      // finish, push whatever remains, then release the buffers.
      for (SortBasedPusher pusher : pushers) {
        pusher.waitPushFinish();
        pusher.pushData();
        pusher.close();
      }
    } else {
      currentPusher.pushData();
      currentPusher.close();
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
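
Both excerpts stop midway through close(): pushStartTime is captured but never
consumed inside the captured range. In Spark shuffle writers the conventional
tail is to fold the elapsed drain time into the same write metrics that
pushGiantRecord() updates; a hedged sketch, assuming only Spark's standard
ShuffleWriteMetricsReporter.incWriteTime():

    // Hedged sketch of how close() plausibly continues past the excerpt:
    // charge the time spent draining the pushers to the shuffle write metrics.
    writeMetrics.incWriteTime(System.nanoTime() - pushStartTime);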



