client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierMasterAgent.java [123:217]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
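    // Look up the shuffle ids already registered for this job; the job must have
    // been registered with the master agent before a shuffle resource is requested.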
    Set<Integer> shuffleIds = jobShuffleIds.get(jobID);
    if (shuffleIds == null) {
      throw new RuntimeException("Can not find job in master agent, job: " + jobID);
    }
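    // Ask the shuffle task info to allocate a Celeborn shuffle resource descriptor
    // for this Flink result partition.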
    FlinkResultPartitionInfo resultPartitionInfo =
        new FlinkResultPartitionInfo(jobID, resultPartitionID);
    ShuffleResourceDescriptor shuffleResourceDescriptor =
        shuffleTaskInfo.genShuffleResourceDescriptor(
            resultPartitionInfo.getShuffleId(),
            resultPartitionInfo.getTaskId(),
            resultPartitionInfo.getAttemptId());
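    // Remember the shuffle id for this job and register the partition with the
    // resource tracker so it can be released later.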
    shuffleIds.add(shuffleResourceDescriptor.getShuffleId());
    shuffleResourceTracker.addPartitionResource(
        jobID,
        shuffleResourceDescriptor.getShuffleId(),
        shuffleResourceDescriptor.getPartitionId(),
        resultPartitionID);

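    // Bundle the LifecycleManager address and the descriptor into the remote
    // shuffle resource that is handed back to Flink in the tier shuffle descriptor.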
    RemoteShuffleResource remoteShuffleResource =
        new RemoteShuffleResource(
            lifecycleManager.getHost(),
            lifecycleManager.getPort(),
            lifecycleManagerTimestamp,
            shuffleResourceDescriptor);
    return new TierShuffleDescriptorImpl(
        celebornAppId,
        jobID,
        resultPartitionInfo.getShuffleId(),
        resultPartitionID,
        remoteShuffleResource);
  }

  @Override
  public void releasePartition(TierShuffleDescriptor shuffleDescriptor) {
    checkState(shuffleDescriptor instanceof TierShuffleDescriptorImpl, "Wrong descriptor type.");
    try {
      TierShuffleDescriptorImpl descriptor = (TierShuffleDescriptorImpl) shuffleDescriptor;
      RemoteShuffleResource shuffleResource = descriptor.getShuffleResource();
      ShuffleResourceDescriptor resourceDescriptor =
          shuffleResource.getMapPartitionShuffleDescriptor();
      LOG.debug("release partition resource: {}.", resourceDescriptor);
      lifecycleManager.releasePartition(
          resourceDescriptor.getShuffleId(), resourceDescriptor.getPartitionId());
      shuffleResourceTracker.removePartitionResource(
          descriptor.getJobId(),
          resourceDescriptor.getShuffleId(),
          resourceDescriptor.getPartitionId());
    } catch (Throwable throwable) {
      LOG.debug("Failed to release data partition {}.", shuffleDescriptor, throwable);
    }
  }

  @Override
  public void close() {
    try {
      jobShuffleIds.clear();
      if (null != lifecycleManager) {
        lifecycleManager.stop();
      }
    } catch (Exception e) {
      LOG.warn("Encounter exception when shutdown: {}", e.getMessage(), e);
    }

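    // Shut down the agent's executor, waiting up to 10 seconds for pending tasks.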
    ExecutorUtils.gracefulShutdown(10, TimeUnit.SECONDS, executor);
  }

  private JobShuffleContext getJobShuffleContext(
      JobID jobID, TierShuffleHandler tierShuffleHandler) {
    return new JobShuffleContext() {
      @Override
      public JobID getJobId() {
        return jobID;
      }

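      // Flink asks the shuffle master to stop tracking these partitions; the
      // release is delegated to the tier shuffle handler.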
      @Override
      public CompletableFuture<?> stopTrackingAndReleasePartitions(
          Collection<ResultPartitionID> resultPartitionIds) {
        return tierShuffleHandler.onReleasePartitions(
            resultPartitionIds.stream()
                .map(TieredStorageIdMappingUtils::convertId)
                .collect(Collectors.toList()));
      }

      @Override
      public CompletableFuture<Collection<PartitionWithMetrics>> getPartitionWithMetrics(
          Duration duration, Set<ResultPartitionID> set) {
        // TODO we should impl this when we support JM failover.
        return CompletableFuture.completedFuture(Collections.emptyList());
      }

      @Override
      public void notifyPartitionRecoveryStarted() {
        // TODO we should impl this when we support JM failover.
      }
    };
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierMasterAgent.java [123:217]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    (body identical to the flink-1.20 excerpt above)
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



