client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierConsumerAgent.java [395:421]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  /**
   * Creates a buffer reader for every entry of {@code shuffleDescriptors} that is a {@link
   * TierShuffleDescriptorImpl}; entries of any other type are skipped.
   *
   * <p>The descriptor at position {@code i} is paired with the consumer spec at the same position
   * in {@code consumerSpecs}. The partition id derived from the descriptor must equal the one
   * recorded in that spec, otherwise the {@code checkState} call rejects it with "Wrong partition
   * id.".
   */
  private void initBufferReaders() {
    for (int index = 0; index < shuffleDescriptors.size(); index++) {
      if (shuffleDescriptors.get(index) instanceof TierShuffleDescriptorImpl) {
        TierShuffleDescriptorImpl tierDescriptor =
            (TierShuffleDescriptorImpl) shuffleDescriptors.get(index);
        ShuffleResourceDescriptor resourceDescriptor =
            tierDescriptor.getShuffleResource().getMapPartitionShuffleDescriptor();
        TieredStoragePartitionId expectedPartitionId =
            new TieredStoragePartitionId(tierDescriptor.getResultPartitionID());

        // The consumer spec at the same index must describe the same partition.
        checkState(
            consumerSpecs.get(index).getPartitionId().equals(expectedPartitionId),
            "Wrong partition id.");

        ResultSubpartitionIndexSet subpartitionIndexes =
            consumerSpecs.get(index).getSubpartitionIds();
        LOG.debug(
            "create shuffle reader for gate {} descriptor {} partitionId {}, subPartitionId start {} and end {}",
            gateIndex,
            resourceDescriptor,
            expectedPartitionId,
            subpartitionIndexes.getStartIndex(),
            subpartitionIndexes.getEndIndex());

        createBufferReader(
            resourceDescriptor,
            expectedPartitionId,
            consumerSpecs.get(index).getInputChannelId(),
            subpartitionIndexes);
      }
    }
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierConsumerAgent.java [395:421]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  /**
   * Creates a buffer reader for every entry of {@code shuffleDescriptors} that is a {@link
   * TierShuffleDescriptorImpl}; entries of any other type are skipped.
   *
   * <p>The descriptor at position {@code i} is paired with the consumer spec at the same position
   * in {@code consumerSpecs}. The partition id derived from the descriptor must equal the one
   * recorded in that spec, otherwise the {@code checkState} call rejects it with "Wrong partition
   * id.".
   */
  private void initBufferReaders() {
    for (int index = 0; index < shuffleDescriptors.size(); index++) {
      if (shuffleDescriptors.get(index) instanceof TierShuffleDescriptorImpl) {
        TierShuffleDescriptorImpl tierDescriptor =
            (TierShuffleDescriptorImpl) shuffleDescriptors.get(index);
        ShuffleResourceDescriptor resourceDescriptor =
            tierDescriptor.getShuffleResource().getMapPartitionShuffleDescriptor();
        TieredStoragePartitionId expectedPartitionId =
            new TieredStoragePartitionId(tierDescriptor.getResultPartitionID());

        // The consumer spec at the same index must describe the same partition.
        checkState(
            consumerSpecs.get(index).getPartitionId().equals(expectedPartitionId),
            "Wrong partition id.");

        ResultSubpartitionIndexSet subpartitionIndexes =
            consumerSpecs.get(index).getSubpartitionIds();
        LOG.debug(
            "create shuffle reader for gate {} descriptor {} partitionId {}, subPartitionId start {} and end {}",
            gateIndex,
            resourceDescriptor,
            expectedPartitionId,
            subpartitionIndexes.getStartIndex(),
            subpartitionIndexes.getEndIndex());

        createBufferReader(
            resourceDescriptor,
            expectedPartitionId,
            consumerSpecs.get(index).getInputChannelId(),
            subpartitionIndexes);
      }
    }
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



