public Optional regionStart()

in client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java [346:423]


  /**
   * Sends a {@code RegionStart} message for the given map attempt to the worker that hosts
   * {@code location}. If the worker replies {@code HARD_SPLIT}, asks the lifecycle manager to
   * revive the partition.
   *
   * <p>The message is sent in {@code PRIMARY_MODE} from inside {@code sendMessageInternal}, using
   * the {@link PushState} registered under this shuffle/map/attempt key. That push state is
   * created on first use.
   *
   * @param shuffleId shuffle the region belongs to
   * @param mapId map task id
   * @param attemptId map task attempt id
   * @param location partition location the region is being written to
   * @param currentRegionIdx index of the region being started
   * @param isBroadcast forwarded unchanged in the {@code RegionStart} message
   * @return the revived partition location when the worker answered {@code HARD_SPLIT} and the
   *     revive succeeded; otherwise {@link Optional#empty()} (as passed back through
   *     {@code sendMessageInternal})
   * @throws IOException e.g. if the RPC to the worker fails, or a {@link CelebornIOException} if
   *     the revive after a hard split returns no partition info or a non-success status
   */
  public Optional<PartitionLocation> regionStart(
      int shuffleId,
      int mapId,
      int attemptId,
      PartitionLocation location,
      int currentRegionIdx,
      boolean isBroadcast)
      throws IOException {
    final String mapKey = Utils.makeMapKey(shuffleId, mapId, attemptId);
    final PushState pushState = pushStates.computeIfAbsent(mapKey, (s) -> new PushState(conf));
    return sendMessageInternal(
        shuffleId,
        mapId,
        attemptId,
        location,
        pushState,
        () -> {
          String shuffleKey = Utils.makeShuffleKey(appUniqueId, shuffleId);
          logger.info(
              "RegionStart for shuffle {} regionId {} attemptId {} locationId {}.",
              shuffleId,
              currentRegionIdx,
              attemptId,
              location.getUniqueId());
          // Pass the location itself so toString() only runs when debug logging is enabled.
          logger.debug("RegionStart  for location {}.", location);
          TransportClient client = createClientWaitingInFlightRequest(location, mapKey, pushState);
          RegionStart regionStart =
              new RegionStart(
                  PRIMARY_MODE,
                  shuffleKey,
                  location.getUniqueId(),
                  attemptId,
                  currentRegionIdx,
                  isBroadcast);
          ByteBuffer regionStartResponse =
              client.sendRpcSync(regionStart.toByteBuffer(), conf.pushDataTimeoutMs());
          // A HARD_SPLIT status byte in the reply means a replacement location must be obtained
          // by reviving through the lifecycle manager.
          if (regionStartResponse.hasRemaining()
              && regionStartResponse.get() == StatusCode.HARD_SPLIT.getValue()) {
            return Optional.of(
                reviveOnRegionStartHardSplit(shuffleId, mapId, attemptId, location));
          }
          return Optional.empty();
        });
  }

  /**
   * Asks the lifecycle manager to revive {@code location} after a {@code HARD_SPLIT} reply to
   * {@code RegionStart}, and returns the new partition location.
   *
   * @throws CelebornIOException if the revive response carries no partition info or its status is
   *     not {@code SUCCESS}
   */
  private PartitionLocation reviveOnRegionStartHardSplit(
      int shuffleId, int mapId, int attemptId, PartitionLocation location)
      throws CelebornIOException {
    Set<Integer> mapIds = new HashSet<>();
    mapIds.add(mapId);
    List<ReviveRequest> requests = new ArrayList<>();
    requests.add(
        new ReviveRequest(
            shuffleId,
            mapId,
            attemptId,
            location.getId(),
            location.getEpoch(),
            location,
            StatusCode.HARD_SPLIT));
    PbChangeLocationResponse response =
        lifecycleManagerRef.askSync(
            ControlMessages.Revive$.MODULE$.apply(shuffleId, mapIds, requests),
            conf.clientRpcRequestPartitionLocationRpcAskTimeout(),
            ClassTag$.MODULE$.apply(PbChangeLocationResponse.class));
    // Guard the index below: an empty response would otherwise surface as an unchecked
    // IndexOutOfBoundsException rather than the IOException callers already handle.
    if (response.getPartitionInfoCount() == 0) {
      logger.error(
          "Empty revive response for shuffle {} map {} attemptId {} partition {} epoch {}.",
          shuffleId,
          mapId,
          attemptId,
          location.getId(),
          location.getEpoch());
      throw new CelebornIOException("RegionStart revive failed: empty revive response");
    }
    // per partitionKey only serve single PartitionLocation in Client Cache.
    PbChangeLocationPartitionInfo partitionInfo = response.getPartitionInfo(0);
    StatusCode respStatus = Utils.toStatusCode(partitionInfo.getStatus());
    if (!StatusCode.SUCCESS.equals(respStatus)) {
      logger.error(
          "Exception raised while reviving for shuffle {} map {} attemptId {} partition {} epoch {}, status {}.",
          shuffleId,
          mapId,
          attemptId,
          location.getId(),
          location.getEpoch(),
          respStatus);
      throw new CelebornIOException("RegionStart revive failed, status: " + respStatus);
    }
    return PbSerDeUtils.fromPbPartitionLocation(partitionInfo.getPartition());
  }