client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornChannelBufferReader.java [148:178]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  /**
   * Closes this reader and releases the resources it holds: the buffer stream, the buffer manager
   * and the {@code subPartitionRequiredSegmentIds} bookkeeping.
   *
   * <p>Only the first call does any work; later calls return immediately once {@code closed} is
   * set. The buffer manager and the bookkeeping map are released even if closing the buffer stream
   * throws, because the {@code closed} flag would otherwise prevent any later attempt to clean them
   * up. A failure while closing the buffer manager is logged and not rethrown.
   */
  public void close() {
    // It may be called multiple times because subPartitions can share the same reader, as a
    // single reader can consume multiple subPartitions
    if (closed) {
      return;
    }

    // need set closed first before remove Handler
    closed = true;
    try {
      if (!CelebornBufferStream.isEmptyStream(bufferStream)) {
        bufferStream.close();
        bufferStream = null;
      } else {
        LOG.warn(
            "bufferStream is null when closed, shuffleId: {}, partitionId: {}",
            shuffleId,
            partitionId);
      }
    } finally {
      // Runs regardless of the outcome above: since closed is already true, this is the only
      // chance to release the remaining resources.
      try {
        if (bufferManager != null) {
          bufferManager.close();
          bufferManager = null;
        }
      } catch (Throwable throwable) {
        // Best effort: a failure here must not prevent the rest of the cleanup.
        LOG.warn("Failed to close buffer manager.", throwable);
      }

      subPartitionRequiredSegmentIds.clear();
      subPartitionRequiredSegmentIds = null;
    }
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornChannelBufferReader.java [148:178]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  /**
   * Closes this reader and releases the resources it holds: the buffer stream, the buffer manager
   * and the {@code subPartitionRequiredSegmentIds} bookkeeping.
   *
   * <p>Only the first call does any work; later calls return immediately once {@code closed} is
   * set. The buffer manager and the bookkeeping map are released even if closing the buffer stream
   * throws, because the {@code closed} flag would otherwise prevent any later attempt to clean them
   * up. A failure while closing the buffer manager is logged and not rethrown.
   */
  public void close() {
    // It may be called multiple times because subPartitions can share the same reader, as a
    // single reader can consume multiple subPartitions
    if (closed) {
      return;
    }

    // need set closed first before remove Handler
    closed = true;
    try {
      if (!CelebornBufferStream.isEmptyStream(bufferStream)) {
        bufferStream.close();
        bufferStream = null;
      } else {
        LOG.warn(
            "bufferStream is null when closed, shuffleId: {}, partitionId: {}",
            shuffleId,
            partitionId);
      }
    } finally {
      // Runs regardless of the outcome above: since closed is already true, this is the only
      // chance to release the remaining resources.
      try {
        if (bufferManager != null) {
          bufferManager.close();
          bufferManager = null;
        }
      } catch (Throwable throwable) {
        // Best effort: a failure here must not prevent the rest of the cleanup.
        LOG.warn("Failed to close buffer manager.", throwable);
      }

      subPartitionRequiredSegmentIds.clear();
      subPartitionRequiredSegmentIds = null;
    }
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



