client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierProducerAgent.java [270:312]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  /**
   * Sends regionStart for the current region, following partition revives when they occur.
   *
   * <p>{@code flinkShuffleClient.regionStart} is called at most {@code maxReviveTimes} times. When
   * a call returns a partition location, that location replaces {@code partitionLocation}, the
   * handshake is sent again and segmentStart is re-sent for every subpartition before the next
   * attempt. When a call returns an empty result, regionStart is marked as sent and
   * {@code currentSubpartition} is reset to 0.
   *
   * <p>An {@link IOException} raised along the way is handed to
   * {@code Utils.rethrowAsRuntimeException}.
   *
   * @throws RuntimeException if regionStart has still not been sent once the attempts are used up
   */
  private void regionStartWithRevive() {
    try {
      int remainingReviveTimes = maxReviveTimes;
      while (remainingReviveTimes-- > 0 && !hasSentRegionStart) {
        Optional<PartitionLocation> revivePartition =
            flinkShuffleClient.regionStart(
                shuffleId, mapId, attemptId, partitionLocation, currentRegionIndex, false);
        if (revivePartition.isPresent()) {
          LOG.info(
              "Revive at regionStart, currentTimes:{}, totalTimes:{} for shuffleId:{}, mapId:{}, "
                  + "attempId:{}, currentRegionIndex:{}, isBroadcast:{}, newPartition:{}, oldPartition:{}",
              remainingReviveTimes,
              maxReviveTimes,
              shuffleId,
              mapId,
              attemptId,
              currentRegionIndex,
              false,
              revivePartition,
              partitionLocation);
          partitionLocation = revivePartition.get();
          // For every revive partition, handshake should be sent firstly
          hasSentHandshake = false;
          handshake();
          // Re-announce the current segment of each subpartition against the new location.
          for (int i = 0; i < numSubPartitions; i++) {
            flinkShuffleClient.segmentStart(
                shuffleId, mapId, attemptId, i, subPartitionSegmentIds[i], partitionLocation);
          }
        } else {
          hasSentRegionStart = true;
          currentSubpartition = 0;
        }
      }
      // The post-decrement in the loop condition leaves the counter at -1, not 0, once the
      // attempts are exhausted, so the flag alone decides whether regionStart failed.
      if (!hasSentRegionStart) {
        throw new RuntimeException(
            "After retry " + maxReviveTimes + " times, still failed to send regionStart");
      }
    } catch (IOException e) {
      Utils.rethrowAsRuntimeException(e);
    }
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierProducerAgent.java [270:312]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  /**
   * Sends regionStart for the current region, following partition revives when they occur.
   *
   * <p>{@code flinkShuffleClient.regionStart} is called at most {@code maxReviveTimes} times. When
   * a call returns a partition location, that location replaces {@code partitionLocation}, the
   * handshake is sent again and segmentStart is re-sent for every subpartition before the next
   * attempt. When a call returns an empty result, regionStart is marked as sent and
   * {@code currentSubpartition} is reset to 0.
   *
   * <p>An {@link IOException} raised along the way is handed to
   * {@code Utils.rethrowAsRuntimeException}.
   *
   * @throws RuntimeException if regionStart has still not been sent once the attempts are used up
   */
  private void regionStartWithRevive() {
    try {
      int remainingReviveTimes = maxReviveTimes;
      while (remainingReviveTimes-- > 0 && !hasSentRegionStart) {
        Optional<PartitionLocation> revivePartition =
            flinkShuffleClient.regionStart(
                shuffleId, mapId, attemptId, partitionLocation, currentRegionIndex, false);
        if (revivePartition.isPresent()) {
          LOG.info(
              "Revive at regionStart, currentTimes:{}, totalTimes:{} for shuffleId:{}, mapId:{}, "
                  + "attempId:{}, currentRegionIndex:{}, isBroadcast:{}, newPartition:{}, oldPartition:{}",
              remainingReviveTimes,
              maxReviveTimes,
              shuffleId,
              mapId,
              attemptId,
              currentRegionIndex,
              false,
              revivePartition,
              partitionLocation);
          partitionLocation = revivePartition.get();
          // For every revive partition, handshake should be sent firstly
          hasSentHandshake = false;
          handshake();
          // Re-announce the current segment of each subpartition against the new location.
          for (int i = 0; i < numSubPartitions; i++) {
            flinkShuffleClient.segmentStart(
                shuffleId, mapId, attemptId, i, subPartitionSegmentIds[i], partitionLocation);
          }
        } else {
          hasSentRegionStart = true;
          currentSubpartition = 0;
        }
      }
      // The post-decrement in the loop condition leaves the counter at -1, not 0, once the
      // attempts are exhausted, so the flag alone decides whether regionStart failed.
      if (!hasSentRegionStart) {
        throw new RuntimeException(
            "After retry " + maxReviveTimes + " times, still failed to send regionStart");
      }
    } catch (IOException e) {
      Utils.rethrowAsRuntimeException(e);
    }
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



