client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierConsumerAgent.java [466:491]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  private void onBuffer(
      TieredStoragePartitionId partitionId,
      TieredStorageSubpartitionId subPartitionId,
      Buffer buffer) {
    boolean wasEmpty;
    synchronized (lock) {
      if (closed || cause != null) {
        buffer.recycleBuffer();
        recycleAllResources();
        throw new IllegalStateException("Input gate already closed or failed.");
      }
      Queue<Buffer> buffers =
          receivedBuffers
              .computeIfAbsent(partitionId, partition -> new HashMap<>())
              .computeIfAbsent(subPartitionId, subpartition -> new LinkedList<>());
      wasEmpty = buffers.isEmpty();
      buffers.add(buffer);
      if (wasEmpty && !started) {
        subPartitionsNeedNotifyAvailable.add(Tuple2.of(partitionId, subPartitionId));
        return;
      }
    }
    if (wasEmpty) {
      notifyAvailable(partitionId, subPartitionId);
    }
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierConsumerAgent.java [466:491]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  private void onBuffer(
      TieredStoragePartitionId partitionId,
      TieredStorageSubpartitionId subPartitionId,
      Buffer buffer) {
    boolean wasEmpty;
    synchronized (lock) {
      if (closed || cause != null) {
        buffer.recycleBuffer();
        recycleAllResources();
        throw new IllegalStateException("Input gate already closed or failed.");
      }
      Queue<Buffer> buffers =
          receivedBuffers
              .computeIfAbsent(partitionId, partition -> new HashMap<>())
              .computeIfAbsent(subPartitionId, subpartition -> new LinkedList<>());
      wasEmpty = buffers.isEmpty();
      buffers.add(buffer);
      if (wasEmpty && !started) {
        subPartitionsNeedNotifyAvailable.add(Tuple2.of(partitionId, subPartitionId));
        return;
      }
    }
    if (wasEmpty) {
      notifyAvailable(partitionId, subPartitionId);
    }
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



