in client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java [67:159]
/**
 * Handles a shuffle write failure reported for {@code request.getShuffleId()}.
 *
 * <p>The report is answered with {@code INVALID_REQUEST} when it comes from an application other
 * than {@code shuffleManager.getAppId()}. It is also rejected when the per-shuffle failure record
 * says the report belongs to an old stage attempt ({@code resetStageAttemptIfNecessary} returns
 * true). Otherwise the failure counters of the reported shuffle servers are incremented via
 * {@code incWriteFailureForShuffleServer}.
 *
 * <p>When that call signals that the failure limit was hit, the reply asks the caller to resubmit
 * the whole stage. In addition, the first such report for the record clears the map output
 * metadata and unregisters the shuffle through the shuffle write client. This happens only once
 * per record, as guarded by {@code isClearedMapTrackerBlock}.
 *
 * @param request the failure report: app id, shuffle id, stage attempt info and failed servers
 * @param responseObserver receives the single {@code ReportShuffleWriteFailureResponse} reply
 * @throws RssException if clearing the map output metadata fails with a {@code SparkException}
 */
public void reportShuffleWriteFailure(
    RssProtos.ReportShuffleWriteFailureRequest request,
    StreamObserver<RssProtos.ReportShuffleWriteFailureResponse> responseObserver) {
  String appId = request.getAppId();
  int shuffleId = request.getShuffleId();
  int stageAttemptId = request.getStageAttemptId();
  int stageAttemptNumber = request.getStageAttemptNumber();
  List<RssProtos.ShuffleServerId> shuffleServerIdsList = request.getShuffleServerIdsList();
  RssProtos.StatusCode code;
  boolean reSubmitWholeStage;
  String msg;
  if (!appId.equals(shuffleManager.getAppId())) {
    msg =
        String.format(
            "got a wrong shuffle write failure report from appId: %s, expected appId: %s",
            appId, shuffleManager.getAppId());
    LOG.warn(msg);
    code = RssProtos.StatusCode.INVALID_REQUEST;
    reSubmitWholeStage = false;
  } else {
    List<ShuffleServerInfo> shuffleServerInfos =
        ShuffleServerInfo.fromProto(shuffleServerIdsList);
    // The initial per-server counters are only needed when this shuffle has no record yet, so
    // build them inside the mapping function instead of on every report.
    ShuffleServerWriterFailureRecord shuffleServerWriterFailureRecord =
        shuffleWriteStatus.computeIfAbsent(
            shuffleId,
            key -> {
              Map<String, AtomicInteger> initServerFailures = JavaUtils.newConcurrentMap();
              shuffleServerInfos.forEach(
                  shuffleServerInfo ->
                      initServerFailures.computeIfAbsent(
                          shuffleServerInfo.getId(), serverId -> new AtomicInteger(0)));
              return new ShuffleServerWriterFailureRecord(stageAttemptNumber, initServerFailures);
            });
    // The record is shared by concurrent reports for the same shuffle. The stage-attempt
    // check/reset runs under the same monitor as the counter update and the one-time cleanup, so
    // another report cannot change the record between the check and the increment.
    synchronized (shuffleServerWriterFailureRecord) {
      boolean isOldStageAttempt =
          shuffleServerWriterFailureRecord.resetStageAttemptIfNecessary(stageAttemptNumber);
      if (isOldStageAttempt) {
        msg =
            String.format(
                "got an old stage(%d_%d) shuffle write failure report, which should be impossible.",
                stageAttemptId, stageAttemptNumber);
        LOG.warn(msg);
        code = RssProtos.StatusCode.INVALID_REQUEST;
        reSubmitWholeStage = false;
      } else {
        code = RssProtos.StatusCode.SUCCESS;
        // Update the per-shuffle-server write failure counters for this stage attempt.
        boolean reachedMaxFailures =
            shuffleServerWriterFailureRecord.incWriteFailureForShuffleServer(
                stageAttemptNumber, shuffleServerInfos, shuffleManager);
        if (reachedMaxFailures) {
          reSubmitWholeStage = true;
          msg =
              String.format(
                  "Report shuffle write failure as maximum number(%d) of shuffle write is occurred.",
                  shuffleManager.getMaxFetchFailures());
          // Only the first report that hits the limit performs the cleanup.
          if (!shuffleServerWriterFailureRecord.isClearedMapTrackerBlock()) {
            try {
              // Clear the metadata of the completed task, otherwise some of the stage's data will
              // be lost.
              shuffleManager.unregisterAllMapOutput(shuffleId);
              // Deregister the shuffleId corresponding to the Shuffle Server.
              shuffleManager.getShuffleWriteClient().unregisterShuffle(appId, shuffleId);
              shuffleServerWriterFailureRecord.setClearedMapTrackerBlock(true);
              LOG.info(
                  "Clear shuffle result in shuffleId:{}, stageId:{}, stageAttemptNumber:{} in the write failure phase.",
                  shuffleId,
                  stageAttemptId,
                  stageAttemptNumber);
            } catch (SparkException e) {
              // Pass the exception as the trailing argument so the stack trace is logged too.
              LOG.error(
                  "Clear MapoutTracker Meta failed in shuffleId:{}, stageAttemptId:{}, stageAttemptNumber:{} in the write failure phase.",
                  shuffleId,
                  stageAttemptId,
                  stageAttemptNumber,
                  e);
              // NOTE(review): this escapes the gRPC handler without responseObserver.onError;
              // confirm callers handle the resulting call failure.
              throw new RssException("Clear MapoutTracker Meta failed!", e);
            }
          }
        } else {
          reSubmitWholeStage = false;
          msg = "The maximum number of failures was not reached.";
        }
      }
    }
  }
  RssProtos.ReportShuffleWriteFailureResponse reply =
      RssProtos.ReportShuffleWriteFailureResponse.newBuilder()
          .setStatus(code)
          .setReSubmitWholeStage(reSubmitWholeStage)
          .setMsg(msg)
          .build();
  responseObserver.onNext(reply);
  responseObserver.onCompleted();
}