client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierProducerAgent.java [449:477]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  /**
   * Prepares a single buffer for the given subpartition and hands it to {@code bufferPacker}.
   *
   * <p>{@code regionStartOrFinish} is invoked before {@code currentSubpartition} is updated to
   * {@code subPartitionId}. A compressed {@code originBuffer} is re-wrapped in a fresh {@link
   * NetworkBuffer} that shares its memory segment and recycler; an uncompressed one is used as is.
   *
   * <p>If an {@link InterruptedException} is raised, {@code originBuffer} is recycled and the
   * exception is passed to {@code ExceptionUtils.rethrow}.
   *
   * @param originBuffer the buffer received for this subpartition
   * @param subPartitionId the id of the subpartition the buffer belongs to
   */
  private void processBuffer(Buffer originBuffer, int subPartitionId) {
    try {
      regionStartOrFinish(subPartitionId);
      currentSubpartition = subPartitionId;

      final Buffer target;
      if (originBuffer.isCompressed()) {
        // Flink hybrid shuffle sends a compressed buffer to the tier. That buffer is read-only,
        // but data has to be written into it, so wrap the origin buffer's memory segment in a
        // new Buffer object.
        NetworkBuffer wrapped =
            new NetworkBuffer(
                originBuffer.getMemorySegment(),
                originBuffer.getRecycler(),
                originBuffer.getDataType(),
                originBuffer.getSize());
        // Carry over the writer position of the origin buffer.
        wrapped.writerIndex(originBuffer.asByteBuf().writerIndex());
        target = wrapped;
      } else {
        target = originBuffer;
      }

      // Set the buffer meta.
      BufferUtils.setCompressedDataWithoutHeader(target, originBuffer);

      bufferPacker.process(target, subPartitionId);
    } catch (InterruptedException e) {
      // NOTE(review): the thread's interrupt flag is not restored here before rethrowing;
      // confirm whether that is intentional.
      originBuffer.recycleBuffer();
      ExceptionUtils.rethrow(e, "Failed to process buffer.");
    }
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierProducerAgent.java [449:477]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  /**
   * Prepares a single buffer for the given subpartition and hands it to {@code bufferPacker}.
   *
   * <p>{@code regionStartOrFinish} is invoked before {@code currentSubpartition} is updated to
   * {@code subPartitionId}. A compressed {@code originBuffer} is re-wrapped in a fresh {@link
   * NetworkBuffer} that shares its memory segment and recycler; an uncompressed one is used as is.
   *
   * <p>If an {@link InterruptedException} is raised, {@code originBuffer} is recycled and the
   * exception is passed to {@code ExceptionUtils.rethrow}.
   *
   * @param originBuffer the buffer received for this subpartition
   * @param subPartitionId the id of the subpartition the buffer belongs to
   */
  private void processBuffer(Buffer originBuffer, int subPartitionId) {
    try {
      regionStartOrFinish(subPartitionId);
      currentSubpartition = subPartitionId;

      final Buffer target;
      if (originBuffer.isCompressed()) {
        // Flink hybrid shuffle sends a compressed buffer to the tier. That buffer is read-only,
        // but data has to be written into it, so wrap the origin buffer's memory segment in a
        // new Buffer object.
        NetworkBuffer wrapped =
            new NetworkBuffer(
                originBuffer.getMemorySegment(),
                originBuffer.getRecycler(),
                originBuffer.getDataType(),
                originBuffer.getSize());
        // Carry over the writer position of the origin buffer.
        wrapped.writerIndex(originBuffer.asByteBuf().writerIndex());
        target = wrapped;
      } else {
        target = originBuffer;
      }

      // Set the buffer meta.
      BufferUtils.setCompressedDataWithoutHeader(target, originBuffer);

      bufferPacker.process(target, subPartitionId);
    } catch (InterruptedException e) {
      // NOTE(review): the thread's interrupt flag is not restored here before rethrowing;
      // confirm whether that is intentional.
      originBuffer.recycleBuffer();
      ExceptionUtils.rethrow(e, "Failed to process buffer.");
    }
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



