public RssSendShuffleDataResponse sendShuffleData()

in internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java [355:469]


  /**
   * Sends the shuffle blocks carried by {@code request} to the shuffle server this client is
   * connected to ({@code host:port}).
   *
   * <p>Blocks are processed one shuffle id at a time. For each shuffle id, all partitions' blocks
   * are packed into a single {@code SendShuffleDataRequest}. Each attempt first reserves buffer
   * space through {@code requirePreAllocation} and then issues the blocking {@code
   * sendShuffleData} RPC; attempts are driven by {@code RetryUtils.retry}.
   *
   * <p>The first shuffle id whose send ultimately fails aborts the loop: shuffle ids that have not
   * been processed yet are not sent, and shuffle ids already sent are not rolled back.
   *
   * @param request the application id, the {@code shuffleId -> partitionId -> blocks} mapping to
   *     send, and the retry settings ({@code getRetryMax()}, {@code getRetryIntervalMax()})
   * @return a response with {@code StatusCode.SUCCESS} if every shuffle id was sent, otherwise a
   *     response with {@code StatusCode.INTERNAL_ERROR}; failures are logged rather than thrown
   */
  public RssSendShuffleDataResponse sendShuffleData(RssSendShuffleDataRequest request) {
    String appId = request.getAppId();
    Map<Integer, Map<Integer, List<ShuffleBlockInfo>>> shuffleIdToBlocks =
        request.getShuffleIdToBlocks();

    boolean isSuccessful = true;

    // prepare rpc request based on shuffleId -> partitionId -> blocks
    for (Map.Entry<Integer, Map<Integer, List<ShuffleBlockInfo>>> stb :
        shuffleIdToBlocks.entrySet()) {
      List<ShuffleData> shuffleData = Lists.newArrayList();
      int size = 0;
      int blockNum = 0;
      int shuffleId = stb.getKey();
      List<Integer> partitionIds = new ArrayList<>();

      for (Map.Entry<Integer, List<ShuffleBlockInfo>> ptb : stb.getValue().entrySet()) {
        List<ShuffleBlock> shuffleBlocks = Lists.newArrayList();
        for (ShuffleBlockInfo sbi : ptb.getValue()) {
          shuffleBlocks.add(
              ShuffleBlock.newBuilder()
                  .setBlockId(sbi.getBlockId())
                  .setCrc(sbi.getCrc())
                  .setLength(sbi.getLength())
                  .setTaskAttemptId(sbi.getTaskAttemptId())
                  .setUncompressLength(sbi.getUncompressLength())
                  // NOTE(review): unsafeWrap is expected to reference the buffer without copying,
                  // so the block data must stay unchanged until the RPC completes - confirm.
                  .setData(UnsafeByteOperations.unsafeWrap(sbi.getData().nioBuffer()))
                  .build());
          // Accumulated size is the amount requested from requirePreAllocation below.
          size += sbi.getSize();
          blockNum++;
        }
        shuffleData.add(
            ShuffleData.newBuilder()
                .setPartitionId(ptb.getKey())
                .addAllBlock(shuffleBlocks)
                .build());
        partitionIds.add(ptb.getKey());
      }

      // Lambdas may only capture effectively final locals.
      final int allocateSize = size;
      final int finalBlockNum = blockNum;
      try {
        RetryUtils.retry(
            () -> {
              // A fresh pre-allocation is requested on every attempt.
              long requireId =
                  requirePreAllocation(
                      appId,
                      shuffleId,
                      partitionIds,
                      allocateSize,
                      // Splits the request's retry budget across the outer attempts.
                      request.getRetryMax() / maxRetryAttempts,
                      request.getRetryIntervalMax());
              if (requireId == FAILED_REQUIRE_ID) {
                throw new RssException(
                    String.format(
                        "requirePreAllocation failed! size[%s], host[%s], port[%s]",
                        allocateSize, host, port));
              }
              long start = System.currentTimeMillis();
              SendShuffleDataRequest rpcRequest =
                  SendShuffleDataRequest.newBuilder()
                      .setAppId(appId)
                      .setShuffleId(shuffleId)
                      .setRequireBufferId(requireId)
                      .addAllShuffleData(shuffleData)
                      .setTimestamp(start)
                      .build();
              SendShuffleDataResponse response = getBlockingStub().sendShuffleData(rpcRequest);
              // Fully parameterized so the message is only formatted when DEBUG is enabled.
              LOG.debug(
                  "Do sendShuffleData to {}:{} rpc cost:{} ms for {} bytes with {} blocks",
                  host,
                  port,
                  System.currentTimeMillis() - start,
                  allocateSize,
                  finalBlockNum);
              if (response.getStatus() != RssProtos.StatusCode.SUCCESS) {
                String msg =
                    "Can't send shuffle data with "
                        + finalBlockNum
                        + " blocks to "
                        + host
                        + ":"
                        + port
                        + ", statusCode="
                        + response.getStatus()
                        + ", errorMsg:"
                        + response.getRetMsg();
                if (response.getStatus() == RssProtos.StatusCode.NO_REGISTER) {
                  // NOTE(review): presumably RetryUtils stops retrying on NotRetryException -
                  // confirm against RetryUtils.
                  throw new NotRetryException(msg);
                } else {
                  throw new RssException(msg);
                }
              }
              return response;
            },
            request.getRetryIntervalMax(),
            maxRetryAttempts);
      } catch (Throwable throwable) {
        // Pass the throwable itself so the stack trace and cause chain are kept; logging only
        // getMessage() loses them and prints "null" for message-less exceptions.
        LOG.warn(
            "Failed to send shuffle data for appId["
                + appId
                + "], shuffleId["
                + shuffleId
                + "] to "
                + host
                + ":"
                + port,
            throwable);
        isSuccessful = false;
        // Stop at the first failure; remaining shuffle ids are not sent.
        break;
      }
    }

    return new RssSendShuffleDataResponse(
        isSuccessful ? StatusCode.SUCCESS : StatusCode.INTERNAL_ERROR);
  }