client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierConsumerAgent.java [199:223]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  public int peekNextBufferSubpartitionId(
      TieredStoragePartitionId tieredStoragePartitionId,
      ResultSubpartitionIndexSet resultSubpartitionIndexSet) {
    synchronized (lock) {
      // check health
      healthCheck();

      // return the subPartitionId if already receive buffer from corresponding subpartition
      Map<TieredStorageSubpartitionId, Queue<Buffer>> subPartitionReceivedBuffers =
          receivedBuffers.get(tieredStoragePartitionId);
      if (subPartitionReceivedBuffers == null) {
        return -1;
      }
      for (int subPartitionIndex = resultSubpartitionIndexSet.getStartIndex();
          subPartitionIndex <= resultSubpartitionIndexSet.getEndIndex();
          subPartitionIndex++) {
        Queue<Buffer> buffers =
            subPartitionReceivedBuffers.get(new TieredStorageSubpartitionId(subPartitionIndex));
        if (buffers != null && !buffers.isEmpty()) {
          return subPartitionIndex;
        }
      }
    }
    return -1;
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierConsumerAgent.java [199:223]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  public int peekNextBufferSubpartitionId(
      TieredStoragePartitionId tieredStoragePartitionId,
      ResultSubpartitionIndexSet resultSubpartitionIndexSet) {
    synchronized (lock) {
      // check health
      healthCheck();

      // return the subPartitionId if already receive buffer from corresponding subpartition
      Map<TieredStorageSubpartitionId, Queue<Buffer>> subPartitionReceivedBuffers =
          receivedBuffers.get(tieredStoragePartitionId);
      if (subPartitionReceivedBuffers == null) {
        return -1;
      }
      for (int subPartitionIndex = resultSubpartitionIndexSet.getStartIndex();
          subPartitionIndex <= resultSubpartitionIndexSet.getEndIndex();
          subPartitionIndex++) {
        Queue<Buffer> buffers =
            subPartitionReceivedBuffers.get(new TieredStorageSubpartitionId(subPartitionIndex));
        if (buffers != null && !buffers.isEmpty()) {
          return subPartitionIndex;
        }
      }
    }
    return -1;
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



