in linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/filesystem/restful/api/FsRestfulApi.java [617:760]
/*
 * Opens a file (or one page of a result set) on behalf of the requesting user and returns its
 * metadata and content.
 *
 * Parameters:
 *   req            - the HTTP request; the operating user is resolved from it
 *   path           - path of the file to open; must not be empty (error 80004 otherwise)
 *   page           - row page passed to FileSource.page for result sets
 *   pageSize       - row page size; for result sets it must not exceed
 *                    FILESYSTEM_RESULTSET_ROW_LIMIT (error 80034 otherwise)
 *   nullValue      - replacement for null cells in result sets; the BLANK marker
 *                    (case-insensitive) is treated as the empty string
 *   enableLimit    - when non-blank and the file is a result set, the value is published through
 *                    LinkisStorageConf.enableLimitThreadLocal() and column paging is activated
 *   columnPage     - 1-based column page; must be >= 1
 *   columnPageSize - number of columns per column page; must be within [1, 500]
 *
 * Returns a Message carrying "metadata", "fileContent", "type", "totalLine", "page" and
 * "totalPage" (plus "totalColumn" and the column-limit hints when the metadata is a Map[]).
 * When a ColLengthExceedException is raised while collecting, the Message instead carries
 * "type", "display_prohibited" and explanatory "zh_msg" / "en_msg" entries.
 *
 * Throws WorkSpaceException for invalid parameters (80004, 80036), paths outside the user's
 * directories (80010), unreadable files (80012), oversized page requests (80034) or oversized
 * non-result-set files (80032).
 *
 * The logging MDC entry and the LinkisStorageConf thread-locals set by this method are always
 * cleared before it returns, because the calling thread may be reused for later requests.
 */
public Message openFile(
    HttpServletRequest req,
    @RequestParam(value = "path", required = false) String path,
    @RequestParam(value = "page", defaultValue = "1") Integer page,
    @RequestParam(value = "pageSize", defaultValue = "5000") Integer pageSize,
    @RequestParam(value = "nullValue", defaultValue = "") String nullValue,
    @RequestParam(value = "enableLimit", defaultValue = "") String enableLimit,
    @RequestParam(value = "columnPage", required = false, defaultValue = "1") Integer columnPage,
    @RequestParam(value = "columnPageSize", required = false, defaultValue = "500")
        Integer columnPageSize)
    throws IOException, WorkSpaceException {
  Message message = Message.ok();
  if (StringUtils.isEmpty(path)) {
    throw WorkspaceExceptionManager.createException(80004, path);
  }
  if (columnPage < 1 || columnPageSize < 1 || columnPageSize > 500) {
    throw WorkspaceExceptionManager.createException(80036, path);
  }
  String userName = ModuleUserUtils.getOperationUser(req, "openFile " + path);
  LoggerUtils.setJobIdMDC("openFileThread_" + userName);
  FileSource fileSource = null;
  // Everything after the MDC is set runs inside this try so that the finally block always
  // clears the MDC and thread-locals, including on the early permission failures below.
  try {
    LOGGER.info("userName {} start to open File {}", userName, path);
    long startTime = System.currentTimeMillis();
    if (!checkIsUsersDirectory(path, userName)) {
      throw WorkspaceExceptionManager.createException(80010, userName, path);
    }
    FsPath fsPath = new FsPath(path);
    FileSystem fileSystem = fsService.getFileSystemForRead(userName, fsPath);
    // Throws an exception if the file does not have read access
    if (!fileSystem.canRead(fsPath)) {
      throw WorkspaceExceptionManager.createException(80012);
    }
    int[] columnIndices = null;
    fileSource = FileSource$.MODULE$.create(fsPath, fileSystem);
    if (nullValue != null && BLANK.equalsIgnoreCase(nullValue)) {
      nullValue = "";
    }
    if (FileSource$.MODULE$.isResultSet(fsPath.getPath())) {
      if (!StringUtils.isEmpty(nullValue)) {
        fileSource.addParams("nullValue", nullValue);
      }
      if (pageSize > FILESYSTEM_RESULTSET_ROW_LIMIT.getValue()) {
        throw WorkspaceExceptionManager.createException(
            80034, FILESYSTEM_RESULTSET_ROW_LIMIT.getValue());
      }
      if (StringUtils.isNotBlank(enableLimit)) {
        LOGGER.info("set enable limit for thread: {}", Thread.currentThread().getName());
        LinkisStorageConf.enableLimitThreadLocal().set(enableLimit);
        // Build the column indices for the requested column page
        columnIndices = genColumnIndices(columnPage, columnPageSize);
        LinkisStorageConf.columnIndicesThreadLocal().set(columnIndices);
      }
      fileSource = fileSource.page(page, pageSize);
    } else if (fileSystem.getLength(fsPath)
        > ByteTimeUtils.byteStringAsBytes(FILESYSTEM_FILE_CHECK_SIZE.getValue())) {
      // Increase file size limit, making it easy to OOM without limitation
      throw WorkspaceExceptionManager.createException(80032);
    }
    try {
      Pair<Object, ArrayList<String[]>> result = fileSource.collect()[0];
      LOGGER.info(
          "Finished to open File {}, taken {} ms", path, System.currentTimeMillis() - startTime);
      IOUtils.closeQuietly(fileSource);
      Object metaMap = result.getFirst();
      Map[] newMap = null;
      if (metaMap instanceof Map[]) {
        Map[] realMap = (Map[]) metaMap;
        int realSize = realMap.length;
        // Offset of the first requested column; computed as long so that a very large
        // columnPage cannot overflow int and slip past the range check.
        long columnOffset = (long) (columnPage - 1) * columnPageSize;
        // The requested column page must start within the actual column count. This check is
        // deliberately outside the best-effort try below so the error reaches the caller.
        if (columnOffset > realSize) {
          throw WorkspaceExceptionManager.createException(80036, path);
        }
        // Decorating the response with column info is best-effort: a failure here is logged
        // and the full metadata is returned instead.
        try {
          message.data("totalColumn", realSize);
          if (realSize > FILESYSTEM_RESULT_SET_COLUMN_LIMIT.getValue()) {
            message.data("column_limit_display", true);
            message.data(
                "zh_msg",
                "全量结果集超过"
                    + FILESYSTEM_RESULT_SET_COLUMN_LIMIT.getValue()
                    + "列,页面提供500列数据预览,如需查看完整结果集,请使用结果集导出功能");
            message.data(
                "en_msg",
                "Because your result set is large, to view the full result set, use the Result set Export feature.");
          }
          if (columnIndices == null || columnIndices.length >= realSize) {
            newMap = realMap;
          } else {
            // The last column page may hold fewer than columnPageSize columns.
            int realLength = Math.min(columnPageSize, realSize - (int) columnOffset);
            Map[] pagedMap = new Map[realLength];
            for (int i = 0; i < realLength; i++) {
              pagedMap[i] = realMap[columnIndices[i]];
            }
            newMap = pagedMap;
          }
        } catch (Exception e) {
          LOGGER.info("Failed to set flag", e);
        }
      }
      message
          .data("metadata", newMap == null ? metaMap : newMap)
          .data("fileContent", result.getSecond());
      message.data("type", fileSource.getFileSplits()[0].type());
      message.data("totalLine", fileSource.getTotalLine());
      return message.data("page", page).data("totalPage", 0);
    } catch (ColLengthExceedException e) {
      LOGGER.info("Failed to open file {}", path, e);
      message.data("type", fileSource.getFileSplits()[0].type());
      message.data("display_prohibited", true);
      message.data(
          "zh_msg",
          MessageFormat.format(
              "结果集存在字段值字符数超过{0},如需查看全部数据请导出文件或使用字符串截取函数(substring、substr)截取相关字符即可前端展示数据内容",
              LinkisStorageConf.LINKIS_RESULT_COL_LENGTH()));
      message.data(
          "en_msg",
          MessageFormat.format(
              "There is a field value exceed {0} characters or col size exceed {1} in the result set. If you want to view it, please use the result set export function.",
              LinkisStorageConf.LINKIS_RESULT_COL_LENGTH(),
              LinkisStorageConf.LINKIS_RESULT_COLUMN_SIZE()));
      return message;
    }
  } finally {
    // Clear the per-thread markers set above so they do not leak into later requests
    // served by the same thread.
    if (StringUtils.isNotBlank(enableLimit)) {
      LinkisStorageConf.enableLimitThreadLocal().remove();
      LinkisStorageConf.columnIndicesThreadLocal().remove();
    }
    LoggerUtils.removeJobIdMDC();
    IOUtils.closeQuietly(fileSource);
  }
}