in client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java [181:266]
/**
 * Sends the prepared shuffle blocks to every target server in parallel and waits for the result.
 *
 * <p>One task per entry of {@code serverToBlocks} is submitted to {@code dataTransferPool}. Each
 * task first consults {@code needCancelRequest}; when it yields {@code true} the task sends
 * nothing and reports success. Otherwise the task issues a single {@code sendShuffleData} call:
 *
 * <ul>
 *   <li>on {@code StatusCode.SUCCESS} the counter in {@code blockIdsTracker} is incremented for
 *       every block id listed for that server, and the server is removed from {@code
 *       defectiveServers} (when that set is non-null);
 *   <li>on any other status code, or on any exception, the server is added to {@code
 *       defectiveServers} (when non-null), a warning is logged and the task reports failure.
 * </ul>
 *
 * @param appId application id placed into every send request
 * @param serverToBlocks for each server, the blocks to send, handed unchanged to {@code
 *     RssSendShuffleDataRequest}
 * @param serverToBlockIds for each server, the ids of the blocks being sent; when {@code null}
 *     nothing is sent and {@code true} is returned
 * @param blockIdsTracker per block id, a counter incremented once for every server that
 *     acknowledged the block with {@code SUCCESS}
 * @param allowFastFail forwarded to {@code ClientUtils.waitUntilDoneOrFail}
 * @param needCancelRequest evaluated at the start of every task; {@code true} skips the send
 * @return the value returned by {@code ClientUtils.waitUntilDoneOrFail} for the submitted tasks
 */
private boolean sendShuffleDataAsync(
    String appId,
    Map<ShuffleServerInfo, Map<Integer, Map<Integer, List<ShuffleBlockInfo>>>> serverToBlocks,
    Map<ShuffleServerInfo, List<Long>> serverToBlockIds,
    Map<Long, AtomicInteger> blockIdsTracker,
    boolean allowFastFail,
    Supplier<Boolean> needCancelRequest) {
  // NOTE(review): the guard checks serverToBlockIds while the loop below iterates
  // serverToBlocks -- presumably callers always build both together; confirm.
  if (serverToBlockIds == null) {
    return true;
  }
  // If one or more servers is failed, the sending is not totally successful.
  List<CompletableFuture<Boolean>> futures = new ArrayList<>();
  for (Map.Entry<ShuffleServerInfo, Map<Integer, Map<Integer, List<ShuffleBlockInfo>>>> entry :
      serverToBlocks.entrySet()) {
    CompletableFuture<Boolean> future =
        CompletableFuture.supplyAsync(
            () -> {
              if (needCancelRequest.get()) {
                LOG.info("The upstream task has been failed. Abort this data send.");
                return true;
              }
              ShuffleServerInfo ssi = entry.getKey();
              // Resolve the id list once, outside the try: the failure handler below needs the
              // count for its log line and must not throw itself when the list is missing,
              // otherwise the original exception is lost and the future fails exceptionally.
              List<Long> blockIds = serverToBlockIds.get(ssi);
              int blockCount = blockIds == null ? 0 : blockIds.size();
              try {
                Map<Integer, Map<Integer, List<ShuffleBlockInfo>>> shuffleIdToBlocks =
                    entry.getValue();
                // todo: compact unnecessary blocks that reach replicaWrite
                RssSendShuffleDataRequest request =
                    new RssSendShuffleDataRequest(
                        appId, retryMax, retryIntervalMax, shuffleIdToBlocks);
                long start = System.currentTimeMillis();
                RssSendShuffleDataResponse response =
                    getShuffleServerClient(ssi).sendShuffleData(request);
                String logMsg =
                    String.format(
                        "ShuffleWriteClientImpl sendShuffleData with %s blocks to %s cost: %s(ms)",
                        blockCount,
                        ssi.getId(),
                        System.currentTimeMillis() - start);
                if (response.getStatusCode() != StatusCode.SUCCESS) {
                  if (defectiveServers != null) {
                    defectiveServers.add(ssi);
                  }
                  LOG.warn("{}, it failed wth statusCode[{}]", logMsg, response.getStatusCode());
                  return false;
                }
                // mark a replica of block that has been sent
                if (blockIds != null) {
                  blockIds.forEach(block -> blockIdsTracker.get(block).incrementAndGet());
                }
                if (defectiveServers != null) {
                  defectiveServers.remove(ssi);
                }
                LOG.debug("{} successfully.", logMsg);
                return true;
              } catch (Exception e) {
                if (defectiveServers != null) {
                  defectiveServers.add(ssi);
                }
                // Trailing Throwable is logged with its stack trace.
                LOG.warn("Send: {} blocks to [{}] failed.", blockCount, ssi.getId(), e);
                return false;
              }
            },
            dataTransferPool);
    futures.add(future);
  }
  boolean result = ClientUtils.waitUntilDoneOrFail(futures, allowFastFail);
  if (!result) {
    // NOTE(review): the "cancelled task size" value is the total number of submitted tasks.
    LOG.error(
        "Some shuffle data can't be sent to shuffle-server, is fast fail: {}, cancelled task size: {}",
        allowFastFail,
        futures.size());
  }
  return result;
}