in server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java [247:405]
/**
 * Registers a shuffle of an app on this shuffle server and replies with the resulting status.
 *
 * <p>If task info for the app already exists and the request carries a stage attempt number newer
 * than the latest one recorded for the shuffle (a stage retry), the data of the previous attempt
 * is deleted before registering. A request whose positive attempt number is not newer than the
 * recorded one is answered with {@code STAGE_RETRY_IGNORE} without registering. When remote merge
 * is enabled and the request has a merge context, the shuffle is additionally registered under
 * {@code appId + MERGE_APP_SUFFIX} (to hold merged blocks) and with the shuffle merge manager.
 *
 * @param req the registration request
 * @param responseObserver receives one {@link ShuffleRegisterResponse} followed by completion on
 *     every path handled by this method; an exception escaping the handler (e.g. from request
 *     parsing) is not converted into a response here
 */
public void registerShuffle(
    ShuffleRegisterRequest req, StreamObserver<ShuffleRegisterResponse> responseObserver) {
  try (ServerRpcAuditContext auditContext = createAuditContext("registerShuffle")) {
    String appId = req.getAppId();
    int shuffleId = req.getShuffleId();
    int stageAttemptNumber = req.getStageAttemptNumber();
    String remoteStoragePath = req.getRemoteStorage().getPath();
    String user = req.getUser();
    auditContext.withAppId(appId).withShuffleId(shuffleId);
    auditContext.withArgs(
        "remoteStoragePath="
            + remoteStoragePath
            + ", user="
            + user
            + ", stageAttemptNumber="
            + stageAttemptNumber);

    // Handle a possible stage retry first; any non-SUCCESS status short-circuits registration.
    // The reply is sent here, after the task-info lock taken by the helper has been released.
    StatusCode stageAttemptStatus =
        prepareForStageAttempt(appId, shuffleId, stageAttemptNumber);
    if (stageAttemptStatus != StatusCode.SUCCESS) {
      completeRegisterShuffle(auditContext, responseObserver, stageAttemptStatus);
      return;
    }

    ShuffleDataDistributionType shuffleDataDistributionType =
        ShuffleDataDistributionType.valueOf(
            Optional.ofNullable(req.getShuffleDataDistribution())
                .orElse(RssProtos.DataDistribution.NORMAL)
                .name());
    int maxConcurrencyPerPartitionToWrite = req.getMaxConcurrencyPerPartitionToWrite();
    Map<String, String> remoteStorageConf =
        req.getRemoteStorage().getRemoteStorageConfList().stream()
            .collect(
                Collectors.toMap(RemoteStorageConfItem::getKey, RemoteStorageConfItem::getValue));
    List<PartitionRange> partitionRanges = toPartitionRanges(req.getPartitionRangesList());
    LOG.info(
        "Get register request for appId[{}], shuffleId[{}], remoteStorage[{}] with {} "
            + "partition ranges. User: {}",
        appId,
        shuffleId,
        remoteStoragePath,
        partitionRanges.size(),
        user);
    StatusCode result =
        shuffleServer
            .getShuffleTaskManager()
            .registerShuffle(
                appId,
                shuffleId,
                stageAttemptNumber,
                partitionRanges,
                new RemoteStorageInfo(remoteStoragePath, remoteStorageConf),
                user,
                shuffleDataDistributionType,
                maxConcurrencyPerPartitionToWrite,
                req.getPropertiesMap());
    if (StatusCode.SUCCESS == result
        && shuffleServer.isRemoteMergeEnable()
        && req.hasMergeContext()) {
      // The merged block is in a different domain from the original block,
      // so a separate app is registered to hold the merged blocks.
      result =
          shuffleServer
              .getShuffleTaskManager()
              .registerShuffle(
                  appId + MERGE_APP_SUFFIX,
                  shuffleId,
                  stageAttemptNumber,
                  partitionRanges,
                  new RemoteStorageInfo(remoteStoragePath, remoteStorageConf),
                  user,
                  shuffleDataDistributionType,
                  maxConcurrencyPerPartitionToWrite,
                  req.getPropertiesMap());
      if (result == StatusCode.SUCCESS) {
        result =
            shuffleServer
                .getShuffleMergeManager()
                .registerShuffle(appId, shuffleId, req.getMergeContext());
      }
    }
    completeRegisterShuffle(auditContext, responseObserver, result);
  }
}

/**
 * Reconciles a registration's stage attempt number with the latest one recorded for the shuffle.
 *
 * <p>If no task info exists for {@code appId}, nothing is checked. Otherwise, while holding the
 * task info's monitor:
 *
 * <ul>
 *   <li>a positive attempt number newer than the recorded one is stored as the latest, and the
 *       shuffle's existing data is removed via {@code removeShuffleDataSyncRenameAndDelete};
 *   <li>a positive attempt number not newer than the recorded one is rejected.
 * </ul>
 *
 * @param appId application id from the request
 * @param shuffleId shuffle id from the request
 * @param stageAttemptNumber stage attempt number from the request
 * @return {@code SUCCESS} if registration may proceed; {@code INTERNAL_ERROR} if removing the
 *     previous attempt's data threw or left block metadata behind; {@code STAGE_RETRY_IGNORE} if
 *     the attempt is not newer than the recorded one
 */
private StatusCode prepareForStageAttempt(String appId, int shuffleId, int stageAttemptNumber) {
  ShuffleTaskInfo taskInfo = shuffleServer.getShuffleTaskManager().getShuffleTaskInfo(appId);
  if (taskInfo == null) {
    // No task info for this app yet, so there is no earlier attempt whose data must be deleted.
    return StatusCode.SUCCESS;
  }
  // Prevents concurrent registrations from interleaving their read/update of the latest attempt.
  synchronized (taskInfo) {
    int lastAttemptNumber = taskInfo.getLatestStageAttemptNumber(shuffleId);
    if (stageAttemptNumber > 0 && stageAttemptNumber > lastAttemptNumber) {
      // The new attempt number is recorded before deletion and is not rolled back if it fails.
      taskInfo.refreshLatestStageAttemptNumber(shuffleId, stageAttemptNumber);
      try {
        long start = System.currentTimeMillis();
        shuffleServer
            .getShuffleTaskManager()
            .removeShuffleDataSyncRenameAndDelete(appId, shuffleId);
        LOG.info(
            "Deleted the previous stage attempt data due to stage recomputing for app: {}, "
                + "shuffleId: {}, stageAttemptNumber: {}. It costs {} ms",
            appId,
            shuffleId,
            lastAttemptNumber,
            System.currentTimeMillis() - start);
        // Guard against block metadata that survived the deletion. Uses the task info already
        // held (and locked) rather than re-fetching it, which could return null.
        long blockCountByShuffleId =
            taskInfo
                .getShuffleBlockIdManager()
                .getBlockCountByShuffleId(appId, Lists.newArrayList(shuffleId));
        if (blockCountByShuffleId != 0) {
          LOG.error(
              "Metadata is not deleted on clearing previous stage attempt data for app: {}, "
                  + "shuffleId: {}, stageAttemptNumber: {}",
              appId,
              shuffleId,
              lastAttemptNumber);
          return StatusCode.INTERNAL_ERROR;
        }
      } catch (Exception e) {
        LOG.error(
            "Errors on clearing previous stage attempt data for app: {}, shuffleId: {}, "
                + "stageAttemptNumber: {}",
            appId,
            shuffleId,
            lastAttemptNumber,
            e);
        return StatusCode.INTERNAL_ERROR;
      }
    } else if (stageAttemptNumber > 0 && stageAttemptNumber <= lastAttemptNumber) {
      // When a stage retry occurs, a registration carrying an attempt number that is not newer
      // than the recorded one is ignored and the ignored status is returned quickly.
      LOG.info(
          "Ignoring registration for app: {}, shuffleId: {}: stageAttemptNumber {} is not newer "
              + "than the latest registered stageAttemptNumber {}",
          appId,
          shuffleId,
          stageAttemptNumber,
          lastAttemptNumber);
      return StatusCode.STAGE_RETRY_IGNORE;
    }
  }
  return StatusCode.SUCCESS;
}

/**
 * Records {@code code} on the audit context and sends it as the single response of a {@code
 * registerShuffle} call, then completes the stream.
 */
private static void completeRegisterShuffle(
    ServerRpcAuditContext auditContext,
    StreamObserver<ShuffleRegisterResponse> responseObserver,
    StatusCode code) {
  auditContext.withStatusCode(code);
  ShuffleRegisterResponse reply =
      ShuffleRegisterResponse.newBuilder().setStatus(code.toProto()).build();
  responseObserver.onNext(reply);
  responseObserver.onCompleted();
}