client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierConsumerAgent.java [423:449]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  private void createBufferReader(
      ShuffleResourceDescriptor shuffleDescriptor,
      TieredStoragePartitionId partitionId,
      TieredStorageInputChannelId inputChannelId,
      ResultSubpartitionIndexSet subPartitionIdSet) {
    // create a single reader for multiple subPartitions to improvement performance
    CelebornChannelBufferReader reader =
        new CelebornChannelBufferReader(
            shuffleClient,
            shuffleDescriptor,
            inputChannelId,
            subPartitionIdSet.getStartIndex(),
            subPartitionIdSet.getEndIndex(),
            getDataListener(partitionId),
            getFailureListener(partitionId));

    for (int id = subPartitionIdSet.getStartIndex(); id <= subPartitionIdSet.getEndIndex(); id++) {
      TieredStorageSubpartitionId subPartitionId = new TieredStorageSubpartitionId(id);
      checkState(
          !bufferReaders.containsKey(partitionId)
              || !bufferReaders.get(partitionId).containsKey(subPartitionId),
          "Duplicate shuffle reader.");
      bufferReaders
          .computeIfAbsent(partitionId, partition -> new HashMap<>())
          .put(subPartitionId, reader);
    }
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierConsumerAgent.java [423:449]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  private void createBufferReader(
      ShuffleResourceDescriptor shuffleDescriptor,
      TieredStoragePartitionId partitionId,
      TieredStorageInputChannelId inputChannelId,
      ResultSubpartitionIndexSet subPartitionIdSet) {
    // create a single reader for multiple subPartitions to improvement performance
    CelebornChannelBufferReader reader =
        new CelebornChannelBufferReader(
            shuffleClient,
            shuffleDescriptor,
            inputChannelId,
            subPartitionIdSet.getStartIndex(),
            subPartitionIdSet.getEndIndex(),
            getDataListener(partitionId),
            getFailureListener(partitionId));

    for (int id = subPartitionIdSet.getStartIndex(); id <= subPartitionIdSet.getEndIndex(); id++) {
      TieredStorageSubpartitionId subPartitionId = new TieredStorageSubpartitionId(id);
      checkState(
          !bufferReaders.containsKey(partitionId)
              || !bufferReaders.get(partitionId).containsKey(subPartitionId),
          "Duplicate shuffle reader.");
      bufferReaders
          .computeIfAbsent(partitionId, partition -> new HashMap<>())
          .put(subPartitionId, reader);
    }
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



