client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierMasterAgent.java [189:218]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  private JobShuffleContext getJobShuffleContext(
      JobID jobID, TierShuffleHandler tierShuffleHandler) {
    return new JobShuffleContext() {
      @Override
      public JobID getJobId() {
        return jobID;
      }

      @Override
      public CompletableFuture<?> stopTrackingAndReleasePartitions(
          Collection<ResultPartitionID> resultPartitionIds) {
        return tierShuffleHandler.onReleasePartitions(
            resultPartitionIds.stream()
                .map(TieredStorageIdMappingUtils::convertId)
                .collect(Collectors.toList()));
      }

      @Override
      public CompletableFuture<Collection<PartitionWithMetrics>> getPartitionWithMetrics(
          Duration duration, Set<ResultPartitionID> set) {
        // TODO we should impl this when we support JM failover.
        return CompletableFuture.completedFuture(Collections.emptyList());
      }

      @Override
      public void notifyPartitionRecoveryStarted() {
        // TODO we should impl this when we support JM failover.
      }
    };
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierMasterAgent.java [189:218]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  private JobShuffleContext getJobShuffleContext(
      JobID jobID, TierShuffleHandler tierShuffleHandler) {
    return new JobShuffleContext() {
      @Override
      public JobID getJobId() {
        return jobID;
      }

      @Override
      public CompletableFuture<?> stopTrackingAndReleasePartitions(
          Collection<ResultPartitionID> resultPartitionIds) {
        return tierShuffleHandler.onReleasePartitions(
            resultPartitionIds.stream()
                .map(TieredStorageIdMappingUtils::convertId)
                .collect(Collectors.toList()));
      }

      @Override
      public CompletableFuture<Collection<PartitionWithMetrics>> getPartitionWithMetrics(
          Duration duration, Set<ResultPartitionID> set) {
        // TODO we should impl this when we support JM failover.
        return CompletableFuture.completedFuture(Collections.emptyList());
      }

      @Override
      public void notifyPartitionRecoveryStarted() {
        // TODO we should impl this when we support JM failover.
      }
    };
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



