client-flink/flink-1.15/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java [117:205]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  /**
   * Finishes this result partition.
   *
   * <p>First verifies that the partition has not been released, then broadcasts
   * {@code EndOfPartitionEvent.INSTANCE}, finishes the delegation, and finally invokes the
   * superclass {@code finish()}.
   *
   * @throws IOException propagated from the calls made while finishing
   */
  @Override
  public void finish() throws IOException {
    final boolean notYetReleased = !isReleased();
    Utils.checkState(notYetReleased, "Result partition is already released.");

    // Sequence: end-of-partition event, then the delegation, then the superclass.
    broadcastEvent(EndOfPartitionEvent.INSTANCE, false);
    delegation.finish();
    super.finish();
  }

  /**
   * Closes this result partition through the delegation.
   *
   * <p>The superclass {@code close()} is handed to the delegation as a callback; when the
   * delegation runs it is decided by the delegation, not by this method. The method is
   * {@code synchronized} on this instance.
   */
  @Override
  public synchronized void close() {
    delegation.close(super::close);
  }

  /**
   * Intentionally empty: this implementation performs no work when invoked.
   *
   * <p>NOTE(review): cleanup appears to be routed through {@code close()} and the delegation
   * instead; confirm before adding logic here.
   */
  @Override
  protected void releaseInternal() {
    // no-op
  }

  /** Forwards the flush-all request to the delegation. */
  @Override
  public void flushAll() {
    delegation.flushAll();
  }

  /**
   * Flushes by calling {@link #flushAll()}.
   *
   * @param subpartitionIndex not used by this implementation; every call flushes everything
   */
  @Override
  public void flush(int subpartitionIndex) {
    // The index is ignored on purpose: there is no per-subpartition flush path here.
    flushAll();
  }

  /**
   * Always returns the {@code AVAILABLE} constant.
   *
   * <p>NOTE(review): presumably an already-completed future, meaning this partition never
   * reports itself as unavailable; confirm against the declaring type of {@code AVAILABLE}.
   *
   * @return the {@code AVAILABLE} constant, unconditionally
   */
  @Override
  public CompletableFuture<?> getAvailableFuture() {
    return AVAILABLE;
  }

  /**
   * Reports the number of queued buffers, which this implementation fixes at zero.
   *
   * @return {@code 0} in all cases
   */
  @Override
  public int getNumberOfQueuedBuffers() {
    return 0;
  }

  /**
   * Reports the size of queued buffers, which this implementation fixes at zero.
   *
   * @return {@code 0} in all cases
   */
  @Override
  public long getSizeOfQueuedBuffersUnsafe() {
    return 0;
  }

  /**
   * Reports the number of queued buffers for one subpartition, fixed at zero here.
   *
   * @param targetSubpartition not used by this implementation
   * @return {@code 0} in all cases
   */
  @Override
  public int getNumberOfQueuedBuffers(int targetSubpartition) {
    return 0;
  }

  /**
   * Unsupported operation: this implementation never creates a subpartition view.
   *
   * @param index not used
   * @param availabilityListener not used
   * @return never returns normally
   * @throws UnsupportedOperationException always
   */
  @Override
  public ResultSubpartitionView createSubpartitionView(
      int index, BufferAvailabilityListener availabilityListener) {
    throw new UnsupportedOperationException("Not supported.");
  }

  /**
   * Broadcasts an {@code EndOfData} event for the given stop mode unless the delegation already
   * records that end-of-data was notified.
   *
   * <p>After broadcasting, the delegation's end-of-data flag is set to {@code true}, so later
   * calls return without broadcasting again.
   *
   * @param mode the stop mode carried by the {@code EndOfData} event
   * @throws IOException propagated from broadcasting the event
   */
  @Override
  public void notifyEndOfData(StopMode mode) throws IOException {
    if (delegation.isEndOfDataNotified()) {
      // Already announced once; nothing further to do.
      return;
    }

    final EndOfData endOfDataEvent = new EndOfData(mode);
    broadcastEvent(endOfDataEvent, false);
    delegation.setEndOfDataNotified(true);
  }

  /**
   * Returns a future that is already completed.
   *
   * @return a new {@link CompletableFuture} completed with {@code null} on every call
   */
  @Override
  public CompletableFuture<Void> getAllDataProcessedFuture() {
    return CompletableFuture.completedFuture(null);
  }

  /**
   * Builds a description of this partition containing the partition id, the partition type, the
   * number of subpartitions, and the shuffle descriptor obtained from the delegation's output
   * gate.
   *
   * @return the textual description of this result partition
   */
  @Override
  public String toString() {
    final StringBuilder description = new StringBuilder("ResultPartition ");
    description.append(partitionId.toString());
    description.append(" [").append(partitionType);
    description.append(", ").append(numSubpartitions);
    description.append(" subpartitions, shuffle-descriptor: ");
    description.append(delegation.getOutputGate().getShuffleDesc());
    return description.append("]").toString();
  }

  /**
   * Exposes the delegation held by this partition; marked {@code @VisibleForTesting}.
   *
   * @return the delegation instance used by this result partition
   */
  @VisibleForTesting
  public RemoteShuffleResultPartitionDelegation getDelegation() {
    return delegation;
  }

  public void updateStatistics(
      SortBuffer.BufferWithChannel bufferWithChannel, boolean isBroadcast) {
    numBuffersOut.inc(isBroadcast ? numSubpartitions : 1);
    long readableBytes = bufferWithChannel.getBuffer().readableBytes() - BufferUtils.HEADER_LENGTH;
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java [129:217]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  /**
   * Finishes this result partition.
   *
   * <p>First verifies that the partition has not been released, then broadcasts
   * {@code EndOfPartitionEvent.INSTANCE}, finishes the delegation, and finally invokes the
   * superclass {@code finish()}.
   *
   * @throws IOException propagated from the calls made while finishing
   */
  @Override
  public void finish() throws IOException {
    final boolean notYetReleased = !isReleased();
    Utils.checkState(notYetReleased, "Result partition is already released.");

    // Sequence: end-of-partition event, then the delegation, then the superclass.
    broadcastEvent(EndOfPartitionEvent.INSTANCE, false);
    delegation.finish();
    super.finish();
  }

  /**
   * Closes this result partition through the delegation.
   *
   * <p>The superclass {@code close()} is handed to the delegation as a callback; when the
   * delegation runs it is decided by the delegation, not by this method. The method is
   * {@code synchronized} on this instance.
   */
  @Override
  public synchronized void close() {
    delegation.close(super::close);
  }

  /**
   * Intentionally empty: this implementation performs no work when invoked.
   *
   * <p>NOTE(review): cleanup appears to be routed through {@code close()} and the delegation
   * instead; confirm before adding logic here.
   */
  @Override
  protected void releaseInternal() {
    // no-op
  }

  /** Forwards the flush-all request to the delegation. */
  @Override
  public void flushAll() {
    delegation.flushAll();
  }

  /**
   * Flushes by calling {@link #flushAll()}.
   *
   * @param subpartitionIndex not used by this implementation; every call flushes everything
   */
  @Override
  public void flush(int subpartitionIndex) {
    // The index is ignored on purpose: there is no per-subpartition flush path here.
    flushAll();
  }

  /**
   * Always returns the {@code AVAILABLE} constant.
   *
   * <p>NOTE(review): presumably an already-completed future, meaning this partition never
   * reports itself as unavailable; confirm against the declaring type of {@code AVAILABLE}.
   *
   * @return the {@code AVAILABLE} constant, unconditionally
   */
  @Override
  public CompletableFuture<?> getAvailableFuture() {
    return AVAILABLE;
  }

  /**
   * Reports the number of queued buffers, which this implementation fixes at zero.
   *
   * @return {@code 0} in all cases
   */
  @Override
  public int getNumberOfQueuedBuffers() {
    return 0;
  }

  /**
   * Reports the size of queued buffers, which this implementation fixes at zero.
   *
   * @return {@code 0} in all cases
   */
  @Override
  public long getSizeOfQueuedBuffersUnsafe() {
    return 0;
  }

  /**
   * Reports the number of queued buffers for one subpartition, fixed at zero here.
   *
   * @param targetSubpartition not used by this implementation
   * @return {@code 0} in all cases
   */
  @Override
  public int getNumberOfQueuedBuffers(int targetSubpartition) {
    return 0;
  }

  /**
   * Unsupported operation: this implementation never creates a subpartition view.
   *
   * @param index not used
   * @param availabilityListener not used
   * @return never returns normally
   * @throws UnsupportedOperationException always
   */
  @Override
  public ResultSubpartitionView createSubpartitionView(
      int index, BufferAvailabilityListener availabilityListener) {
    throw new UnsupportedOperationException("Not supported.");
  }

  /**
   * Broadcasts an {@code EndOfData} event for the given stop mode unless the delegation already
   * records that end-of-data was notified.
   *
   * <p>After broadcasting, the delegation's end-of-data flag is set to {@code true}, so later
   * calls return without broadcasting again.
   *
   * @param mode the stop mode carried by the {@code EndOfData} event
   * @throws IOException propagated from broadcasting the event
   */
  @Override
  public void notifyEndOfData(StopMode mode) throws IOException {
    if (delegation.isEndOfDataNotified()) {
      // Already announced once; nothing further to do.
      return;
    }

    final EndOfData endOfDataEvent = new EndOfData(mode);
    broadcastEvent(endOfDataEvent, false);
    delegation.setEndOfDataNotified(true);
  }

  /**
   * Returns a future that is already completed.
   *
   * @return a new {@link CompletableFuture} completed with {@code null} on every call
   */
  @Override
  public CompletableFuture<Void> getAllDataProcessedFuture() {
    return CompletableFuture.completedFuture(null);
  }

  /**
   * Builds a description of this partition containing the partition id, the partition type, the
   * number of subpartitions, and the shuffle descriptor obtained from the delegation's output
   * gate.
   *
   * @return the textual description of this result partition
   */
  @Override
  public String toString() {
    final StringBuilder description = new StringBuilder("ResultPartition ");
    description.append(partitionId.toString());
    description.append(" [").append(partitionType);
    description.append(", ").append(numSubpartitions);
    description.append(" subpartitions, shuffle-descriptor: ");
    description.append(delegation.getOutputGate().getShuffleDesc());
    return description.append("]").toString();
  }

  /**
   * Exposes the delegation held by this partition; marked {@code @VisibleForTesting}.
   *
   * @return the delegation instance used by this result partition
   */
  @VisibleForTesting
  public RemoteShuffleResultPartitionDelegation getDelegation() {
    return delegation;
  }

  public void updateStatistics(
      SortBuffer.BufferWithChannel bufferWithChannel, boolean isBroadcast) {
    numBuffersOut.inc(isBroadcast ? numSubpartitions : 1);
    long readableBytes = bufferWithChannel.getBuffer().readableBytes() - BufferUtils.HEADER_LENGTH;
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



