client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierProducerAgent.java [325:368]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  /**
   * Sends the push-data handshake for the current {@code partitionLocation}, retrying against a
   * revived location for at most {@code maxReviveTimes} attempts.
   *
   * <p>Each attempt calls {@code flinkShuffleClient.pushDataHandShake}. An empty result marks the
   * handshake as sent. A non-empty result is adopted as the new {@code partitionLocation} and the
   * handshake is attempted again while attempts remain. The method is a no-op when
   * {@code hasSentHandshake} is already {@code true}.
   *
   * <p>The previous implementation marked the handshake as sent when the final attempt still
   * returned a revived location, which made its own exhaustion check unreachable. A revived
   * location on the final attempt is now reported as a failure.
   *
   * @throws IOException declared by the signature; an {@code IOException} raised by the client is
   *     handed to {@code Utils.rethrowAsRuntimeException} (presumably rethrown unchecked —
   *     confirm against that helper)
   * @throws RuntimeException if the handshake has not been marked as sent once all
   *     {@code maxReviveTimes} attempts are used, including when {@code maxReviveTimes <= 0}
   */
  private void handshake() throws IOException {
    try {
      int remainingReviveTimes = maxReviveTimes;
      // Test the flag first so the attempt counter is only consumed by real attempts.
      while (!hasSentHandshake && remainingReviveTimes-- > 0) {
        // In the Flink hybrid shuffle integration strategy, the data buffer sent to the Celeborn
        // workers consists of two components: the Celeborn header and the data buffers.
        // In this scenario, the maximum byte size of the buffer received by the Celeborn worker is
        // equal to the sum of the Flink buffer size and the Celeborn header size.
        Optional<PartitionLocation> revivePartition =
            flinkShuffleClient.pushDataHandShake(
                shuffleId,
                mapId,
                attemptId,
                numSubPartitions,
                bufferSizeBytes + BufferUtils.HEADER_LENGTH,
                partitionLocation);
        if (revivePartition.isPresent()) {
          // A revived location was returned: log while partitionLocation still holds the old
          // location, then switch to the new one. hasSentHandshake stays false, so the loop
          // retries if attempts remain and the check below fails the call otherwise.
          LOG.info(
              "Revive at handshake, currentTimes:{}, totalTimes:{} for shuffleId:{}, mapId:{}, "
                  + "attempId:{}, currentRegionIndex:{}, newPartition:{}, oldPartition:{}",
              remainingReviveTimes,
              maxReviveTimes,
              shuffleId,
              mapId,
              attemptId,
              currentRegionIndex,
              revivePartition,
              partitionLocation);
          partitionLocation = revivePartition.get();
        } else {
          hasSentHandshake = true;
        }
      }
      // Reached with the flag unset only when every attempt returned a revived location (or no
      // attempt was allowed at all).
      if (!hasSentHandshake) {
        throw new RuntimeException(
            "After retry " + maxReviveTimes + " times, still failed to send handshake");
      }
    } catch (IOException e) {
      Utils.rethrowAsRuntimeException(e);
    }
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierProducerAgent.java [325:368]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  /**
   * Sends the push-data handshake for the current {@code partitionLocation}, retrying against a
   * revived location for at most {@code maxReviveTimes} attempts.
   *
   * <p>Each attempt calls {@code flinkShuffleClient.pushDataHandShake}. An empty result marks the
   * handshake as sent. A non-empty result is adopted as the new {@code partitionLocation} and the
   * handshake is attempted again while attempts remain. The method is a no-op when
   * {@code hasSentHandshake} is already {@code true}.
   *
   * <p>The previous implementation marked the handshake as sent when the final attempt still
   * returned a revived location, which made its own exhaustion check unreachable. A revived
   * location on the final attempt is now reported as a failure.
   *
   * @throws IOException declared by the signature; an {@code IOException} raised by the client is
   *     handed to {@code Utils.rethrowAsRuntimeException} (presumably rethrown unchecked —
   *     confirm against that helper)
   * @throws RuntimeException if the handshake has not been marked as sent once all
   *     {@code maxReviveTimes} attempts are used, including when {@code maxReviveTimes <= 0}
   */
  private void handshake() throws IOException {
    try {
      int remainingReviveTimes = maxReviveTimes;
      // Test the flag first so the attempt counter is only consumed by real attempts.
      while (!hasSentHandshake && remainingReviveTimes-- > 0) {
        // In the Flink hybrid shuffle integration strategy, the data buffer sent to the Celeborn
        // workers consists of two components: the Celeborn header and the data buffers.
        // In this scenario, the maximum byte size of the buffer received by the Celeborn worker is
        // equal to the sum of the Flink buffer size and the Celeborn header size.
        Optional<PartitionLocation> revivePartition =
            flinkShuffleClient.pushDataHandShake(
                shuffleId,
                mapId,
                attemptId,
                numSubPartitions,
                bufferSizeBytes + BufferUtils.HEADER_LENGTH,
                partitionLocation);
        if (revivePartition.isPresent()) {
          // A revived location was returned: log while partitionLocation still holds the old
          // location, then switch to the new one. hasSentHandshake stays false, so the loop
          // retries if attempts remain and the check below fails the call otherwise.
          LOG.info(
              "Revive at handshake, currentTimes:{}, totalTimes:{} for shuffleId:{}, mapId:{}, "
                  + "attempId:{}, currentRegionIndex:{}, newPartition:{}, oldPartition:{}",
              remainingReviveTimes,
              maxReviveTimes,
              shuffleId,
              mapId,
              attemptId,
              currentRegionIndex,
              revivePartition,
              partitionLocation);
          partitionLocation = revivePartition.get();
        } else {
          hasSentHandshake = true;
        }
      }
      // Reached with the flag unset only when every attempt returned a revived location (or no
      // attempt was allowed at all).
      if (!hasSentHandshake) {
        throw new RuntimeException(
            "After retry " + maxReviveTimes + " times, still failed to send handshake");
      }
    } catch (IOException e) {
      Utils.rethrowAsRuntimeException(e);
    }
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



