client-flink/flink-1.15/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java [49:96]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
public class RemoteShuffleResultPartition extends ResultPartition {

  private final RemoteShuffleResultPartitionDelegation delegation;

  /**
   * Creates the result partition and the delegation object it hands its work to.
   *
   * <p>{@code owningTaskName}, {@code partitionIndex}, {@code partitionId}, {@code partitionType},
   * {@code numSubpartitions}, {@code numTargetKeyGroups}, {@code partitionManager}, {@code
   * bufferCompressor} and {@code bufferPoolFactory} are forwarded unchanged to the {@link
   * ResultPartition} constructor. {@code networkBufferSize}, {@code outputGate} and {@code
   * numSubpartitions} are passed to the {@link RemoteShuffleResultPartitionDelegation}.
   *
   * @param bufferCompressor compressor forwarded to the superclass; may be {@code null}
   */
  public RemoteShuffleResultPartition(
      String owningTaskName,
      int partitionIndex,
      ResultPartitionID partitionId,
      ResultPartitionType partitionType,
      int numSubpartitions,
      int numTargetKeyGroups,
      int networkBufferSize,
      ResultPartitionManager partitionManager,
      @Nullable BufferCompressor bufferCompressor,
      SupplierWithException<BufferPool, IOException> bufferPoolFactory,
      RemoteShuffleOutputGate outputGate) {

    super(
        owningTaskName,
        partitionIndex,
        partitionId,
        partitionType,
        numSubpartitions,
        numTargetKeyGroups,
        partitionManager,
        bufferCompressor,
        bufferPoolFactory);

    // The third argument is a callback that routes back into this partition's updateStatistics.
    this.delegation =
        new RemoteShuffleResultPartitionDelegation(
            networkBufferSize, outputGate, this::updateStatistics, numSubpartitions);
  }

  /**
   * Runs the superclass setup, then calls {@code BufferUtils.reserveNumRequiredBuffers} on the
   * buffer pool with a count of 1, and finally sets up the delegation.
   *
   * <p>The delegation receives the buffer pool, the buffer compressor, and two callbacks bound to
   * this partition: {@code canBeCompressed} and {@code checkInProduceState}.
   *
   * @throws IOException declared by this override; the visible calls may propagate it
   */
  @Override
  public void setup() throws IOException {
    super.setup();
    BufferUtils.reserveNumRequiredBuffers(bufferPool, 1);
    // Callbacks are passed as bound method references on this partition.
    this.delegation.setup(
        bufferPool, bufferCompressor, this::canBeCompressed, this::checkInProduceState);
  }

  @Override
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java [50:97]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
public class RemoteShuffleResultPartition extends ResultPartition {

  private final RemoteShuffleResultPartitionDelegation delegation;

  /**
   * Creates the result partition and the delegation object it hands its work to.
   *
   * <p>{@code owningTaskName}, {@code partitionIndex}, {@code partitionId}, {@code partitionType},
   * {@code numSubpartitions}, {@code numTargetKeyGroups}, {@code partitionManager}, {@code
   * bufferCompressor} and {@code bufferPoolFactory} are forwarded unchanged to the {@link
   * ResultPartition} constructor. {@code networkBufferSize}, {@code outputGate} and {@code
   * numSubpartitions} are passed to the {@link RemoteShuffleResultPartitionDelegation}.
   *
   * @param bufferCompressor compressor forwarded to the superclass; may be {@code null}
   */
  public RemoteShuffleResultPartition(
      String owningTaskName,
      int partitionIndex,
      ResultPartitionID partitionId,
      ResultPartitionType partitionType,
      int numSubpartitions,
      int numTargetKeyGroups,
      int networkBufferSize,
      ResultPartitionManager partitionManager,
      @Nullable BufferCompressor bufferCompressor,
      SupplierWithException<BufferPool, IOException> bufferPoolFactory,
      RemoteShuffleOutputGate outputGate) {

    super(
        owningTaskName,
        partitionIndex,
        partitionId,
        partitionType,
        numSubpartitions,
        numTargetKeyGroups,
        partitionManager,
        bufferCompressor,
        bufferPoolFactory);

    // The third argument is a callback that routes back into this partition's updateStatistics.
    this.delegation =
        new RemoteShuffleResultPartitionDelegation(
            networkBufferSize, outputGate, this::updateStatistics, numSubpartitions);
  }

  /**
   * Runs the superclass setup, then calls {@code BufferUtils.reserveNumRequiredBuffers} on the
   * buffer pool with a count of 1, and finally sets up the delegation.
   *
   * <p>The delegation receives the buffer pool, the buffer compressor, and two callbacks bound to
   * this partition: {@code canBeCompressed} and {@code checkInProduceState}.
   *
   * @throws IOException declared by this override; the visible calls may propagate it
   */
  @Override
  public void setup() throws IOException {
    super.setup();
    BufferUtils.reserveNumRequiredBuffers(bufferPool, 1);
    // Callbacks are passed as bound method references on this partition.
    this.delegation.setup(
        bufferPool, bufferCompressor, this::canBeCompressed, this::checkInProduceState);
  }

  @Override
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



