client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierConsumerAgent.java [346:371]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  /**
   * Closes every channel buffer reader, detaches all buffers still queued in {@code
   * receivedBuffers}, marks this agent as closed, and finally recycles the detached buffers.
   *
   * <p>Each buffer is recycled independently, so a failure while recycling one buffer does not
   * prevent the remaining buffers from being recycled. The first failure is logged and rethrown
   * once every buffer has been attempted; any later failures are attached to it as suppressed
   * exceptions.
   */
  private void recycleAllResources() {
    for (Map<TieredStorageSubpartitionId, CelebornChannelBufferReader> subPartitionReaders :
        bufferReaders.values()) {
      subPartitionReaders.values().forEach(CelebornChannelBufferReader::close);
    }

    List<Buffer> buffersToRecycle = new ArrayList<>();
    synchronized (lock) {
      // Only detach the queued buffers while holding the lock; they are recycled after the lock
      // has been released.
      for (Map<TieredStorageSubpartitionId, Queue<Buffer>> subPartitionMap :
          receivedBuffers.values()) {
        for (Queue<Buffer> queuedBuffers : subPartitionMap.values()) {
          buffersToRecycle.addAll(queuedBuffers);
        }
      }
      receivedBuffers.clear();
      bufferReaders.clear();
      availabilityNotifier = null;
      closed = true;
    }

    // The buffers are no longer reachable through receivedBuffers, so every one of them must be
    // attempted here even if an earlier recycle call fails.
    Throwable firstFailure = null;
    for (Buffer buffer : buffersToRecycle) {
      try {
        buffer.recycleBuffer();
      } catch (RuntimeException | Error failure) {
        if (firstFailure == null) {
          firstFailure = failure;
        } else if (failure != firstFailure) {
          // Throwable#addSuppressed rejects self-suppression, hence the identity check.
          firstFailure.addSuppressed(failure);
        }
      }
    }

    if (firstFailure != null) {
      LOG.error("Failed to recycle buffers.", firstFailure);
      if (firstFailure instanceof Error) {
        throw (Error) firstFailure;
      }
      throw (RuntimeException) firstFailure;
    }
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierConsumerAgent.java [346:371]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  /**
   * Closes every channel buffer reader, detaches all buffers still queued in {@code
   * receivedBuffers}, marks this agent as closed, and finally recycles the detached buffers.
   *
   * <p>Each buffer is recycled independently, so a failure while recycling one buffer does not
   * prevent the remaining buffers from being recycled. The first failure is logged and rethrown
   * once every buffer has been attempted; any later failures are attached to it as suppressed
   * exceptions.
   */
  private void recycleAllResources() {
    for (Map<TieredStorageSubpartitionId, CelebornChannelBufferReader> subPartitionReaders :
        bufferReaders.values()) {
      subPartitionReaders.values().forEach(CelebornChannelBufferReader::close);
    }

    List<Buffer> buffersToRecycle = new ArrayList<>();
    synchronized (lock) {
      // Only detach the queued buffers while holding the lock; they are recycled after the lock
      // has been released.
      for (Map<TieredStorageSubpartitionId, Queue<Buffer>> subPartitionMap :
          receivedBuffers.values()) {
        for (Queue<Buffer> queuedBuffers : subPartitionMap.values()) {
          buffersToRecycle.addAll(queuedBuffers);
        }
      }
      receivedBuffers.clear();
      bufferReaders.clear();
      availabilityNotifier = null;
      closed = true;
    }

    // The buffers are no longer reachable through receivedBuffers, so every one of them must be
    // attempted here even if an earlier recycle call fails.
    Throwable firstFailure = null;
    for (Buffer buffer : buffersToRecycle) {
      try {
        buffer.recycleBuffer();
      } catch (RuntimeException | Error failure) {
        if (firstFailure == null) {
          firstFailure = failure;
        } else if (failure != firstFailure) {
          // Throwable#addSuppressed rejects self-suppression, hence the identity check.
          firstFailure.addSuppressed(failure);
        }
      }
    }

    if (firstFailure != null) {
      LOG.error("Failed to recycle buffers.", firstFailure);
      if (firstFailure instanceof Error) {
        throw (Error) firstFailure;
      }
      throw (RuntimeException) firstFailure;
    }
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



