client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java [58:160]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  /**
   * Builds the partition and the delegation object that the other methods of this class forward
   * to.
   *
   * <p>Every argument except {@code networkBufferSize} and {@code outputGate} is handed unchanged
   * to the superclass constructor. The delegation is then created from {@code networkBufferSize},
   * {@code outputGate}, a callback bound to this instance's {@code updateStatistics} method, and
   * {@code numSubpartitions}.
   */
  public RemoteShuffleResultPartition(
      String owningTaskName,
      int partitionIndex,
      ResultPartitionID partitionId,
      ResultPartitionType partitionType,
      int numSubpartitions,
      int numTargetKeyGroups,
      int networkBufferSize,
      ResultPartitionManager partitionManager,
      @Nullable BufferCompressor bufferCompressor,
      SupplierWithException<BufferPool, IOException> bufferPoolFactory,
      RemoteShuffleOutputGate outputGate) {

    super(
        owningTaskName,
        partitionIndex,
        partitionId,
        partitionType,
        numSubpartitions,
        numTargetKeyGroups,
        partitionManager,
        bufferCompressor,
        bufferPoolFactory);

    // The statistics callback is bound to this instance.
    this.delegation =
        new RemoteShuffleResultPartitionDelegation(
            networkBufferSize, outputGate, this::updateStatistics, numSubpartitions);
  }

  /**
   * Sets up this partition and its delegation.
   *
   * <p>Steps, in order: run the superclass setup, call {@code
   * BufferUtils.reserveNumRequiredBuffers} on the buffer pool with a count of 1, then set up the
   * delegation with the buffer pool, the buffer compressor, and callbacks bound to this
   * instance's {@code canBeCompressed} and {@code checkInProduceState} methods.
   *
   * @throws IOException propagated from the calls made here
   */
  @Override
  public void setup() throws IOException {
    super.setup();
    BufferUtils.reserveNumRequiredBuffers(bufferPool, 1);
    delegation.setup(
        bufferPool, bufferCompressor, this::canBeCompressed, this::checkInProduceState);
  }

  /**
   * Emits one record to a single subpartition by forwarding it to the delegation.
   *
   * @param record the record bytes to emit
   * @param targetSubpartition index of the subpartition the record is addressed to
   * @throws IOException propagated from the delegation
   */
  @Override
  public void emitRecord(ByteBuffer record, int targetSubpartition) throws IOException {
    // Always tagged as DATA_BUFFER. NOTE(review): the trailing `false` is presumably the
    // "is broadcast" flag of the delegation's emit method -- confirm against its signature.
    delegation.emit(record, targetSubpartition, DataType.DATA_BUFFER, false);
  }

  /**
   * Broadcasts one record by forwarding it to the delegation, tagged as a data buffer.
   *
   * @param record the record bytes to broadcast
   * @throws IOException propagated from the delegation
   */
  @Override
  public void broadcastRecord(ByteBuffer record) throws IOException {
    delegation.broadcast(record, DataType.DATA_BUFFER);
  }

  /**
   * Serializes an event and broadcasts it through the delegation.
   *
   * <p>The event is turned into a {@code Buffer} by {@code EventSerializer.toBuffer}; the
   * buffer's readable NIO view and its data type are passed to the delegation's broadcast. The
   * buffer is recycled afterwards whether or not the broadcast throws.
   *
   * @param event the event to broadcast
   * @param isPriorityEvent passed through to {@code EventSerializer.toBuffer}
   * @throws IOException propagated from serialization or from the delegation
   */
  @Override
  public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException {
    final Buffer eventBuffer = EventSerializer.toBuffer(event, isPriorityEvent);
    try {
      delegation.broadcast(eventBuffer.getNioBufferReadable(), eventBuffer.getDataType());
    } finally {
      // Always hand the serialization buffer back, even when the broadcast fails.
      eventBuffer.recycleBuffer();
    }
  }

  /**
   * Finishes this partition.
   *
   * <p>After checking that the partition has not been released, the steps run in this order:
   * broadcast {@code EndOfPartitionEvent.INSTANCE} as a non-priority event, finish the
   * delegation, then run the superclass {@code finish()}.
   *
   * @throws IOException propagated from the broadcast, the delegation, or the superclass
   */
  @Override
  public void finish() throws IOException {
    // Guard: finishing a released partition is treated as a state error.
    Utils.checkState(!isReleased(), "Result partition is already released.");
    broadcastEvent(EndOfPartitionEvent.INSTANCE, false);
    delegation.finish();
    super.finish();
  }

  /**
   * Closes this partition through the delegation.
   *
   * <p>The delegation receives a callback that invokes the superclass {@code close()}; when and
   * whether that callback runs is decided by the delegation. The method is synchronized on this
   * instance.
   */
  @Override
  public synchronized void close() {
    delegation.close(super::close);
  }

  /**
   * Release hook that intentionally does nothing in this class.
   *
   * <p>NOTE(review): cleanup appears to happen via {@code close()} through the delegation rather
   * than here -- confirm.
   */
  @Override
  protected void releaseInternal() {
    // no-op
  }

  /** Flushes everything by forwarding the call to the delegation's {@code flushAll()}. */
  @Override
  public void flushAll() {
    delegation.flushAll();
  }

  /**
   * Flushes by calling {@link #flushAll()}.
   *
   * @param subpartitionIndex ignored; the flush is not restricted to this subpartition
   */
  @Override
  public void flush(int subpartitionIndex) {
    flushAll();
  }

  /**
   * Returns the {@code AVAILABLE} constant unconditionally.
   *
   * <p>NOTE(review): presumably this means the partition never reports itself as unavailable to
   * its caller -- confirm against the availability contract of the overridden method.
   *
   * @return always {@code AVAILABLE}
   */
  @Override
  public CompletableFuture<?> getAvailableFuture() {
    return AVAILABLE;
  }

  /**
   * Reports the number of queued buffers, which this implementation fixes at zero.
   *
   * <p>NOTE(review): presumably buffers are not queued locally in this partition -- confirm.
   *
   * @return always {@code 0}
   */
  @Override
  public int getNumberOfQueuedBuffers() {
    return 0;
  }

  @Override
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



client-flink/flink-1.15/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java [53:155]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  /**
   * Builds the partition and the delegation object that the other methods of this class forward
   * to.
   *
   * <p>Every argument except {@code networkBufferSize} and {@code outputGate} is handed unchanged
   * to the superclass constructor. The delegation is then created from {@code networkBufferSize},
   * {@code outputGate}, a callback bound to this instance's {@code updateStatistics} method, and
   * {@code numSubpartitions}.
   */
  public RemoteShuffleResultPartition(
      String owningTaskName,
      int partitionIndex,
      ResultPartitionID partitionId,
      ResultPartitionType partitionType,
      int numSubpartitions,
      int numTargetKeyGroups,
      int networkBufferSize,
      ResultPartitionManager partitionManager,
      @Nullable BufferCompressor bufferCompressor,
      SupplierWithException<BufferPool, IOException> bufferPoolFactory,
      RemoteShuffleOutputGate outputGate) {

    super(
        owningTaskName,
        partitionIndex,
        partitionId,
        partitionType,
        numSubpartitions,
        numTargetKeyGroups,
        partitionManager,
        bufferCompressor,
        bufferPoolFactory);

    // The statistics callback is bound to this instance.
    this.delegation =
        new RemoteShuffleResultPartitionDelegation(
            networkBufferSize, outputGate, this::updateStatistics, numSubpartitions);
  }

  /**
   * Sets up this partition and its delegation.
   *
   * <p>Steps, in order: run the superclass setup, call {@code
   * BufferUtils.reserveNumRequiredBuffers} on the buffer pool with a count of 1, then set up the
   * delegation with the buffer pool, the buffer compressor, and callbacks bound to this
   * instance's {@code canBeCompressed} and {@code checkInProduceState} methods.
   *
   * @throws IOException propagated from the calls made here
   */
  @Override
  public void setup() throws IOException {
    super.setup();
    BufferUtils.reserveNumRequiredBuffers(bufferPool, 1);
    delegation.setup(
        bufferPool, bufferCompressor, this::canBeCompressed, this::checkInProduceState);
  }

  /**
   * Emits one record to a single subpartition by forwarding it to the delegation.
   *
   * @param record the record bytes to emit
   * @param targetSubpartition index of the subpartition the record is addressed to
   * @throws IOException propagated from the delegation
   */
  @Override
  public void emitRecord(ByteBuffer record, int targetSubpartition) throws IOException {
    // Always tagged as DATA_BUFFER. NOTE(review): the trailing `false` is presumably the
    // "is broadcast" flag of the delegation's emit method -- confirm against its signature.
    delegation.emit(record, targetSubpartition, DataType.DATA_BUFFER, false);
  }

  /**
   * Broadcasts one record by forwarding it to the delegation, tagged as a data buffer.
   *
   * @param record the record bytes to broadcast
   * @throws IOException propagated from the delegation
   */
  @Override
  public void broadcastRecord(ByteBuffer record) throws IOException {
    delegation.broadcast(record, DataType.DATA_BUFFER);
  }

  /**
   * Serializes an event and broadcasts it through the delegation.
   *
   * <p>The event is turned into a {@code Buffer} by {@code EventSerializer.toBuffer}; the
   * buffer's readable NIO view and its data type are passed to the delegation's broadcast. The
   * buffer is recycled afterwards whether or not the broadcast throws.
   *
   * @param event the event to broadcast
   * @param isPriorityEvent passed through to {@code EventSerializer.toBuffer}
   * @throws IOException propagated from serialization or from the delegation
   */
  @Override
  public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException {
    final Buffer eventBuffer = EventSerializer.toBuffer(event, isPriorityEvent);
    try {
      delegation.broadcast(eventBuffer.getNioBufferReadable(), eventBuffer.getDataType());
    } finally {
      // Always hand the serialization buffer back, even when the broadcast fails.
      eventBuffer.recycleBuffer();
    }
  }

  /**
   * Finishes this partition.
   *
   * <p>After checking that the partition has not been released, the steps run in this order:
   * broadcast {@code EndOfPartitionEvent.INSTANCE} as a non-priority event, finish the
   * delegation, then run the superclass {@code finish()}.
   *
   * @throws IOException propagated from the broadcast, the delegation, or the superclass
   */
  @Override
  public void finish() throws IOException {
    // Guard: finishing a released partition is treated as a state error.
    Utils.checkState(!isReleased(), "Result partition is already released.");
    broadcastEvent(EndOfPartitionEvent.INSTANCE, false);
    delegation.finish();
    super.finish();
  }

  /**
   * Closes this partition through the delegation.
   *
   * <p>The delegation receives a callback that invokes the superclass {@code close()}; when and
   * whether that callback runs is decided by the delegation. The method is synchronized on this
   * instance.
   */
  @Override
  public synchronized void close() {
    delegation.close(super::close);
  }

  /**
   * Release hook that intentionally does nothing in this class.
   *
   * <p>NOTE(review): cleanup appears to happen via {@code close()} through the delegation rather
   * than here -- confirm.
   */
  @Override
  protected void releaseInternal() {
    // no-op
  }

  /** Flushes everything by forwarding the call to the delegation's {@code flushAll()}. */
  @Override
  public void flushAll() {
    delegation.flushAll();
  }

  /**
   * Flushes by calling {@link #flushAll()}.
   *
   * @param subpartitionIndex ignored; the flush is not restricted to this subpartition
   */
  @Override
  public void flush(int subpartitionIndex) {
    flushAll();
  }

  /**
   * Returns the {@code AVAILABLE} constant unconditionally.
   *
   * <p>NOTE(review): presumably this means the partition never reports itself as unavailable to
   * its caller -- confirm against the availability contract of the overridden method.
   *
   * @return always {@code AVAILABLE}
   */
  @Override
  public CompletableFuture<?> getAvailableFuture() {
    return AVAILABLE;
  }

  /**
   * Reports the number of queued buffers, which this implementation fixes at zero.
   *
   * <p>NOTE(review): presumably buffers are not queued locally in this partition -- confirm.
   *
   * @return always {@code 0}
   */
  @Override
  public int getNumberOfQueuedBuffers() {
    return 0;
  }

  @Override
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



