in internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java [355:469]
public RssSendShuffleDataResponse sendShuffleData(RssSendShuffleDataRequest request) {
String appId = request.getAppId();
Map<Integer, Map<Integer, List<ShuffleBlockInfo>>> shuffleIdToBlocks =
request.getShuffleIdToBlocks();
boolean isSuccessful = true;
// prepare rpc request based on shuffleId -> partitionId -> blocks
for (Map.Entry<Integer, Map<Integer, List<ShuffleBlockInfo>>> stb :
shuffleIdToBlocks.entrySet()) {
List<ShuffleData> shuffleData = Lists.newArrayList();
int size = 0;
int blockNum = 0;
int shuffleId = stb.getKey();
List<Integer> partitionIds = new ArrayList<>();
for (Map.Entry<Integer, List<ShuffleBlockInfo>> ptb : stb.getValue().entrySet()) {
List<ShuffleBlock> shuffleBlocks = Lists.newArrayList();
for (ShuffleBlockInfo sbi : ptb.getValue()) {
shuffleBlocks.add(
ShuffleBlock.newBuilder()
.setBlockId(sbi.getBlockId())
.setCrc(sbi.getCrc())
.setLength(sbi.getLength())
.setTaskAttemptId(sbi.getTaskAttemptId())
.setUncompressLength(sbi.getUncompressLength())
.setData(UnsafeByteOperations.unsafeWrap(sbi.getData().nioBuffer()))
.build());
size += sbi.getSize();
blockNum++;
}
shuffleData.add(
ShuffleData.newBuilder()
.setPartitionId(ptb.getKey())
.addAllBlock(shuffleBlocks)
.build());
partitionIds.add(ptb.getKey());
}
final int allocateSize = size;
final int finalBlockNum = blockNum;
try {
RetryUtils.retry(
() -> {
long requireId =
requirePreAllocation(
appId,
shuffleId,
partitionIds,
allocateSize,
request.getRetryMax() / maxRetryAttempts,
request.getRetryIntervalMax());
if (requireId == FAILED_REQUIRE_ID) {
throw new RssException(
String.format(
"requirePreAllocation failed! size[%s], host[%s], port[%s]",
allocateSize, host, port));
}
long start = System.currentTimeMillis();
SendShuffleDataRequest rpcRequest =
SendShuffleDataRequest.newBuilder()
.setAppId(appId)
.setShuffleId(stb.getKey())
.setRequireBufferId(requireId)
.addAllShuffleData(shuffleData)
.setTimestamp(start)
.build();
SendShuffleDataResponse response = getBlockingStub().sendShuffleData(rpcRequest);
LOG.debug(
"Do sendShuffleData to {}:{} rpc cost:"
+ (System.currentTimeMillis() - start)
+ " ms for "
+ allocateSize
+ " bytes with "
+ finalBlockNum
+ " blocks",
host,
port);
if (response.getStatus() != RssProtos.StatusCode.SUCCESS) {
String msg =
"Can't send shuffle data with "
+ finalBlockNum
+ " blocks to "
+ host
+ ":"
+ port
+ ", statusCode="
+ response.getStatus()
+ ", errorMsg:"
+ response.getRetMsg();
if (response.getStatus() == RssProtos.StatusCode.NO_REGISTER) {
throw new NotRetryException(msg);
} else {
throw new RssException(msg);
}
}
return response;
},
request.getRetryIntervalMax(),
maxRetryAttempts);
} catch (Throwable throwable) {
LOG.warn(throwable.getMessage());
isSuccessful = false;
break;
}
}
RssSendShuffleDataResponse response;
if (isSuccessful) {
response = new RssSendShuffleDataResponse(StatusCode.SUCCESS);
} else {
response = new RssSendShuffleDataResponse(StatusCode.INTERNAL_ERROR);
}
return response;
}