public RssSendShuffleDataResponse sendShuffleData()

in internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java [540:672]


  public RssSendShuffleDataResponse sendShuffleData(RssSendShuffleDataRequest request) {
    String appId = request.getAppId();
    Map<Integer, Map<Integer, List<ShuffleBlockInfo>>> shuffleIdToBlocks =
        request.getShuffleIdToBlocks();
    int stageAttemptNumber = request.getStageAttemptNumber();

    boolean isSuccessful = true;
    AtomicReference<StatusCode> failedStatusCode = new AtomicReference<>(StatusCode.INTERNAL_ERROR);
    Set<Integer> needSplitPartitionIds = Sets.newHashSet();
    // prepare rpc request based on shuffleId -> partitionId -> blocks
    for (Map.Entry<Integer, Map<Integer, List<ShuffleBlockInfo>>> stb :
        shuffleIdToBlocks.entrySet()) {
      List<ShuffleData> shuffleData = Lists.newArrayList();
      int size = 0;
      int blockNum = 0;
      int shuffleId = stb.getKey();
      List<Integer> partitionIds = new ArrayList<>();
      List<Integer> partitionRequireSizes = new ArrayList<>();

      for (Map.Entry<Integer, List<ShuffleBlockInfo>> ptb : stb.getValue().entrySet()) {
        List<ShuffleBlock> shuffleBlocks = Lists.newArrayList();
        int partitionRequireSize = 0;
        for (ShuffleBlockInfo sbi : ptb.getValue()) {
          shuffleBlocks.add(
              ShuffleBlock.newBuilder()
                  .setBlockId(sbi.getBlockId())
                  .setCrc(sbi.getCrc())
                  .setLength(sbi.getLength())
                  .setTaskAttemptId(sbi.getTaskAttemptId())
                  .setUncompressLength(sbi.getUncompressLength())
                  .setData(UnsafeByteOperations.unsafeWrap(sbi.getData().nioBuffer()))
                  .build());
          partitionRequireSize += sbi.getSize();
          blockNum++;
        }
        size += partitionRequireSize;
        shuffleData.add(
            ShuffleData.newBuilder()
                .setPartitionId(ptb.getKey())
                .addAllBlock(shuffleBlocks)
                .build());
        partitionIds.add(ptb.getKey());
        partitionRequireSizes.add(partitionRequireSize);
      }

      final int allocateSize = size;
      final int finalBlockNum = blockNum;
      try {
        RetryUtils.retryWithCondition(
            () -> {
              // TODO(baoloongmao): support partition split follow netty client
              Pair<Long, List<Integer>> allocationResult =
                  requirePreAllocation(
                      appId,
                      shuffleId,
                      partitionIds,
                      partitionRequireSizes,
                      allocateSize,
                      request.getRetryMax() / maxRetryAttempts,
                      request.getRetryIntervalMax(),
                      failedStatusCode);
              long requireId = allocationResult.getLeft();
              needSplitPartitionIds.addAll(allocationResult.getRight());
              if (requireId == FAILED_REQUIRE_ID) {
                throw new RssException(
                    String.format(
                        "requirePreAllocation failed! size[%s], host[%s], port[%s]",
                        allocateSize, host, port));
              }
              long start = System.currentTimeMillis();
              SendShuffleDataRequest rpcRequest =
                  SendShuffleDataRequest.newBuilder()
                      .setAppId(appId)
                      .setShuffleId(stb.getKey())
                      .setRequireBufferId(requireId)
                      .addAllShuffleData(shuffleData)
                      .setTimestamp(start)
                      .setStageAttemptNumber(stageAttemptNumber)
                      .build();
              SendShuffleDataResponse response = getBlockingStub().sendShuffleData(rpcRequest);
              if (LOG.isDebugEnabled()) {
                LOG.debug(
                    "Do sendShuffleData to {}:{} rpc cost:"
                        + (System.currentTimeMillis() - start)
                        + " ms for "
                        + allocateSize
                        + " bytes with "
                        + finalBlockNum
                        + " blocks",
                    host,
                    port);
              }
              if (response.getStatus() != RssProtos.StatusCode.SUCCESS) {
                String msg =
                    "Can't send shuffle data with "
                        + finalBlockNum
                        + " blocks to "
                        + host
                        + ":"
                        + port
                        + ", statusCode="
                        + response.getStatus()
                        + ", errorMsg:"
                        + response.getRetMsg();
                failedStatusCode.set(StatusCode.fromCode(response.getStatus().getNumber()));
                if (NOT_RETRY_STATUS_CODES.contains(failedStatusCode.get())) {
                  throw new NotRetryException(msg);
                } else {
                  throw new RssException(msg);
                }
              }
              return response;
            },
            null,
            request.getRetryIntervalMax(),
            maxRetryAttempts,
            t -> !(t instanceof OutOfMemoryError) && !(t instanceof NotRetryException));
      } catch (Throwable throwable) {
        LOG.warn("Failed to send shuffle data due to ", throwable);
        isSuccessful = false;
        break;
      }
    }

    RssSendShuffleDataResponse response;
    if (isSuccessful) {
      response = new RssSendShuffleDataResponse(StatusCode.SUCCESS);
    } else {
      response = new RssSendShuffleDataResponse(failedStatusCode.get());
    }
    response.setNeedSplitPartitionIds(needSplitPartitionIds);
    return response;
  }