client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierConsumerAgent.java [276:304]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  /**
   * Handles an updated tier shuffle descriptor for one partition/subpartition pair.
   *
   * <p>Descriptors that are not {@code TierShuffleDescriptorImpl} instances are ignored. For a
   * matching descriptor, its result partition id must equal the one carried by {@code
   * tieredStoragePartitionId}; otherwise the {@code checkState} precondition fails. If {@code
   * bufferReaders} holds no reader for the given partition and subpartition, a reader is created,
   * set up with the memory manager, and opened. If a reader is already present, nothing is done.
   *
   * @param tieredStoragePartitionId partition the descriptor refers to
   * @param tieredStorageInputChannelId input channel id passed on to the reader creation
   * @param subpartitionId subpartition the reader is looked up and created for
   * @param tierShuffleDescriptor the updated descriptor
   */
  public void updateTierShuffleDescriptor(
      TieredStoragePartitionId tieredStoragePartitionId,
      TieredStorageInputChannelId tieredStorageInputChannelId,
      TieredStorageSubpartitionId subpartitionId,
      TierShuffleDescriptor tierShuffleDescriptor) {
    // Only TierShuffleDescriptorImpl is handled; any other descriptor type is skipped.
    if (!(tierShuffleDescriptor instanceof TierShuffleDescriptorImpl)) {
      return;
    }

    TierShuffleDescriptorImpl descriptorImpl = (TierShuffleDescriptorImpl) tierShuffleDescriptor;
    checkState(
        descriptorImpl.getResultPartitionID().equals(tieredStoragePartitionId.getPartitionID()),
        "Wrong result partition id: " + descriptorImpl.getResultPartitionID());

    ResultSubpartitionIndexSet indexSet =
        new ResultSubpartitionIndexSet(subpartitionId.getSubpartitionId());

    // A reader already registered for this (partition, subpartition) pair is left untouched.
    boolean readerExists =
        bufferReaders.containsKey(tieredStoragePartitionId)
            && bufferReaders.get(tieredStoragePartitionId).containsKey(subpartitionId);
    if (readerExists) {
      return;
    }

    createBufferReader(
        descriptorImpl.getShuffleResource().getMapPartitionShuffleDescriptor(),
        tieredStoragePartitionId,
        tieredStorageInputChannelId,
        indexSet);

    // The reader created above must now be retrievable; set it up and open it.
    CelebornChannelBufferReader reader =
        checkNotNull(getBufferReader(tieredStoragePartitionId, subpartitionId));
    reader.setup(checkNotNull(memoryManager));
    openReader(reader);
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierConsumerAgent.java [276:304]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  /**
   * Handles an updated tier shuffle descriptor for one partition/subpartition pair.
   *
   * <p>Descriptors that are not {@code TierShuffleDescriptorImpl} instances are ignored. For a
   * matching descriptor, its result partition id must equal the one carried by {@code
   * tieredStoragePartitionId}; otherwise the {@code checkState} precondition fails. If {@code
   * bufferReaders} holds no reader for the given partition and subpartition, a reader is created,
   * set up with the memory manager, and opened. If a reader is already present, nothing is done.
   *
   * @param tieredStoragePartitionId partition the descriptor refers to
   * @param tieredStorageInputChannelId input channel id passed on to the reader creation
   * @param subpartitionId subpartition the reader is looked up and created for
   * @param tierShuffleDescriptor the updated descriptor
   */
  public void updateTierShuffleDescriptor(
      TieredStoragePartitionId tieredStoragePartitionId,
      TieredStorageInputChannelId tieredStorageInputChannelId,
      TieredStorageSubpartitionId subpartitionId,
      TierShuffleDescriptor tierShuffleDescriptor) {
    // Only TierShuffleDescriptorImpl is handled; any other descriptor type is skipped.
    if (!(tierShuffleDescriptor instanceof TierShuffleDescriptorImpl)) {
      return;
    }

    TierShuffleDescriptorImpl descriptorImpl = (TierShuffleDescriptorImpl) tierShuffleDescriptor;
    checkState(
        descriptorImpl.getResultPartitionID().equals(tieredStoragePartitionId.getPartitionID()),
        "Wrong result partition id: " + descriptorImpl.getResultPartitionID());

    ResultSubpartitionIndexSet indexSet =
        new ResultSubpartitionIndexSet(subpartitionId.getSubpartitionId());

    // A reader already registered for this (partition, subpartition) pair is left untouched.
    boolean readerExists =
        bufferReaders.containsKey(tieredStoragePartitionId)
            && bufferReaders.get(tieredStoragePartitionId).containsKey(subpartitionId);
    if (readerExists) {
      return;
    }

    createBufferReader(
        descriptorImpl.getShuffleResource().getMapPartitionShuffleDescriptor(),
        tieredStoragePartitionId,
        tieredStorageInputChannelId,
        indexSet);

    // The reader created above must now be retrievable; set it up and open it.
    CelebornChannelBufferReader reader =
        checkNotNull(getBufferReader(tieredStoragePartitionId, subpartitionId));
    reader.setup(checkNotNull(memoryManager));
    openReader(reader);
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



