public RssSendShuffleDataResponse sendShuffleData()

in internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java [68:161]


  /**
   * Sends the shuffle blocks carried by {@code request} to the shuffle server through the
   * transport client, issuing one RPC per shuffle id.
   *
   * <p>For each shuffle id the total byte size and block count of all its partitions are computed
   * first. Inside a {@code RetryUtils.retry} call, buffer space of that size is then requested
   * via {@code requirePreAllocation} and the blocks are sent with a synchronous RPC. A failed
   * pre-allocation or a non-{@code SUCCESS} RPC status raises an exception inside the retried
   * action; a {@code NO_REGISTER} status raises {@code NotRetryException} instead of {@code
   * RssException}.
   *
   * <p>Sending stops at the first shuffle id whose retried action ultimately fails; any shuffle
   * ids that have not been processed yet are not sent.
   *
   * @param request the app id, the blocks grouped by shuffle id and partition id, and the retry
   *     settings to use
   * @return a response with {@code StatusCode.SUCCESS} when every shuffle id was sent, otherwise a
   *     response with {@code StatusCode.INTERNAL_ERROR}
   */
  public RssSendShuffleDataResponse sendShuffleData(RssSendShuffleDataRequest request) {
    TransportClient transportClient = getTransportClient();
    Map<Integer, Map<Integer, List<ShuffleBlockInfo>>> shuffleIdToBlocks =
        request.getShuffleIdToBlocks();

    for (Map.Entry<Integer, Map<Integer, List<ShuffleBlockInfo>>> stb :
        shuffleIdToBlocks.entrySet()) {
      int shuffleId = stb.getKey();
      int size = 0;
      int blockNum = 0;
      // Only the block lists matter for the totals; the partition ids are not needed here.
      for (List<ShuffleBlockInfo> blocks : stb.getValue().values()) {
        for (ShuffleBlockInfo sbi : blocks) {
          size += sbi.getSize();
        }
        blockNum += blocks.size();
      }

      // Effectively-final copies so the totals can be captured by the lambda below.
      int allocateSize = size;
      int finalBlockNum = blockNum;
      try {
        RetryUtils.retry(
            () -> {
              // A fresh pre-allocation is requested on every attempt of the retried action.
              long requireId =
                  requirePreAllocation(
                      request.getAppId(),
                      allocateSize,
                      request.getRetryMax(),
                      request.getRetryIntervalMax());
              if (requireId == FAILED_REQUIRE_ID) {
                throw new RssException(
                    String.format(
                        "requirePreAllocation failed! size[%s], host[%s], port[%s]",
                        allocateSize, host, port));
              }

              SendShuffleDataRequest sendShuffleDataRequest =
                  new SendShuffleDataRequest(
                      requestId(),
                      request.getAppId(),
                      shuffleId,
                      requireId,
                      stb.getValue(),
                      System.currentTimeMillis());
              long start = System.currentTimeMillis();
              RpcResponse rpcResponse =
                  transportClient.sendRpcSync(sendShuffleDataRequest, RPC_TIMEOUT_DEFAULT_MS);
              // Fully parameterized so no message string is built when debug logging is off.
              LOG.debug(
                  "Do sendShuffleData to {}:{} rpc cost:{} ms for {} bytes with {} blocks",
                  host,
                  port,
                  System.currentTimeMillis() - start,
                  allocateSize,
                  finalBlockNum);
              if (rpcResponse.getStatusCode() != StatusCode.SUCCESS) {
                String msg =
                    "Can't send shuffle data with "
                        + finalBlockNum
                        + " blocks to "
                        + host
                        + ":"
                        + port
                        + ", statusCode="
                        + rpcResponse.getStatusCode()
                        + ", errorMsg:"
                        + rpcResponse.getRetMessage();
                // NOTE(review): NotRetryException presumably tells RetryUtils to stop retrying
                // for an unregistered app/shuffle - confirm against RetryUtils.
                if (rpcResponse.getStatusCode() == StatusCode.NO_REGISTER) {
                  throw new NotRetryException(msg);
                } else {
                  throw new RssException(msg);
                }
              }
              return rpcResponse;
            },
            request.getRetryIntervalMax(),
            maxRetryAttempts);
      } catch (Throwable throwable) {
        // Pass the throwable itself so the stack trace and cause chain reach the log; logging
        // only getMessage() loses them and prints "null" for message-less exceptions.
        LOG.warn(
            "Failed to send shuffle data to {}:{} for shuffleId[{}]",
            host,
            port,
            shuffleId,
            throwable);
        // Give up on the remaining shuffle ids as soon as one of them fails.
        return new RssSendShuffleDataResponse(StatusCode.INTERNAL_ERROR);
      }
    }

    return new RssSendShuffleDataResponse(StatusCode.SUCCESS);
  }