client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierProducerAgent.java [390:429]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  private void write(ByteBuf byteBuf, BufferHeader bufferHeader) {
    try {
      // Create a composite buffer and write a header into it. This composite buffer will serve as
      // the resulting packed buffer.
      CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer();
      ByteBuf headerBuf = Unpooled.buffer(BufferUtils.HEADER_LENGTH);

      // Write the Celeborn buffer header (subPartitionId(4) + attemptId(4) + nextBatchId(4) +
      // compressedSize(4)); compressedSize covers the Flink header plus the data body.
      headerBuf.writeInt(bufferHeader.getSubPartitionId());
      headerBuf.writeInt(attemptId);
      headerBuf.writeInt(0);
      headerBuf.writeInt(
          byteBuf.readableBytes() + (BufferUtils.HEADER_LENGTH - BufferUtils.HEADER_LENGTH_PREFIX));

      // Write the Flink buffer header (dataType(1) + isCompressed(1) + size(4)).
      headerBuf.writeByte(bufferHeader.getDataType().ordinal());
      headerBuf.writeBoolean(bufferHeader.isCompressed());
      headerBuf.writeInt(bufferHeader.getSize());

      // Combine the headerBuf and the data buffer into the packed buffer.
      compositeByteBuf.addComponents(true, headerBuf, byteBuf);
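      // Hand the shuffle client an io.netty view of the composed bytes; the composite buffer itself
      // is released through the callback once the push completes.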
      io.netty.buffer.ByteBuf wrappedBuffer =
          io.netty.buffer.Unpooled.wrappedBuffer(compositeByteBuf.nioBuffer());

      int numWritten =
          flinkShuffleClient.pushDataToLocation(
              shuffleId,
              mapId,
              attemptId,
              bufferHeader.getSubPartitionId(),
              wrappedBuffer,
              partitionLocation,
              compositeByteBuf::release);
      checkState(
          numWritten == byteBuf.readableBytes() + BufferUtils.HEADER_LENGTH, "Wrong written size.");
    } catch (IOException e) {
      Utils.rethrowAsRuntimeException(e);
    }
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierProducerAgent.java [390:429]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  private void write(ByteBuf byteBuf, BufferHeader bufferHeader) {
    try {
      // Create a composite buffer and write a header into it. This composite buffer will serve as
      // the resulting packed buffer.
      CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer();
      ByteBuf headerBuf = Unpooled.buffer(BufferUtils.HEADER_LENGTH);

      // Write the Celeborn buffer header (subPartitionId(4) + attemptId(4) + nextBatchId(4) +
      // compressedSize(4)); compressedSize covers the Flink header plus the data body.
      headerBuf.writeInt(bufferHeader.getSubPartitionId());
      headerBuf.writeInt(attemptId);
      headerBuf.writeInt(0);
      headerBuf.writeInt(
          byteBuf.readableBytes() + (BufferUtils.HEADER_LENGTH - BufferUtils.HEADER_LENGTH_PREFIX));

      // Write the Flink buffer header (dataType(1) + isCompressed(1) + size(4)).
      headerBuf.writeByte(bufferHeader.getDataType().ordinal());
      headerBuf.writeBoolean(bufferHeader.isCompressed());
      headerBuf.writeInt(bufferHeader.getSize());

      // Combine the headerBuf and the data buffer into the packed buffer.
      compositeByteBuf.addComponents(true, headerBuf, byteBuf);
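      // Hand the shuffle client an io.netty view of the composed bytes; the composite buffer itself
      // is released through the callback once the push completes.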
      io.netty.buffer.ByteBuf wrappedBuffer =
          io.netty.buffer.Unpooled.wrappedBuffer(compositeByteBuf.nioBuffer());

      int numWritten =
          flinkShuffleClient.pushDataToLocation(
              shuffleId,
              mapId,
              attemptId,
              bufferHeader.getSubPartitionId(),
              wrappedBuffer,
              partitionLocation,
              compositeByteBuf::release);
      checkState(
          numWritten == byteBuf.readableBytes() + BufferUtils.HEADER_LENGTH, "Wrong written size.");
    } catch (IOException e) {
      Utils.rethrowAsRuntimeException(e);
    }
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
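
Both copies above pack three pieces into one buffer: the Celeborn prefix (four ints), the Flink buffer header (byte + byte + int), and the data payload, then verify that the pushed size equals the payload plus the full header. The standalone sketch below illustrates that layout and the size arithmetic; it is not plugin code, and the constants CELEBORN_PREFIX_LENGTH and TOTAL_HEADER_LENGTH are assumed stand-ins for BufferUtils.HEADER_LENGTH_PREFIX and BufferUtils.HEADER_LENGTH, mirroring the fields written in write().

import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;

public class PackedBufferLayoutSketch {

  // Assumed stand-ins for BufferUtils.HEADER_LENGTH_PREFIX / HEADER_LENGTH.
  static final int CELEBORN_PREFIX_LENGTH = 4 + 4 + 4 + 4;                    // subPartitionId, attemptId, nextBatchId, compressedSize
  static final int TOTAL_HEADER_LENGTH = CELEBORN_PREFIX_LENGTH + 1 + 1 + 4;  // + dataType, isCompressed, size

  public static void main(String[] args) {
    byte[] payload = {1, 2, 3, 4, 5};
    ByteBuf data = Unpooled.wrappedBuffer(payload);

    ByteBuf header = Unpooled.buffer(TOTAL_HEADER_LENGTH);
    header.writeInt(0);                       // subPartitionId
    header.writeInt(0);                       // attemptId
    header.writeInt(0);                       // nextBatchId (always 0 in write())
    // compressedSize = everything after the Celeborn prefix: Flink header + data body.
    header.writeInt(data.readableBytes() + (TOTAL_HEADER_LENGTH - CELEBORN_PREFIX_LENGTH));
    header.writeByte(0);                      // dataType ordinal
    header.writeBoolean(false);               // isCompressed
    header.writeInt(payload.length);          // uncompressed size

    // Pack header + data without copying the payload, as write() does.
    CompositeByteBuf packed = Unpooled.compositeBuffer();
    packed.addComponents(true, header, data);

    // Matches the checkState() above: pushed bytes == data + full header.
    System.out.println(packed.readableBytes() == payload.length + TOTAL_HEADER_LENGTH); // true
    packed.release();
  }
}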

