in internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java [540:672]
/**
 * Sends the shuffle blocks carried by {@code request} to the shuffle server at {@code host:port}.
 *
 * <p>Blocks are grouped by shuffle id and one {@code SendShuffleDataRequest} is issued per
 * shuffle id. Each attempt first reserves a buffer through {@code requirePreAllocation} and then
 * performs the blocking {@code sendShuffleData} RPC; both steps run inside the command handed to
 * {@code RetryUtils.retryWithCondition}. Processing stops at the first shuffle id whose send
 * ends in an exception, so the remaining shuffle ids are not sent.
 *
 * @param request the application id, stage attempt number, retry settings and the
 *     shuffleId -> partitionId -> blocks mapping to send
 * @return a response with {@code StatusCode.SUCCESS} when every shuffle id was sent; otherwise a
 *     response with the recorded failure status ({@code StatusCode.INTERNAL_ERROR} unless it was
 *     overwritten during an attempt). In both cases the response carries the partition ids
 *     collected from the {@code requirePreAllocation} results as "need split" partitions.
 */
public RssSendShuffleDataResponse sendShuffleData(RssSendShuffleDataRequest request) {
  String appId = request.getAppId();
  Map<Integer, Map<Integer, List<ShuffleBlockInfo>>> shuffleIdToBlocks =
      request.getShuffleIdToBlocks();
  int stageAttemptNumber = request.getStageAttemptNumber();
  boolean isSuccessful = true;
  // Default failure status; replaced when the server answers with a non-success status below.
  // NOTE(review): the reference is also handed to requirePreAllocation, which presumably updates
  // it too. It is never reset between attempts, so a later attempt that fails with a plain
  // exception keeps whatever status an earlier attempt stored -- confirm this is intended.
  AtomicReference<StatusCode> failedStatusCode = new AtomicReference<>(StatusCode.INTERNAL_ERROR);
  Set<Integer> needSplitPartitionIds = Sets.newHashSet();
  // prepare rpc request based on shuffleId -> partitionId -> blocks
  for (Map.Entry<Integer, Map<Integer, List<ShuffleBlockInfo>>> stb :
      shuffleIdToBlocks.entrySet()) {
    int shuffleId = stb.getKey();
    Map<Integer, List<ShuffleBlockInfo>> partitionToBlocks = stb.getValue();
    int partitionCount = partitionToBlocks.size();
    // Final sizes are known up front, so presize instead of growing from the default capacity.
    List<ShuffleData> shuffleData = new ArrayList<>(partitionCount);
    List<Integer> partitionIds = new ArrayList<>(partitionCount);
    List<Integer> partitionRequireSizes = new ArrayList<>(partitionCount);
    int size = 0;
    int blockNum = 0;
    for (Map.Entry<Integer, List<ShuffleBlockInfo>> ptb : partitionToBlocks.entrySet()) {
      List<ShuffleBlockInfo> blockInfos = ptb.getValue();
      List<ShuffleBlock> shuffleBlocks = new ArrayList<>(blockInfos.size());
      int partitionRequireSize = 0;
      for (ShuffleBlockInfo sbi : blockInfos) {
        shuffleBlocks.add(
            ShuffleBlock.newBuilder()
                .setBlockId(sbi.getBlockId())
                .setCrc(sbi.getCrc())
                .setLength(sbi.getLength())
                .setTaskAttemptId(sbi.getTaskAttemptId())
                .setUncompressLength(sbi.getUncompressLength())
                // Wraps the block's NIO buffer through the "unsafe" protobuf entry point rather
                // than a copying one.
                .setData(UnsafeByteOperations.unsafeWrap(sbi.getData().nioBuffer()))
                .build());
        partitionRequireSize += sbi.getSize();
        blockNum++;
      }
      size += partitionRequireSize;
      shuffleData.add(
          ShuffleData.newBuilder()
              .setPartitionId(ptb.getKey())
              .addAllBlock(shuffleBlocks)
              .build());
      partitionIds.add(ptb.getKey());
      partitionRequireSizes.add(partitionRequireSize);
    }
    // Lambdas may only capture effectively final locals; size and blockNum are mutated above.
    final int allocateSize = size;
    final int finalBlockNum = blockNum;
    try {
      RetryUtils.retryWithCondition(
          () -> {
            // TODO(baoloongmao): support partition split follow netty client
            // The request's retry budget is divided by the outer attempt count before being
            // passed down (presumably so the nested retries stay within the budget -- verify
            // against requirePreAllocation).
            Pair<Long, List<Integer>> allocationResult =
                requirePreAllocation(
                    appId,
                    shuffleId,
                    partitionIds,
                    partitionRequireSizes,
                    allocateSize,
                    request.getRetryMax() / maxRetryAttempts,
                    request.getRetryIntervalMax(),
                    failedStatusCode);
            long requireId = allocationResult.getLeft();
            // Recorded before the failure check, so the ids are kept even if allocation failed.
            needSplitPartitionIds.addAll(allocationResult.getRight());
            if (requireId == FAILED_REQUIRE_ID) {
              throw new RssException(
                  String.format(
                      "requirePreAllocation failed! size[%s], host[%s], port[%s]",
                      allocateSize, host, port));
            }
            long start = System.currentTimeMillis();
            SendShuffleDataRequest rpcRequest =
                SendShuffleDataRequest.newBuilder()
                    .setAppId(appId)
                    .setShuffleId(shuffleId)
                    .setRequireBufferId(requireId)
                    .addAllShuffleData(shuffleData)
                    .setTimestamp(start)
                    .setStageAttemptNumber(stageAttemptNumber)
                    .build();
            SendShuffleDataResponse response = getBlockingStub().sendShuffleData(rpcRequest);
            if (LOG.isDebugEnabled()) {
              LOG.debug(
                  "Do sendShuffleData to {}:{} rpc cost:{} ms for {} bytes with {} blocks",
                  host,
                  port,
                  System.currentTimeMillis() - start,
                  allocateSize,
                  finalBlockNum);
            }
            if (response.getStatus() != RssProtos.StatusCode.SUCCESS) {
              String msg =
                  String.format(
                      "Can't send shuffle data with %s blocks to %s:%s, statusCode=%s, errorMsg:%s",
                      finalBlockNum, host, port, response.getStatus(), response.getRetMsg());
              StatusCode statusCode = StatusCode.fromCode(response.getStatus().getNumber());
              failedStatusCode.set(statusCode);
              // The exception type is what the retry predicate below inspects.
              if (NOT_RETRY_STATUS_CODES.contains(statusCode)) {
                throw new NotRetryException(msg);
              }
              throw new RssException(msg);
            }
            return response;
          },
          null,
          request.getRetryIntervalMax(),
          maxRetryAttempts,
          // Retry predicate: false for OutOfMemoryError and NotRetryException, true otherwise.
          t -> !(t instanceof OutOfMemoryError || t instanceof NotRetryException));
    } catch (Throwable throwable) {
      LOG.warn("Failed to send shuffle data due to ", throwable);
      isSuccessful = false;
      // Give up on the remaining shuffle ids once one of them could not be sent.
      break;
    }
  }
  StatusCode finalStatus = isSuccessful ? StatusCode.SUCCESS : failedStatusCode.get();
  RssSendShuffleDataResponse response = new RssSendShuffleDataResponse(finalStatus);
  response.setNeedSplitPartitionIds(needSplitPartitionIds);
  return response;
}