client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierConsumerAgent.java [226:267]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  public Optional<Buffer> getNextBuffer(
      TieredStoragePartitionId tieredStoragePartitionId,
      TieredStorageSubpartitionId tieredStorageSubpartitionId,
      int segmentId) {
    synchronized (lock) {
      // check health
      healthCheck();
    }

    // check reader status
    if (!bufferReaders.containsKey(tieredStoragePartitionId)
        || !bufferReaders.get(tieredStoragePartitionId).containsKey(tieredStorageSubpartitionId)) {
      return Optional.empty();
    }
    try {
      boolean openReaderSuccess = openReader(tieredStoragePartitionId, tieredStorageSubpartitionId);
      if (!openReaderSuccess) {
        return Optional.empty();
      }
    } catch (Throwable throwable) {
      LOG.error("Failed to open reader.", throwable);
      recycleAllResources();
      ExceptionUtils.rethrow(throwable);
    }

    synchronized (lock) {
      CelebornChannelBufferReader bufferReader =
          getBufferReader(tieredStoragePartitionId, tieredStorageSubpartitionId);
      bufferReader.notifyRequiredSegmentIfNeeded(
          segmentId, tieredStorageSubpartitionId.getSubpartitionId());
      Map<TieredStorageSubpartitionId, Queue<Buffer>> partitionBuffers =
          receivedBuffers.get(tieredStoragePartitionId);
      if (partitionBuffers == null || partitionBuffers.isEmpty()) {
        return Optional.empty();
      }
      Queue<Buffer> subPartitionBuffers = partitionBuffers.get(tieredStorageSubpartitionId);
      if (subPartitionBuffers == null || subPartitionBuffers.isEmpty()) {
        return Optional.empty();
      }
      return Optional.ofNullable(subPartitionBuffers.poll());
    }
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierConsumerAgent.java [226:267]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  public Optional<Buffer> getNextBuffer(
      TieredStoragePartitionId tieredStoragePartitionId,
      TieredStorageSubpartitionId tieredStorageSubpartitionId,
      int segmentId) {
    synchronized (lock) {
      // check health
      healthCheck();
    }

    // check reader status
    if (!bufferReaders.containsKey(tieredStoragePartitionId)
        || !bufferReaders.get(tieredStoragePartitionId).containsKey(tieredStorageSubpartitionId)) {
      return Optional.empty();
    }
    try {
      boolean openReaderSuccess = openReader(tieredStoragePartitionId, tieredStorageSubpartitionId);
      if (!openReaderSuccess) {
        return Optional.empty();
      }
    } catch (Throwable throwable) {
      LOG.error("Failed to open reader.", throwable);
      recycleAllResources();
      ExceptionUtils.rethrow(throwable);
    }

    synchronized (lock) {
      CelebornChannelBufferReader bufferReader =
          getBufferReader(tieredStoragePartitionId, tieredStorageSubpartitionId);
      bufferReader.notifyRequiredSegmentIfNeeded(
          segmentId, tieredStorageSubpartitionId.getSubpartitionId());
      Map<TieredStorageSubpartitionId, Queue<Buffer>> partitionBuffers =
          receivedBuffers.get(tieredStoragePartitionId);
      if (partitionBuffers == null || partitionBuffers.isEmpty()) {
        return Optional.empty();
      }
      Queue<Buffer> subPartitionBuffers = partitionBuffers.get(tieredStorageSubpartitionId);
      if (subPartitionBuffers == null || subPartitionBuffers.isEmpty()) {
        return Optional.empty();
      }
      return Optional.ofNullable(subPartitionBuffers.poll());
    }
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



