client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierConsumerAgent.java [159:196]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  public void start() {
    // Snapshot the sub partitions that are queued for an availability notification and mark the
    // agent as started while holding the lock; the notifications themselves are issued only
    // after the lock has been released.
    Set<Tuple2<TieredStoragePartitionId, TieredStorageSubpartitionId>> pendingNotifications;
    synchronized (lock) {
      pendingNotifications = new HashSet<>(subPartitionsNeedNotifyAvailable);
      subPartitionsNeedNotifyAvailable.clear();
      started = true;
    }
    try {
      for (Tuple2<TieredStoragePartitionId, TieredStorageSubpartitionId> pending :
          pendingNotifications) {
        notifyAvailable(pending.f0, pending.f1);
      }
    } catch (Throwable t) {
      // Any failure while notifying: log it, recycle all resources, then propagate the error.
      LOG.error("Error occurred when notifying sub partitions available", t);
      recycleAllResources();
      ExceptionUtils.rethrow(t);
    }
    pendingNotifications.clear();

    // For every sub partition of every consumer spec that has a buffer reader, open the reader
    // and require segment 0.
    for (TieredStorageConsumerSpec consumerSpec : consumerSpecs) {
      for (int subpartitionId : consumerSpec.getSubpartitionIds().values()) {
        CelebornChannelBufferReader reader =
            getBufferReader(
                consumerSpec.getPartitionId(), new TieredStorageSubpartitionId(subpartitionId));
        // Sub partitions without a buffer reader are skipped.
        if (reader != null) {
          // TODO: a failed open may only mean that this downstream task started before its
          // upstream task; the open should be retried instead of throwing an exception.
          if (!openReader(reader)) {
            LOG.error("Failed to open reader.");
            recycleAllResources();
            ExceptionUtils.rethrow(new IOException("Failed to open reader."));
          }
          reader.notifyRequiredSegmentIfNeeded(0, subpartitionId);
        }
      }
    }
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierConsumerAgent.java [159:196]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  public void start() {
    // Snapshot the sub partitions that are queued for an availability notification and mark the
    // agent as started while holding the lock; the notifications themselves are issued only
    // after the lock has been released.
    Set<Tuple2<TieredStoragePartitionId, TieredStorageSubpartitionId>> pendingNotifications;
    synchronized (lock) {
      pendingNotifications = new HashSet<>(subPartitionsNeedNotifyAvailable);
      subPartitionsNeedNotifyAvailable.clear();
      started = true;
    }
    try {
      for (Tuple2<TieredStoragePartitionId, TieredStorageSubpartitionId> pending :
          pendingNotifications) {
        notifyAvailable(pending.f0, pending.f1);
      }
    } catch (Throwable t) {
      // Any failure while notifying: log it, recycle all resources, then propagate the error.
      LOG.error("Error occurred when notifying sub partitions available", t);
      recycleAllResources();
      ExceptionUtils.rethrow(t);
    }
    pendingNotifications.clear();

    // For every sub partition of every consumer spec that has a buffer reader, open the reader
    // and require segment 0.
    for (TieredStorageConsumerSpec consumerSpec : consumerSpecs) {
      for (int subpartitionId : consumerSpec.getSubpartitionIds().values()) {
        CelebornChannelBufferReader reader =
            getBufferReader(
                consumerSpec.getPartitionId(), new TieredStorageSubpartitionId(subpartitionId));
        // Sub partitions without a buffer reader are skipped.
        if (reader != null) {
          // TODO: a failed open may only mean that this downstream task started before its
          // upstream task; the open should be retried instead of throwing an exception.
          if (!openReader(reader)) {
            LOG.error("Failed to open reader.");
            recycleAllResources();
            ExceptionUtils.rethrow(new IOException("Failed to open reader."));
          }
          reader.notifyRequiredSegmentIfNeeded(0, subpartitionId);
        }
      }
    }
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



