public Message openFile()

in linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/filesystem/restful/api/FsRestfulApi.java [617:760]


  /**
   * Opens the file at {@code path} on behalf of the requesting user and returns its metadata and
   * content wrapped in a {@link Message}.
   *
   * <p>For result-set files the content is paged with {@code page}/{@code pageSize}; when {@code
   * enableLimit} is non-blank the limit flag and the column indices generated from {@code
   * columnPage}/{@code columnPageSize} are published through the {@code LinkisStorageConf}
   * thread-locals for the duration of the call. For any other file the length is checked against
   * {@code FILESYSTEM_FILE_CHECK_SIZE} before the content is collected.
   *
   * @param req the current HTTP request, used to resolve the operating user
   * @param path path of the file to open; must not be empty
   * @param page page number handed to the file source for result sets
   * @param pageSize page size for result sets; must not exceed {@code
   *     FILESYSTEM_RESULTSET_ROW_LIMIT}
   * @param nullValue replacement passed to the file source as the {@code nullValue} parameter for
   *     result sets; a value equal to {@code BLANK} (ignoring case) is treated as empty
   * @param enableLimit when non-blank (and the file is a result set) enables the limit flag and
   *     column paging
   * @param columnPage 1-based column page, must be at least 1
   * @param columnPageSize number of columns per page, must be between 1 and 500
   * @return a message carrying {@code metadata}, {@code fileContent}, {@code type}, {@code
   *     totalLine}, {@code page} and {@code totalPage} (always 0); if a {@code
   *     ColLengthExceedException} is raised while collecting, a message carrying {@code type},
   *     {@code display_prohibited}, {@code zh_msg} and {@code en_msg} instead
   * @throws IOException declared for the underlying file system / file source calls
   * @throws WorkSpaceException 80004 for an empty path, 80036 for an invalid column page, 80010
   *     when the path fails the user-directory check, 80012 when the file is not readable, 80034
   *     when {@code pageSize} exceeds the row limit, 80032 when a non-result-set file is too large
   */
  public Message openFile(
      HttpServletRequest req,
      @RequestParam(value = "path", required = false) String path,
      @RequestParam(value = "page", defaultValue = "1") Integer page,
      @RequestParam(value = "pageSize", defaultValue = "5000") Integer pageSize,
      @RequestParam(value = "nullValue", defaultValue = "") String nullValue,
      @RequestParam(value = "enableLimit", defaultValue = "") String enableLimit,
      @RequestParam(value = "columnPage", required = false, defaultValue = "1") Integer columnPage,
      @RequestParam(value = "columnPageSize", required = false, defaultValue = "500")
          Integer columnPageSize)
      throws IOException, WorkSpaceException {

    Message message = Message.ok();
    if (StringUtils.isEmpty(path)) {
      throw WorkspaceExceptionManager.createException(80004, path);
    }

    if (columnPage < 1 || columnPageSize < 1 || columnPageSize > 500) {
      throw WorkspaceExceptionManager.createException(80036, path);
    }

    String userName = ModuleUserUtils.getOperationUser(req, "openFile " + path);
    LoggerUtils.setJobIdMDC("openFileThread_" + userName);

    int[] columnIndices = null;
    FileSource fileSource = null;
    // Everything after the MDC has been set runs inside this try so that the finally block
    // always clears the MDC and the thread-locals, including on the early permission failures.
    try {
      LOGGER.info("userName {} start to open File {}", userName, path);
      long startTime = System.currentTimeMillis();
      if (!checkIsUsersDirectory(path, userName)) {
        throw WorkspaceExceptionManager.createException(80010, userName, path);
      }
      FsPath fsPath = new FsPath(path);
      FileSystem fileSystem = fsService.getFileSystemForRead(userName, fsPath);
      // Throws an exception if the file does not have read access
      if (!fileSystem.canRead(fsPath)) {
        throw WorkspaceExceptionManager.createException(80012);
      }

      fileSource = FileSource$.MODULE$.create(fsPath, fileSystem);
      if (nullValue != null && BLANK.equalsIgnoreCase(nullValue)) {
        nullValue = "";
      }
      if (FileSource$.MODULE$.isResultSet(fsPath.getPath())) {
        if (!StringUtils.isEmpty(nullValue)) {
          fileSource.addParams("nullValue", nullValue);
        }
        if (pageSize > FILESYSTEM_RESULTSET_ROW_LIMIT.getValue()) {
          throw WorkspaceExceptionManager.createException(
              80034, FILESYSTEM_RESULTSET_ROW_LIMIT.getValue());
        }

        if (StringUtils.isNotBlank(enableLimit)) {
          LOGGER.info("set enable limit for thread: {}", Thread.currentThread().getName());
          LinkisStorageConf.enableLimitThreadLocal().set(enableLimit);
          // Build the column indices for the requested column page
          columnIndices = genColumnIndices(columnPage, columnPageSize);
          LinkisStorageConf.columnIndicesThreadLocal().set(columnIndices);
        }

        fileSource = fileSource.page(page, pageSize);
      } else if (fileSystem.getLength(fsPath)
          > ByteTimeUtils.byteStringAsBytes(FILESYSTEM_FILE_CHECK_SIZE.getValue())) {
        // Increase file size limit, making it easy to OOM without limitation
        throw WorkspaceExceptionManager.createException(80032);
      }

      try {
        Pair<Object, ArrayList<String[]>> result = fileSource.collect()[0];
        LOGGER.info(
            "Finished to open File {}, taken {} ms", path, System.currentTimeMillis() - startTime);
        Object metaMap = result.getFirst();
        Map[] newMap = null;
        if (metaMap instanceof Map[]) {
          Map[] realMap = (Map[]) metaMap;
          int realSize = realMap.length;

          // The requested column page must start within the actual column count. This check is
          // deliberately outside the best-effort try below so the error reaches the caller
          // instead of being logged and swallowed.
          if ((columnPage - 1) * columnPageSize > realSize) {
            throw WorkspaceExceptionManager.createException(80036, path);
          }

          // Decorating the response with column hints and trimming the metadata is best-effort:
          // on failure the untrimmed metadata is returned.
          try {
            message.data("totalColumn", realSize);
            if (realSize > FILESYSTEM_RESULT_SET_COLUMN_LIMIT.getValue()) {
              message.data("column_limit_display", true);
              message.data(
                  "zh_msg",
                  "全量结果集超过"
                      + FILESYSTEM_RESULT_SET_COLUMN_LIMIT.getValue()
                      + "列,页面提供500列数据预览,如需查看完整结果集,请使用结果集导出功能");
              message.data(
                  "en_msg",
                  "Because your result set is large, to view the full result set, use the Result set Export feature.");
            }
            if (columnIndices == null || columnIndices.length >= realSize) {
              newMap = realMap;
            } else {
              // The last column page may be shorter than columnPageSize
              int realLength =
                  (columnPage * columnPageSize) > realSize
                      ? realSize - (columnPage - 1) * columnPageSize
                      : columnPageSize;
              newMap = new Map[realLength];
              for (int i = 0; i < realLength; i++) {
                newMap[i] = realMap[columnIndices[i]];
              }
            }
          } catch (Exception e) {
            LOGGER.info("Failed to set flag", e);
          }
        }

        message
            .data("metadata", newMap == null ? metaMap : newMap)
            .data("fileContent", result.getSecond());
        // The file source is closed once, in the finally block, after these reads
        message.data("type", fileSource.getFileSplits()[0].type());
        message.data("totalLine", fileSource.getTotalLine());
        return message.data("page", page).data("totalPage", 0);
      } catch (ColLengthExceedException e) {
        LOGGER.info("Failed to open file {}", path, e);
        message.data("type", fileSource.getFileSplits()[0].type());
        message.data("display_prohibited", true);
        message.data(
            "zh_msg",
            MessageFormat.format(
                "结果集存在字段值字符数超过{0},如需查看全部数据请导出文件或使用字符串截取函数(substring、substr)截取相关字符即可前端展示数据内容",
                LinkisStorageConf.LINKIS_RESULT_COL_LENGTH()));
        message.data(
            "en_msg",
            MessageFormat.format(
                "There is a field value exceed {0} characters or col size exceed {1} in the result set. If you want to view it, please use the result set export function.",
                LinkisStorageConf.LINKIS_RESULT_COL_LENGTH(),
                LinkisStorageConf.LINKIS_RESULT_COLUMN_SIZE()));
        return message;
      }
    } finally {
      // Clear the per-thread flags so they cannot leak into the next request served by this thread
      if (StringUtils.isNotBlank(enableLimit)) {
        LinkisStorageConf.enableLimitThreadLocal().remove();
        LinkisStorageConf.columnIndicesThreadLocal().remove();
      }
      LoggerUtils.removeJobIdMDC();
      IOUtils.closeQuietly(fileSource);
    }
  }