public RssSendShuffleDataResponse sendShuffleData()

in internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java [136:251]


  /**
   * Sends the blocks carried by {@code request}, issuing one {@code SendShuffleDataRequest} per
   * shuffle id through the transport client.
   *
   * <p>For every shuffle id the method sums the block sizes per partition, builds the RPC request
   * and then, inside {@code RetryUtils.retryWithCondition}, performs these steps in order:
   *
   * <ol>
   *   <li>obtain a transport client;
   *   <li>call {@code requirePreAllocation} and record the partition ids it returns as partitions
   *       that need splitting;
   *   <li>throw an {@code RssException} if the returned id equals {@code FAILED_REQUIRE_ID};
   *   <li>stamp the request with the require id and a fresh timestamp, then send it synchronously
   *       with {@code rpcTimeout};
   *   <li>on a non-SUCCESS response, store that status in the failure holder and throw {@code
   *       NotRetryException} when the status is in {@code NOT_RETRY_STATUS_CODES}, otherwise
   *       {@code RssException}.
   * </ol>
   *
   * <p>Any {@code Throwable} that escapes the retry helper is logged at WARN level and the
   * remaining shuffle ids are skipped.
   *
   * @param request supplies the app id, stage attempt number, retry settings and the blocks
   *     grouped by shuffle id and then by partition id
   * @return a response carrying {@code StatusCode.SUCCESS} when every shuffle id was sent;
   *     otherwise the status held by the failure holder, which starts as {@code INTERNAL_ERROR}
   *     and is overwritten by a failing RPC response (it is also handed to {@code
   *     requirePreAllocation}, which presumably may update it - confirm there). The collected
   *     need-split partition ids are attached in both cases.
   */
  public RssSendShuffleDataResponse sendShuffleData(RssSendShuffleDataRequest request) {
    Map<Integer, Map<Integer, List<ShuffleBlockInfo>>> blocksByShuffle =
        request.getShuffleIdToBlocks();
    int stageAttemptNumber = request.getStageAttemptNumber();
    // Shared with the retry lambda, hence the holder type instead of a plain local.
    AtomicReference<StatusCode> failedStatusCode = new AtomicReference<>(StatusCode.INTERNAL_ERROR);
    Set<Integer> needSplitPartitionIds = new HashSet<>();
    boolean allSent = true;

    for (Map.Entry<Integer, Map<Integer, List<ShuffleBlockInfo>>> shuffleEntry :
        blocksByShuffle.entrySet()) {
      int shuffleId = shuffleEntry.getKey();
      Map<Integer, List<ShuffleBlockInfo>> blocksByPartition = shuffleEntry.getValue();

      // Per-partition byte totals (parallel lists) plus the overall byte and block counts.
      List<Integer> partitionIds = new ArrayList<>();
      List<Integer> partitionRequireSizes = new ArrayList<>();
      int totalBlockBytes = 0;
      int blockCount = 0;
      for (Map.Entry<Integer, List<ShuffleBlockInfo>> partitionEntry :
          blocksByPartition.entrySet()) {
        int partitionBytes = 0;
        for (ShuffleBlockInfo block : partitionEntry.getValue()) {
          partitionBytes += block.getSize();
          blockCount++;
        }
        partitionIds.add(partitionEntry.getKey());
        partitionRequireSizes.add(partitionBytes);
        totalBlockBytes += partitionBytes;
      }

      // The require id (0L here) is filled in once pre-allocation has succeeded.
      SendShuffleDataRequest rpcRequest =
          new SendShuffleDataRequest(
              requestId(),
              request.getAppId(),
              shuffleId,
              stageAttemptNumber,
              0L,
              blocksByPartition,
              System.currentTimeMillis());
      // Block payload plus the encoded length reported by the request itself.
      final int allocateSize = totalBlockBytes + rpcRequest.encodedLength();
      final int sentBlockCount = blockCount;

      try {
        RetryUtils.retryWithCondition(
            () -> {
              final TransportClient client = getTransportClient();
              Pair<Long, List<Integer>> preAllocation =
                  requirePreAllocation(
                      request.getAppId(),
                      shuffleId,
                      partitionIds,
                      partitionRequireSizes,
                      allocateSize,
                      request.getRetryMax(),
                      request.getRetryIntervalMax(),
                      failedStatusCode);
              long requireId = preAllocation.getLeft();
              // Recorded before the failure check, so split hints survive a failed allocation.
              needSplitPartitionIds.addAll(preAllocation.getRight());
              if (requireId == FAILED_REQUIRE_ID) {
                String allocationFailure =
                    String.format(
                        "requirePreAllocation failed! size[%s], host[%s], port[%s]",
                        allocateSize, host, port);
                throw new RssException(allocationFailure);
              }

              // The same request object is reused on every attempt, so refresh both fields.
              rpcRequest.setRequireId(requireId);
              rpcRequest.setTimestamp(System.currentTimeMillis());
              long sendStartMs = System.currentTimeMillis();
              RpcResponse rpcResponse = client.sendRpcSync(rpcRequest, rpcTimeout);
              if (LOG.isDebugEnabled()) {
                long costMs = System.currentTimeMillis() - sendStartMs;
                LOG.debug(
                    "Do sendShuffleData to {}:{} rpc cost:"
                        + costMs
                        + " ms for "
                        + allocateSize
                        + " bytes with "
                        + sentBlockCount
                        + " blocks",
                    host,
                    port);
              }

              if (rpcResponse.getStatusCode() == StatusCode.SUCCESS) {
                return rpcResponse;
              }

              // Remember the server-reported status so the caller sees it in the final response.
              failedStatusCode.set(StatusCode.fromCode(rpcResponse.getStatusCode().statusCode()));
              String msg =
                  "Can't send shuffle data with "
                      + sentBlockCount
                      + " blocks to "
                      + host
                      + ":"
                      + port
                      + ", statusCode="
                      + rpcResponse.getStatusCode()
                      + ", errorMsg:"
                      + rpcResponse.getRetMessage();
              if (NOT_RETRY_STATUS_CODES.contains(rpcResponse.getStatusCode())) {
                throw new NotRetryException(msg);
              }
              throw new RssException(msg);
            },
            null,
            request.getRetryIntervalMax(),
            maxRetryAttempts,
            // The predicate rejects OutOfMemoryError and NotRetryException.
            t -> !(t instanceof OutOfMemoryError || t instanceof NotRetryException));
      } catch (Throwable throwable) {
        LOG.warn("Failed to send shuffle data due to ", throwable);
        allSent = false;
        // Give up on the remaining shuffle ids after the first failure.
        break;
      }
    }

    StatusCode finalStatus = allSent ? StatusCode.SUCCESS : failedStatusCode.get();
    RssSendShuffleDataResponse response = new RssSendShuffleDataResponse(finalStatus);
    response.setNeedSplitPartitionIds(needSplitPartitionIds);
    return response;
  }