in client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java [346:423]
/**
 * Sends a {@link RegionStart} RPC for the given map attempt to the worker owning {@code
 * location}.
 *
 * <p>The RPC is issued synchronously (bounded by {@code conf.pushDataTimeoutMs()}) from within
 * {@code sendMessageInternal}, using the {@link PushState} registered in {@code pushStates} under
 * the (shuffleId, mapId, attemptId) map key (created on first use). If the first byte of the
 * worker's response is {@link StatusCode#HARD_SPLIT}, the lifecycle manager is asked to revive
 * the partition and the revived location is returned.
 *
 * @param shuffleId shuffle id
 * @param mapId map task id
 * @param attemptId map task attempt id
 * @param location partition location the region is started on
 * @param currentRegionIdx index of the region being started (sent in the RPC)
 * @param isBroadcast broadcast flag forwarded unchanged in the RPC
 * @return the revived location when the worker answered HARD_SPLIT, otherwise empty
 *     (NOTE(review): assumes {@code sendMessageInternal} returns the callback's result — confirm)
 * @throws IOException if the RPC fails or the revive does not return a usable location
 */
public Optional<PartitionLocation> regionStart(
    int shuffleId,
    int mapId,
    int attemptId,
    PartitionLocation location,
    int currentRegionIdx,
    boolean isBroadcast)
    throws IOException {
  final String mapKey = Utils.makeMapKey(shuffleId, mapId, attemptId);
  final PushState pushState = pushStates.computeIfAbsent(mapKey, (s) -> new PushState(conf));
  return sendMessageInternal(
      shuffleId,
      mapId,
      attemptId,
      location,
      pushState,
      () -> {
        String shuffleKey = Utils.makeShuffleKey(appUniqueId, shuffleId);
        logger.info(
            "RegionStart for shuffle {} regionId {} attemptId {} locationId {}.",
            shuffleId,
            currentRegionIdx,
            attemptId,
            location.getUniqueId());
        // Pass the object itself so toString() only runs when debug logging is enabled.
        logger.debug("RegionStart for location {}.", location);
        TransportClient client = createClientWaitingInFlightRequest(location, mapKey, pushState);
        RegionStart regionStart =
            new RegionStart(
                PRIMARY_MODE,
                shuffleKey,
                location.getUniqueId(),
                attemptId,
                currentRegionIdx,
                isBroadcast);
        ByteBuffer regionStartResponse =
            client.sendRpcSync(regionStart.toByteBuffer(), conf.pushDataTimeoutMs());
        // The worker reports a split through the first byte of the response; on HARD_SPLIT the
        // location is revived via the lifecycle manager.
        if (regionStartResponse.hasRemaining()
            && regionStartResponse.get() == StatusCode.HARD_SPLIT.getValue()) {
          return Optional.of(reviveOnRegionStartHardSplit(shuffleId, mapId, attemptId, location));
        }
        return Optional.empty();
      });
}

/**
 * Asks the lifecycle manager to revive {@code location} after a RegionStart answered HARD_SPLIT.
 *
 * @return the new partition location reported by the lifecycle manager
 * @throws IOException if the revive response is empty or its status is not SUCCESS
 */
private PartitionLocation reviveOnRegionStartHardSplit(
    int shuffleId, int mapId, int attemptId, PartitionLocation location) throws IOException {
  Set<Integer> mapIds = new HashSet<>();
  mapIds.add(mapId);
  List<ReviveRequest> requests = new ArrayList<>();
  requests.add(
      new ReviveRequest(
          shuffleId,
          mapId,
          attemptId,
          location.getId(),
          location.getEpoch(),
          location,
          StatusCode.HARD_SPLIT));
  PbChangeLocationResponse response =
      lifecycleManagerRef.askSync(
          ControlMessages.Revive$.MODULE$.apply(shuffleId, mapIds, requests),
          conf.clientRpcRequestPartitionLocationRpcAskTimeout(),
          ClassTag$.MODULE$.apply(PbChangeLocationResponse.class));
  // Guard the indexed access below: an empty response would otherwise surface as an
  // unchecked IndexOutOfBoundsException rather than an IOException callers handle.
  if (response.getPartitionInfoCount() == 0) {
    logger.error(
        "Empty revive response for shuffle {} map {} attemptId {} partition {} epoch {}.",
        shuffleId,
        mapId,
        attemptId,
        location.getId(),
        location.getEpoch());
    throw new CelebornIOException("RegionStart revive failed: empty revive response");
  }
  // Only a single request was sent, so only the first partition info is relevant.
  PbChangeLocationPartitionInfo partitionInfo = response.getPartitionInfo(0);
  StatusCode respStatus = Utils.toStatusCode(partitionInfo.getStatus());
  if (!StatusCode.SUCCESS.equals(respStatus)) {
    logger.error(
        "Exception raised while reviving for shuffle {} map {} attemptId {} partition {} epoch {}, status {}.",
        shuffleId,
        mapId,
        attemptId,
        location.getId(),
        location.getEpoch(),
        respStatus);
    throw new CelebornIOException("RegionStart revive failed, status: " + respStatus);
  }
  return PbSerDeUtils.fromPbPartitionLocation(partitionInfo.getPartition());
}