client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierConsumerAgent.java [122:147]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  public CelebornTierConsumerAgent(
      CelebornConf conf,
      List<TieredStorageConsumerSpec> tieredStorageConsumerSpecs,
      List<TierShuffleDescriptor> shuffleDescriptors,
      int bufferSizeBytes) {
    checkArgument(!shuffleDescriptors.isEmpty(), "Wrong shuffle descriptors size.");
    checkArgument(
        tieredStorageConsumerSpecs.size() == shuffleDescriptors.size(),
        "Wrong consumer spec size.");
    this.conf = conf;
    this.gateIndex = tieredStorageConsumerSpecs.get(0).getGateIndex();
    this.consumerSpecs = tieredStorageConsumerSpecs;
    this.shuffleDescriptors = shuffleDescriptors;
    this.bufferReaders = new HashMap<>();
    this.receivedBuffers = new HashMap<>();
    this.subPartitionsNeedNotifyAvailable = new HashSet<>();
    this.bufferSizeBytes = bufferSizeBytes;
    for (TierShuffleDescriptor shuffleDescriptor : shuffleDescriptors) {
      if (shuffleDescriptor instanceof TierShuffleDescriptorImpl) {
        initShuffleClient((TierShuffleDescriptorImpl) shuffleDescriptor);
        break;
      }
    }
    checkNotNull(this.shuffleClient);
    initBufferReaders();
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierConsumerAgent.java [122:147]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  public CelebornTierConsumerAgent(
      CelebornConf conf,
      List<TieredStorageConsumerSpec> tieredStorageConsumerSpecs,
      List<TierShuffleDescriptor> shuffleDescriptors,
      int bufferSizeBytes) {
    checkArgument(!shuffleDescriptors.isEmpty(), "Wrong shuffle descriptors size.");
    checkArgument(
        tieredStorageConsumerSpecs.size() == shuffleDescriptors.size(),
        "Wrong consumer spec size.");
    this.conf = conf;
    this.gateIndex = tieredStorageConsumerSpecs.get(0).getGateIndex();
    this.consumerSpecs = tieredStorageConsumerSpecs;
    this.shuffleDescriptors = shuffleDescriptors;
    this.bufferReaders = new HashMap<>();
    this.receivedBuffers = new HashMap<>();
    this.subPartitionsNeedNotifyAvailable = new HashSet<>();
    this.bufferSizeBytes = bufferSizeBytes;
    for (TierShuffleDescriptor shuffleDescriptor : shuffleDescriptors) {
      if (shuffleDescriptor instanceof TierShuffleDescriptorImpl) {
        initShuffleClient((TierShuffleDescriptorImpl) shuffleDescriptor);
        break;
      }
    }
    checkNotNull(this.shuffleClient);
    initBufferReaders();
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



