in client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java [167:283]
/**
 * Sends the blocks in {@code serverToBlocks} to their shuffle servers, one asynchronous task per
 * server on {@code dataTransferPool}, and waits for the tasks through {@link
 * ClientUtils#waitUntilDoneOrFail}.
 *
 * <p>Per-server behavior:
 *
 * <ul>
 *   <li>If {@code needCancelRequest} yields {@code true}, nothing is sent and the task reports
 *       {@code true}.
 *   <li>On a {@code SUCCESS} response, the counter of every block id listed for that server in
 *       {@code serverToBlockIds} is incremented, the partitions reported as needing a split are
 *       recorded, the server is removed from {@code defectiveServers} (when that set is non-null)
 *       and the push cost is recorded.
 *   <li>On any other status code, or on an exception, the server's blocks are recorded in {@code
 *       failedBlockSendTracker}, the server is added to {@code defectiveServers} (when non-null)
 *       and the task reports {@code false}.
 * </ul>
 *
 * @param appId application id placed in each send request
 * @param stageAttemptNumber stage attempt number placed in each send request
 * @param serverToBlocks blocks to send, keyed by target server; the nested maps are handed to the
 *     request unchanged (presumably shuffleId -> partitionId -> blocks; confirm against caller)
 * @param serverToBlockIds block ids per target server; when {@code null} the method returns
 *     {@code true} without sending anything
 * @param blockIdsSendSuccessTracker per-block counter incremented once for each server that
 *     acknowledged the block with {@code SUCCESS}
 * @param failedBlockSendTracker collector for failed blocks and partitions that need a split
 * @param allowFastFail forwarded to {@link ClientUtils#waitUntilDoneOrFail}
 * @param needCancelRequest checked at the start of every per-server task; {@code true} skips the
 *     send
 * @param shuffleServerPushCostTracker receives the sent bytes and push duration of each
 *     successful send
 * @return the result of {@link ClientUtils#waitUntilDoneOrFail} over all per-server tasks
 */
private boolean sendShuffleDataAsync(
    String appId,
    int stageAttemptNumber,
    Map<ShuffleServerInfo, Map<Integer, Map<Integer, List<ShuffleBlockInfo>>>> serverToBlocks,
    Map<ShuffleServerInfo, List<Long>> serverToBlockIds,
    Map<Long, AtomicInteger> blockIdsSendSuccessTracker,
    FailedBlockSendTracker failedBlockSendTracker,
    boolean allowFastFail,
    Supplier<Boolean> needCancelRequest,
    ShuffleServerPushCostTracker shuffleServerPushCostTracker) {
  if (serverToBlockIds == null) {
    return true;
  }
  // If one or more servers is failed, the sending is not totally successful.
  List<CompletableFuture<Boolean>> futures = new ArrayList<>();
  for (Map.Entry<ShuffleServerInfo, Map<Integer, Map<Integer, List<ShuffleBlockInfo>>>> entry :
      serverToBlocks.entrySet()) {
    CompletableFuture<Boolean> future =
        CompletableFuture.supplyAsync(
                () -> {
                  if (needCancelRequest.get()) {
                    LOG.info("The upstream task has been failed. Abort this data send.");
                    return true;
                  }
                  ShuffleServerInfo ssi = entry.getKey();
                  try {
                    Map<Integer, Map<Integer, List<ShuffleBlockInfo>>> shuffleIdToBlocks =
                        entry.getValue();
                    // todo: compact unnecessary blocks that reach replicaWrite
                    RssSendShuffleDataRequest request =
                        new RssSendShuffleDataRequest(
                            appId,
                            stageAttemptNumber,
                            retryMax,
                            retryIntervalMax,
                            shuffleIdToBlocks);
                    long s = System.currentTimeMillis();
                    RssSendShuffleDataResponse response =
                        getShuffleServerClient(ssi).sendShuffleData(request);
                    long pushDuration = System.currentTimeMillis() - s;
                    String logMsg =
                        String.format(
                            "ShuffleWriteClientImpl sendShuffleData with %s blocks to %s cost: %s(ms)",
                            serverToBlockIds.get(ssi).size(), ssi.getId(), pushDuration);
                    if (response.getStatusCode() == StatusCode.SUCCESS) {
                      // mark a replica of block that has been sent
                      serverToBlockIds
                          .get(ssi)
                          .forEach(
                              blockId ->
                                  blockIdsSendSuccessTracker.get(blockId).incrementAndGet());
                      recordNeedSplitPartition(
                          failedBlockSendTracker, ssi, response.getNeedSplitPartitionIds());
                      if (defectiveServers != null) {
                        defectiveServers.remove(ssi);
                      }
                      if (LOG.isDebugEnabled()) {
                        LOG.debug("{} successfully.", logMsg);
                      }
                    } else {
                      recordFailedBlocks(
                          failedBlockSendTracker, serverToBlocks, ssi, response.getStatusCode());
                      if (defectiveServers != null) {
                        defectiveServers.add(ssi);
                      }
                      LOG.warn(
                          "{}, it failed wth statusCode[{}]", logMsg, response.getStatusCode());
                      return false;
                    }
                    // record shuffle-server push cost
                    // Sum in 64-bit arithmetic: accumulating the int block lengths as Integer
                    // would wrap around once one request carries more than Integer.MAX_VALUE
                    // bytes, and the tracker would receive a bogus (possibly negative) size.
                    long sentBytes =
                        shuffleIdToBlocks.values().stream()
                            .flatMap(partitionToBlocks -> partitionToBlocks.values().stream())
                            .flatMap(List::stream)
                            .mapToLong(ShuffleBlockInfo::getLength)
                            .sum();
                    shuffleServerPushCostTracker.record(ssi.getId(), sentBytes, pushDuration);
                  } catch (Exception e) {
                    // Any failure inside the send path marks every block of this server as
                    // failed with INTERNAL_ERROR and flags the server as defective.
                    recordFailedBlocks(
                        failedBlockSendTracker, serverToBlocks, ssi, StatusCode.INTERNAL_ERROR);
                    if (defectiveServers != null) {
                      defectiveServers.add(ssi);
                    }
                    LOG.warn(
                        "Send: "
                            + serverToBlockIds.get(ssi).size()
                            + " blocks to ["
                            + ssi.getId()
                            + "] failed.",
                        e);
                    return false;
                  }
                  return true;
                },
                dataTransferPool)
            .exceptionally(
                ex -> {
                  // Last line of defence for anything that escaped the task body above
                  // (e.g. a failure thrown from the catch block itself).
                  LOG.error("Unexpected exceptions occurred while sending shuffle data", ex);
                  return false;
                });
    futures.add(future);
  }
  boolean result = ClientUtils.waitUntilDoneOrFail(futures, allowFastFail);
  if (!result) {
    LOG.error(
        "Some shuffle data can't be sent to shuffle-server, is fast fail: {}, cancelled task size: {}",
        allowFastFail,
        futures.size());
  }
  return result;
}