in client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java [442:525]
/**
 * Sends a commit request for the given shuffle to every server in {@code shuffleServerInfoSet}
 * and, for each server whose reported commit count has reached {@code numMaps}, follows up with
 * a finish-shuffle request.
 *
 * <p>A failure on one server is logged and does not prevent the requests to the other servers.
 *
 * @param shuffleServerInfoSet servers to send the commit to
 * @param appId application id placed in the commit and finish requests
 * @param shuffleId shuffle id placed in the commit and finish requests
 * @param numMaps commit count a server must report before finish-shuffle is sent to it
 * @return {@code true} if the commit (and the finish, when it was triggered) succeeded on every
 *     server, or if the set is empty; {@code false} otherwise
 */
public boolean sendCommit(
    Set<ShuffleServerInfo> shuffleServerInfoSet, String appId, int shuffleId, int numMaps) {
  // Nothing to commit. This also avoids new ForkJoinPool(0), which throws
  // IllegalArgumentException when the pool is sized from an empty server set.
  if (shuffleServerInfoSet.isEmpty()) {
    return true;
  }
  // -1 means "one worker per server"; any other value is used as the pool size as-is.
  int parallelism = dataCommitPoolSize == -1 ? shuffleServerInfoSet.size() : dataCommitPoolSize;
  ForkJoinPool forkJoinPool = new ForkJoinPool(parallelism);
  AtomicInteger successfulCommit = new AtomicInteger(0);
  try {
    // Submit the parallel stream as a task of the dedicated pool and block until it completes.
    forkJoinPool
        .submit(
            () ->
                shuffleServerInfoSet
                    .parallelStream()
                    .forEach(
                        ssi -> {
                          if (sendCommitToServer(ssi, appId, shuffleId, numMaps)) {
                            successfulCommit.incrementAndGet();
                          }
                        }))
        .join();
  } finally {
    forkJoinPool.shutdownNow();
  }
  // check if every commit/finish call is successful
  return successfulCommit.get() == shuffleServerInfoSet.size();
}

/**
 * Sends the commit to a single server and, once that server reports at least {@code numMaps}
 * committed maps, asks it to finish the shuffle.
 *
 * @return {@code true} if every request issued to this server returned {@code SUCCESS};
 *     {@code false} if a request returned another status or threw (the failure is logged)
 */
private boolean sendCommitToServer(
    ShuffleServerInfo ssi, String appId, int shuffleId, int numMaps) {
  String errorMsg =
      "Failed to commit shuffle data to " + ssi + " for shuffleId[" + shuffleId + "]";
  long startTime = System.currentTimeMillis();
  try {
    RssSendCommitResponse response =
        getShuffleServerClient(ssi).sendCommit(new RssSendCommitRequest(appId, shuffleId));
    if (response.getStatusCode() != StatusCode.SUCCESS) {
      LOG.error(errorMsg + " with statusCode " + response.getStatusCode());
      return false;
    }

    int commitCount = response.getCommitCount();
    LOG.info(
        "Successfully sendCommit for appId["
            + appId
            + "], shuffleId["
            + shuffleId
            + "] to ShuffleServer["
            + ssi.getId()
            + "], cost "
            + (System.currentTimeMillis() - startTime)
            + " ms, got committed maps["
            + commitCount
            + "], map number of stage is "
            + numMaps);

    // Finish is only requested once this server has seen a commit from every map.
    if (commitCount < numMaps) {
      return true;
    }

    RssFinishShuffleResponse rfsResponse =
        getShuffleServerClient(ssi).finishShuffle(new RssFinishShuffleRequest(appId, shuffleId));
    if (rfsResponse.getStatusCode() != StatusCode.SUCCESS) {
      LOG.error(
          "Failed to finish shuffle to "
              + ssi
              + " for shuffleId["
              + shuffleId
              + "] with statusCode "
              + rfsResponse.getStatusCode());
      return false;
    }
    LOG.info("Successfully finish shuffle to " + ssi + " for shuffleId[" + shuffleId + "]");
    return true;
  } catch (Exception e) {
    // Broad catch on purpose: a failure talking to one server must not abort the others.
    LOG.error(errorMsg, e);
    return false;
  }
}