in internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java [136:251]
public RssSendShuffleDataResponse sendShuffleData(RssSendShuffleDataRequest request) {
  // Sends the blocks carried by the request, one SendShuffleDataRequest per shuffle id.
  // For each shuffle id the method:
  //   1. sums the block sizes per partition and counts the blocks,
  //   2. builds the RPC request (its require id starts at 0 and is filled in per attempt),
  //   3. runs "pre-allocate, then send" through RetryUtils.retryWithCondition.
  // Processing stops at the first shuffle id whose send ultimately fails. The response carries
  // SUCCESS, or the last failure status recorded (INTERNAL_ERROR when none was recorded), plus
  // every partition id reported back by requirePreAllocation.
  Map<Integer, Map<Integer, List<ShuffleBlockInfo>>> blocksByShuffle =
      request.getShuffleIdToBlocks();
  final int stageAttemptNumber = request.getStageAttemptNumber();
  final AtomicReference<StatusCode> lastFailure =
      new AtomicReference<>(StatusCode.INTERNAL_ERROR);
  final Set<Integer> partitionsToSplit = new HashSet<>();
  boolean allSent = true;

  for (Map.Entry<Integer, Map<Integer, List<ShuffleBlockInfo>>> shuffleEntry :
      blocksByShuffle.entrySet()) {
    final int shuffleId = shuffleEntry.getKey();
    final Map<Integer, List<ShuffleBlockInfo>> blocksByPartition = shuffleEntry.getValue();

    // Parallel lists: requiredSizes.get(i) is the byte total for partitionIds.get(i).
    final List<Integer> partitionIds = new ArrayList<>();
    final List<Integer> requiredSizes = new ArrayList<>();
    int totalBytes = 0;
    int blockCount = 0;
    for (Map.Entry<Integer, List<ShuffleBlockInfo>> partitionEntry :
        blocksByPartition.entrySet()) {
      int partitionBytes = 0;
      for (ShuffleBlockInfo block : partitionEntry.getValue()) {
        partitionBytes += block.getSize();
        blockCount++;
      }
      totalBytes += partitionBytes;
      partitionIds.add(partitionEntry.getKey());
      requiredSizes.add(partitionBytes);
    }

    final SendShuffleDataRequest rpcRequest =
        new SendShuffleDataRequest(
            requestId(),
            request.getAppId(),
            shuffleId,
            stageAttemptNumber,
            0L,
            blocksByPartition,
            System.currentTimeMillis());
    // The amount to pre-allocate is the block payload plus the encoded request length.
    final int bytesToAllocate = totalBytes + rpcRequest.encodedLength();
    // Effectively-final copy so the lambda below can capture it.
    final int sentBlockCount = blockCount;

    try {
      RetryUtils.retryWithCondition(
          () -> {
            final TransportClient client = getTransportClient();
            Pair<Long, List<Integer>> allocation =
                requirePreAllocation(
                    request.getAppId(),
                    shuffleId,
                    partitionIds,
                    requiredSizes,
                    bytesToAllocate,
                    request.getRetryMax(),
                    request.getRetryIntervalMax(),
                    lastFailure);
            long requireId = allocation.getLeft();
            // Recorded before the failure check, so ids are kept even when allocation failed.
            partitionsToSplit.addAll(allocation.getRight());
            if (requireId == FAILED_REQUIRE_ID) {
              throw new RssException(
                  String.format(
                      "requirePreAllocation failed! size[%s], host[%s], port[%s]",
                      bytesToAllocate, host, port));
            }

            // Each attempt stamps the request with its own require id and a fresh timestamp.
            rpcRequest.setRequireId(requireId);
            rpcRequest.setTimestamp(System.currentTimeMillis());
            long sendStart = System.currentTimeMillis();
            RpcResponse rpcResponse = client.sendRpcSync(rpcRequest, rpcTimeout);
            if (LOG.isDebugEnabled()) {
              long elapsedMs = System.currentTimeMillis() - sendStart;
              LOG.debug(
                  "Do sendShuffleData to {}:{} rpc cost:"
                      + elapsedMs
                      + " ms for "
                      + bytesToAllocate
                      + " bytes with "
                      + sentBlockCount
                      + " blocks",
                  host,
                  port);
            }

            if (rpcResponse.getStatusCode() == StatusCode.SUCCESS) {
              return rpcResponse;
            }

            // Remember the server's status so the caller sees it if all attempts fail.
            lastFailure.set(StatusCode.fromCode(rpcResponse.getStatusCode().statusCode()));
            String failure =
                "Can't send shuffle data with "
                    + sentBlockCount
                    + " blocks to "
                    + host
                    + ":"
                    + port
                    + ", statusCode="
                    + rpcResponse.getStatusCode()
                    + ", errorMsg:"
                    + rpcResponse.getRetMessage();
            if (NOT_RETRY_STATUS_CODES.contains(rpcResponse.getStatusCode())) {
              throw new NotRetryException(failure);
            }
            throw new RssException(failure);
          },
          null,
          request.getRetryIntervalMax(),
          maxRetryAttempts,
          // Predicate is false for OutOfMemoryError and NotRetryException
          // (presumably meaning "do not retry" -- confirm against RetryUtils).
          t -> !(t instanceof OutOfMemoryError || t instanceof NotRetryException));
    } catch (Throwable throwable) {
      LOG.warn("Failed to send shuffle data due to ", throwable);
      allSent = false;
      break;
    }
  }

  RssSendShuffleDataResponse response =
      new RssSendShuffleDataResponse(allSent ? StatusCode.SUCCESS : lastFailure.get());
  response.setNeedSplitPartitionIds(partitionsToSplit);
  return response;
}