public void handleSendShuffleDataRequest()

in server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java [145:384]


  /**
   * Handles a {@code sendShuffleData} request received over the Netty transport.
   *
   * <p>The pre-allocated buffer record identified by the request's require id is fetched and
   * removed first. The request is then rejected when the application is unknown, when it was sent
   * by an older stage attempt, when the pre-allocated buffer record no longer exists, or when the
   * request carries no blocks. Otherwise every partition in the request is handed to
   * {@code ShuffleTaskManager#cacheShuffleData}; after the first failing partition the remaining
   * partitions are not cached and their buffers are released instead.
   *
   * <p>On every path that returns normally, one {@code RpcResponse} is written to the client's
   * channel and its status code is recorded in the audit context.
   *
   * @param client the transport client whose channel receives the response
   * @param req the decoded send-shuffle-data request
   */
  public void handleSendShuffleDataRequest(TransportClient client, SendShuffleDataRequestV1 req) {
    try (ServerRpcAuditContext auditContext = createAuditContext("sendShuffleData", client)) {
      RpcResponse rpcResponse;
      String appId = req.getAppId();
      int shuffleId = req.getShuffleId();
      long requireBufferId = req.getRequireId();
      int stageAttemptNumber = req.getStageAttemptNumber();
      ShuffleBufferManager shuffleBufferManager = shuffleServer.getShuffleBufferManager();
      ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager();
      // A null info means the pre-allocated buffer has already been removed by the
      // preAllocatedBufferCheck thread; otherwise the required size has to be released here.
      PreAllocatedBufferInfo info =
          shuffleTaskManager.getAndRemovePreAllocatedBuffer(requireBufferId);
      int requireSize = info == null ? 0 : info.getRequireSize();
      // Required size minus the request's decoded length, clamped at zero.
      int requireBlocksSize = Math.max(requireSize - req.getDecodedLength(), 0);

      boolean isPreAllocated = info != null;

      auditContext.withAppId(appId).withShuffleId(shuffleId);
      auditContext.withArgs(
          "requireBufferId="
              + requireBufferId
              + ", requireSize="
              + requireSize
              + ", isPreAllocated="
              + isPreAllocated
              + ", requireBlocksSize="
              + requireBlocksSize
              + ", stageAttemptNumber="
              + stageAttemptNumber
              + ", partitionCount="
              + req.getPartitionToBlocks().size());

      // Reuse the task manager fetched above instead of asking the server for it again.
      ShuffleTaskInfo taskInfo = shuffleTaskManager.getShuffleTaskInfo(appId);
      if (taskInfo == null) {
        rpcResponse =
            new RpcResponse(
                req.getRequestId(), StatusCode.APP_NOT_FOUND, "appId: " + appId + " not found");
        String errorMsg =
            "APP_NOT_FOUND error, requireBufferId["
                + requireBufferId
                + "] for appId["
                + appId
                + "], shuffleId["
                + shuffleId
                + "], isPreAllocated["
                + isPreAllocated
                + "]";
        LOG.error(errorMsg);
        ShuffleServerMetrics.counterAppNotFound.inc();
        releaseNettyBufferAndMetrics(
            req,
            appId,
            shuffleId,
            requireBufferId,
            requireBlocksSize,
            shuffleBufferManager,
            info,
            isPreAllocated);
        auditContext.withStatusCode(rpcResponse.getStatusCode());
        client.getChannel().writeAndFlush(rpcResponse);
        return;
      }
      Integer latestStageAttemptNumber = taskInfo.getLatestStageAttemptNumber(shuffleId);
      // A stage retry has occurred: data sent by a task of an earlier stage attempt is ignored
      // rather than processed.
      if (stageAttemptNumber < latestStageAttemptNumber) {
        String responseMessage = "A retry has occurred at the Stage, sending data is invalid.";
        rpcResponse =
            new RpcResponse(req.getRequestId(), StatusCode.STAGE_RETRY_IGNORE, responseMessage);
        LOG.warn(
            "Stage retry occurred, appId["
                + appId
                + "], shuffleId["
                + shuffleId
                + "], stageAttemptNumber["
                + stageAttemptNumber
                + "], latestStageAttemptNumber["
                + latestStageAttemptNumber
                + "]");
        releaseNettyBufferAndMetrics(
            req,
            appId,
            shuffleId,
            requireBufferId,
            requireBlocksSize,
            shuffleBufferManager,
            info,
            isPreAllocated);
        auditContext.withStatusCode(rpcResponse.getStatusCode());
        client.getChannel().writeAndFlush(rpcResponse);
        return;
      }
      long timestamp = req.getTimestamp();
      if (timestamp > 0) {
        /*
         * Here we record the transport time, but we don't consider the impact of data size on
         * transport time. The amount of data will not cause great fluctuations in latency. For
         * example, 100K costs 1ms, and 1M costs 10ms. This seems like a normal fluctuation, but it
         * may rise to 10s when the server load is high. In addition, the clocks of the client
         * machine and of the Shuffle Server machine must be kept in sync; the transport time is
         * accurate only if this condition is met.
         */
        long transportTime = System.currentTimeMillis() - timestamp;
        // A non-positive difference is not recorded.
        if (transportTime > 0) {
          shuffleServer
              .getNettyMetrics()
              .recordTransportTime(SendShuffleDataRequest.class.getName(), transportTime);
        }
      }

      StatusCode ret = StatusCode.SUCCESS;
      String responseMessage = "OK";
      if (!req.getPartitionToBlocks().isEmpty()) {
        ShuffleServerMetrics.counterTotalReceivedDataSize.inc(requireBlocksSize);
        if (!isPreAllocated) {
          // No pre-allocation record is left for this request: drop the payload of every block
          // and report the failure to the client.
          req.getPartitionToBlocks().values().stream()
              .flatMap(Collection::stream)
              .forEach(block -> block.getData().release());

          String errorMsg =
              "Can't find requireBufferId["
                  + requireBufferId
                  + "] for appId["
                  + appId
                  + "], shuffleId["
                  + shuffleId
                  + "], probably because the pre-allocated buffer has expired. "
                  + "Please increase the expiration time using "
                  + ShuffleServerConf.SERVER_PRE_ALLOCATION_EXPIRED.key()
                  + " in ShuffleServer's configuration";
          LOG.warn(errorMsg);
          rpcResponse = new RpcResponse(req.getRequestId(), StatusCode.INTERNAL_ERROR, errorMsg);
          auditContext.withStatusCode(rpcResponse.getStatusCode());
          client.getChannel().writeAndFlush(rpcResponse);
          return;
        }
        final long start = System.currentTimeMillis();
        shuffleBufferManager.releaseMemory(req.getDecodedLength(), false, true);
        List<ShufflePartitionedData> shufflePartitionedDataList = toPartitionedDataList(req);
        // Sum of the pre-allocated sizes already released for successfully cached partitions.
        long alreadyReleasedSize = 0;
        boolean hasFailureOccurred = false;
        for (ShufflePartitionedData spd : shufflePartitionedDataList) {
          String shuffleDataInfo =
              "appId["
                  + appId
                  + "], shuffleId["
                  + shuffleId
                  + "], partitionId["
                  + spd.getPartitionId()
                  + "]";
          try {
            if (hasFailureOccurred) {
              // Skip caching after the first failure; the finally block below still runs and
              // releases this partition's buffers.
              continue;
            }
            ret = shuffleTaskManager.cacheShuffleData(appId, shuffleId, isPreAllocated, spd);
            if (ret != StatusCode.SUCCESS) {
              String errorMsg =
                  "Error happened when shuffleEngine.write for "
                      + shuffleDataInfo
                      + ", statusCode="
                      + ret;
              LOG.error(errorMsg);
              responseMessage = errorMsg;
              hasFailureOccurred = true;
            } else {
              if (shuffleServer.isRemoteMergeEnable()) {
                shuffleServer.getShuffleMergeManager().setDirect(appId, shuffleId, true);
              }
              long toReleasedSize = spd.getTotalBlockEncodedLength();
              // After each cacheShuffleData call, the `preAllocatedSize` is updated promptly.
              shuffleTaskManager.releasePreAllocatedSize(toReleasedSize);
              alreadyReleasedSize += toReleasedSize;
              shuffleTaskManager.updateCachedBlockIds(
                  appId, shuffleId, spd.getPartitionId(), spd.getBlockList());
            }
          } catch (ExceedHugePartitionHardLimitException e) {
            String errorMsg =
                "ExceedHugePartitionHardLimitException Error happened when shuffleEngine.write for "
                    + shuffleDataInfo
                    + ": "
                    + e.getMessage();
            ShuffleServerMetrics.counterTotalHugePartitionExceedHardLimitNum.inc();
            ret = StatusCode.EXCEED_HUGE_PARTITION_HARD_LIMIT;
            responseMessage = errorMsg;
            LOG.error(errorMsg);
            hasFailureOccurred = true;
          } catch (Exception e) {
            String errorMsg =
                "Error happened when shuffleEngine.write for "
                    + shuffleDataInfo
                    + ": "
                    + e.getMessage();
            ret = StatusCode.INTERNAL_ERROR;
            responseMessage = errorMsg;
            // Unexpected failure: log the throwable itself so the stack trace is not lost.
            LOG.error(errorMsg, e);
            hasFailureOccurred = true;
          } finally {
            // Once a cache failure has occurred, the data held by the ByteBuf of this partition
            // (and of every following partition) must be released explicitly.
            if (hasFailureOccurred) {
              Arrays.stream(spd.getBlockList()).forEach(block -> block.getData().release());
              shuffleBufferManager.releaseMemory(spd.getTotalBlockEncodedLength(), false, false);
            }
          }
        }
        // The required buffer id is used only once: the shuffle client requires another buffer
        // whether or not the current request succeeded. The pre-allocated buffer was therefore
        // fetched and removed up front, and whatever part of it was not released while caching
        // is released here.
        if (requireBlocksSize > alreadyReleasedSize) {
          shuffleTaskManager.releasePreAllocatedSize(requireBlocksSize - alreadyReleasedSize);
        }
        rpcResponse = new RpcResponse(req.getRequestId(), ret, responseMessage);
        long costTime = System.currentTimeMillis() - start;
        shuffleServer
            .getNettyMetrics()
            .recordProcessTime(SendShuffleDataRequest.class.getName(), costTime);
        if (LOG.isDebugEnabled()) {
          LOG.debug(
              "Cache Shuffle Data for appId["
                  + appId
                  + "], shuffleId["
                  + shuffleId
                  + "], cost "
                  + costTime
                  + " ms with "
                  + shufflePartitionedDataList.size()
                  + " blocks and "
                  + requireBlocksSize
                  + " bytes");
        }
      } else {
        // NOTE(review): when a pre-allocation record was found (isPreAllocated) but the request
        // carries no blocks, nothing on this path releases the size that record reserved.
        // Confirm whether that is intended.
        rpcResponse =
            new RpcResponse(req.getRequestId(), StatusCode.INTERNAL_ERROR, "No data in request");
      }
      auditContext.withStatusCode(rpcResponse.getStatusCode());
      client.getChannel().writeAndFlush(rpcResponse);
    }
  }