in internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java [68:161]
/**
 * Sends the blocks carried by {@code request} to the shuffle server, one RPC per shuffle id.
 *
 * <p>For every shuffle id the total byte size and block count are computed, then the
 * "require pre-allocation, then send" sequence is executed through {@code RetryUtils.retry} using
 * {@code request.getRetryIntervalMax()} and {@code maxRetryAttempts}. Pre-allocation is requested
 * again on every attempt because it happens inside the retried body.
 *
 * <p>Processing stops at the first shuffle id whose send ultimately fails; remaining shuffle ids
 * in the request are not attempted.
 *
 * @param request the application id, retry settings and the shuffleId -> partitionId -> blocks
 *     mapping to send
 * @return a response with {@code StatusCode.SUCCESS} when every shuffle id was sent, otherwise a
 *     response with {@code StatusCode.INTERNAL_ERROR}
 */
public RssSendShuffleDataResponse sendShuffleData(RssSendShuffleDataRequest request) {
  TransportClient transportClient = getTransportClient();
  Map<Integer, Map<Integer, List<ShuffleBlockInfo>>> shuffleIdToBlocks =
      request.getShuffleIdToBlocks();
  boolean isSuccessful = true;
  for (Map.Entry<Integer, Map<Integer, List<ShuffleBlockInfo>>> stb :
      shuffleIdToBlocks.entrySet()) {
    int shuffleId = stb.getKey();
    // NOTE(review): sizes are summed into an int, so a very large batch could overflow;
    // kept as int because requirePreAllocation is called with this value — confirm upstream.
    int size = 0;
    int blockNum = 0;
    for (Map.Entry<Integer, List<ShuffleBlockInfo>> ptb : stb.getValue().entrySet()) {
      for (ShuffleBlockInfo sbi : ptb.getValue()) {
        size += sbi.getSize();
        blockNum++;
      }
    }
    // Effectively-final copies so the totals can be captured by the retry lambda.
    int allocateSize = size;
    int finalBlockNum = blockNum;
    try {
      RetryUtils.retry(
          () -> {
            long requireId =
                requirePreAllocation(
                    request.getAppId(),
                    allocateSize,
                    request.getRetryMax(),
                    request.getRetryIntervalMax());
            if (requireId == FAILED_REQUIRE_ID) {
              throw new RssException(
                  String.format(
                      "requirePreAllocation failed! size[%s], host[%s], port[%s]",
                      allocateSize, host, port));
            }
            SendShuffleDataRequest sendShuffleDataRequest =
                new SendShuffleDataRequest(
                    requestId(),
                    request.getAppId(),
                    shuffleId,
                    requireId,
                    stb.getValue(),
                    System.currentTimeMillis());
            long start = System.currentTimeMillis();
            RpcResponse rpcResponse =
                transportClient.sendRpcSync(sendShuffleDataRequest, RPC_TIMEOUT_DEFAULT_MS);
            // Fully parameterized so no message string is built when debug logging is off.
            LOG.debug(
                "Do sendShuffleData to {}:{} rpc cost:{} ms for {} bytes with {} blocks",
                host,
                port,
                System.currentTimeMillis() - start,
                allocateSize,
                finalBlockNum);
            if (rpcResponse.getStatusCode() != StatusCode.SUCCESS) {
              String msg =
                  "Can't send shuffle data with "
                      + finalBlockNum
                      + " blocks to "
                      + host
                      + ":"
                      + port
                      + ", statusCode="
                      + rpcResponse.getStatusCode()
                      + ", errorMsg:"
                      + rpcResponse.getRetMessage();
              // NO_REGISTER is signalled with a dedicated exception type; presumably
              // RetryUtils stops retrying on it — verify against RetryUtils.
              if (rpcResponse.getStatusCode() == StatusCode.NO_REGISTER) {
                throw new NotRetryException(msg);
              } else {
                throw new RssException(msg);
              }
            }
            return rpcResponse;
          },
          request.getRetryIntervalMax(),
          maxRetryAttempts);
    } catch (Throwable throwable) {
      // Pass the throwable itself so the exception type, stack trace and cause are logged;
      // logging only getMessage() loses them and prints "null" for message-less exceptions.
      LOG.warn(
          "Failed to send shuffle data of shuffleId[" + shuffleId + "] to " + host + ":" + port,
          throwable);
      isSuccessful = false;
      break;
    }
  }
  return new RssSendShuffleDataResponse(
      isSuccessful ? StatusCode.SUCCESS : StatusCode.INTERNAL_ERROR);
}