client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierProducerAgent.java [193:227]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  public boolean tryWrite(
      TieredStorageSubpartitionId tieredStorageSubpartitionId,
      Buffer buffer,
      Object bufferOwner,
      int numRemainingConsecutiveBuffers) {
    // It should be noted that, unlike RemoteShuffleOutputGate#write, the received buffer contains
    // only
    // and does not have any remaining space for writing the celeborn header.

    int subPartitionId = tieredStorageSubpartitionId.getSubpartitionId();

    if (subPartitionSegmentBuffers[subPartitionId] + 1 + numRemainingConsecutiveBuffers
        >= numBuffersPerSegment) {
      // End the current segment if the segment buffer count reaches the threshold
      subPartitionSegmentBuffers[subPartitionId] = 0;
      try {
        bufferPacker.drain();
      } catch (InterruptedException e) {
        buffer.recycleBuffer();
        ExceptionUtils.rethrow(e, "Failed to process buffer.");
      }
      appendEndOfSegmentBuffer(subPartitionId);
      return false;
    }

    if (buffer.isBuffer()) {
      memoryManager.transferBufferOwnership(
          bufferOwner, CelebornTierFactory.getCelebornTierName(), buffer);
    }

    // write buffer to BufferPacker and record buffer count per subPartition per segment
    processBuffer(buffer, subPartitionId);
    subPartitionSegmentBuffers[subPartitionId]++;
    return true;
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierProducerAgent.java [193:227]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  public boolean tryWrite(
      TieredStorageSubpartitionId tieredStorageSubpartitionId,
      Buffer buffer,
      Object bufferOwner,
      int numRemainingConsecutiveBuffers) {
    // It should be noted that, unlike RemoteShuffleOutputGate#write, the received buffer contains
    // only
    // and does not have any remaining space for writing the celeborn header.

    int subPartitionId = tieredStorageSubpartitionId.getSubpartitionId();

    if (subPartitionSegmentBuffers[subPartitionId] + 1 + numRemainingConsecutiveBuffers
        >= numBuffersPerSegment) {
      // End the current segment if the segment buffer count reaches the threshold
      subPartitionSegmentBuffers[subPartitionId] = 0;
      try {
        bufferPacker.drain();
      } catch (InterruptedException e) {
        buffer.recycleBuffer();
        ExceptionUtils.rethrow(e, "Failed to process buffer.");
      }
      appendEndOfSegmentBuffer(subPartitionId);
      return false;
    }

    if (buffer.isBuffer()) {
      memoryManager.transferBufferOwnership(
          bufferOwner, CelebornTierFactory.getCelebornTierName(), buffer);
    }

    // write buffer to BufferPacker and record buffer count per subPartition per segment
    processBuffer(buffer, subPartitionId);
    subPartitionSegmentBuffers[subPartitionId]++;
    return true;
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



