public void handleSendShuffleDataRequest()

in server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java [94:214]


  /**
   * Handles a {@code SendShuffleDataRequest}: records transport latency, caches every partition's
   * data through the {@code ShuffleTaskManager}, releases the pre-allocated buffer accounting that
   * was reserved under the request's require-buffer id, and writes an {@code RpcResponse} back on
   * the client's channel.
   *
   * <p>Responses written by this method:
   *
   * <ul>
   *   <li>{@code INTERNAL_ERROR} / "No data in request" when the request carries no partitions;
   *   <li>{@code INTERNAL_ERROR} when no pre-allocated buffer is found for the require-buffer id;
   *   <li>the first non-{@code SUCCESS} status returned by {@code cacheShuffleData}, or {@code
   *       INTERNAL_ERROR} if caching a partition throws an {@link Exception};
   *   <li>{@code SUCCESS} / "OK" otherwise.
   * </ul>
   *
   * <p>If anything outside the per-partition error handling throws (for example while converting
   * the request into partitioned data), the exception propagates to the caller and no response is
   * written here, but the not-yet-released part of the pre-allocated size is still released.
   *
   * @param client transport client whose channel receives the response
   * @param req the send-shuffle-data request to process
   */
  public void handleSendShuffleDataRequest(TransportClient client, SendShuffleDataRequest req) {
    RpcResponse rpcResponse;
    String appId = req.getAppId();
    int shuffleId = req.getShuffleId();
    long requireBufferId = req.getRequireId();
    long timestamp = req.getTimestamp();
    if (timestamp > 0) {
      /*
       * Record the transport time. The impact of data size on transport time is not considered:
       * the amount of data does not cause great fluctuations in latency (for example, 100K costs
       * 1ms and 1M costs 10ms), whereas latency may rise to 10s when the server load is high.
       * Note that the clocks of the client machine and of the Shuffle Server must be kept in
       * sync; the transport time is accurate only if this condition is met.
       */
      long transportTime = System.currentTimeMillis() - timestamp;
      if (transportTime > 0) {
        shuffleServer
            .getNettyMetrics()
            .recordTransportTime(SendShuffleDataRequest.class.getName(), transportTime);
      }
    }
    // Looked up before the pre-allocated buffer entry is taken out of the manager below.
    int requireSize = shuffleServer.getShuffleTaskManager().getRequireBufferSize(requireBufferId);

    StatusCode ret = StatusCode.SUCCESS;
    String responseMessage = "OK";
    if (req.getPartitionToBlocks().size() > 0) {
      ShuffleServerMetrics.counterTotalReceivedDataSize.inc(requireSize);
      ShuffleTaskManager manager = shuffleServer.getShuffleTaskManager();
      PreAllocatedBufferInfo info = manager.getAndRemovePreAllocatedBuffer(requireBufferId);
      boolean isPreAllocated = info != null;
      if (!isPreAllocated) {
        String errorMsg =
            "Can't find requireBufferId["
                + requireBufferId
                + "] for appId["
                + appId
                + "], shuffleId["
                + shuffleId
                + "]";
        LOG.warn(errorMsg);
        responseMessage = errorMsg;
        rpcResponse =
            new RpcResponse(req.getRequestId(), StatusCode.INTERNAL_ERROR, responseMessage);
        client.getChannel().writeAndFlush(rpcResponse);
        return;
      }
      final long start = System.currentTimeMillis();
      List<ShufflePartitionedData> shufflePartitionedData;
      long alreadyReleasedSize = 0;
      try {
        shufflePartitionedData = toPartitionedData(req);
        for (ShufflePartitionedData spd : shufflePartitionedData) {
          String shuffleDataInfo =
              "appId["
                  + appId
                  + "], shuffleId["
                  + shuffleId
                  + "], partitionId["
                  + spd.getPartitionId()
                  + "]";
          try {
            ret = manager.cacheShuffleData(appId, shuffleId, isPreAllocated, spd);
            if (ret != StatusCode.SUCCESS) {
              String errorMsg =
                  "Error happened when shuffleEngine.write for "
                      + shuffleDataInfo
                      + ", statusCode="
                      + ret;
              LOG.error(errorMsg);
              responseMessage = errorMsg;
              // Stop at the first failed partition; its status is what the client receives.
              break;
            } else {
              long toReleasedSize = spd.getTotalBlockSize();
              // after each cacheShuffleData call, the `preAllocatedSize` is updated timely.
              manager.releasePreAllocatedSize(toReleasedSize);
              alreadyReleasedSize += toReleasedSize;
              manager.updateCachedBlockIds(
                  appId, shuffleId, spd.getPartitionId(), spd.getBlockList());
            }
          } catch (Exception e) {
            String errorMsg =
                "Error happened when shuffleEngine.write for "
                    + shuffleDataInfo
                    + ": "
                    + e.getMessage();
            ret = StatusCode.INTERNAL_ERROR;
            responseMessage = errorMsg;
            // Pass the exception itself so the stack trace is not lost.
            LOG.error(errorMsg, e);
            break;
          }
        }
      } finally {
        // Since the required buffer id is only used once, the shuffle client would try to
        // require another buffer whether the current connection succeeded or not. The
        // pre-allocated buffer entry has already been removed above, so whatever part of it was
        // not released per partition must be released here. Doing this in a finally block keeps
        // the accounting correct even when an exception escapes the processing above.
        if (info.getRequireSize() > alreadyReleasedSize) {
          manager.releasePreAllocatedSize(info.getRequireSize() - alreadyReleasedSize);
        }
      }
      rpcResponse = new RpcResponse(req.getRequestId(), ret, responseMessage);
      long costTime = System.currentTimeMillis() - start;
      shuffleServer
          .getNettyMetrics()
          .recordProcessTime(SendShuffleDataRequest.class.getName(), costTime);
      // Guarded so the message is not concatenated on every request when debug logging is off.
      if (LOG.isDebugEnabled()) {
        LOG.debug(
            "Cache Shuffle Data for appId["
                + appId
                + "], shuffleId["
                + shuffleId
                + "], cost "
                + costTime
                + " ms with "
                + shufflePartitionedData.size()
                + " blocks and "
                + requireSize
                + " bytes");
      }
    } else {
      rpcResponse =
          new RpcResponse(req.getRequestId(), StatusCode.INTERNAL_ERROR, "No data in request");
    }

    client.getChannel().writeAndFlush(rpcResponse);
  }