in manager/manager/src/main/java/org/apache/doris/stack/service/construct/DataImportService.java [370:425]
public DataImportTaskPageResp getTaskList(int tableId, int curPage, int pageSize,
CoreUserEntity user) throws Exception {
log.debug("User {} get table {} file import task by page.", user.getId(), tableId);
ClusterInfoEntity clusterInfo = clusterUserComponent.getUserCurrentClusterAndCheckAdmin(user);
List<DataImportTaskPageResp.DataImportTaskResp> taskRespList = Lists.newArrayList();
ManagerTableEntity tableEntity = tableRepository.findById(tableId).get();
ManagerDatabaseEntity databaseEntity =
databuildComponent.checkClusterDatabase(tableEntity.getDbId(), clusterInfo.getId());
String dbName = databaseEntity.getName();
log.debug("Paging read data import task list.");
Pageable pageable = PageRequest.of(curPage - 1, pageSize);
Specification<DataImportTaskEntity> spec = new Specification() {
@Override
public Predicate toPredicate(Root root, CriteriaQuery query, CriteriaBuilder cb) {
Predicate p = cb.equal(root.get("tableId"), tableId);
return p;
}
};
List<DataImportTaskEntity> taskEntities = taskRepository.findAll(spec, pageable).toList();
log.debug("list size {}", taskEntities.size());
String showLabel = "show load where Label=";
for (DataImportTaskEntity taskEntity : taskEntities) {
if (taskEntity.getImportType().equals(HDFS_IMPORT_TYPE)
&& (taskEntity.getStatus().equals(DataImportTaskPageResp.Status.PENDING.name())
|| taskEntity.getStatus().equals(DataImportTaskPageResp.Status.LOADING.name()))) {
String sql = showLabel + "\"" + taskEntity.getTaskName() + "\"";
NativeQueryResp queryResult = queryClient.executeSQL(sql, ConstantDef.DORIS_DEFAULT_NS, dbName, clusterInfo);
if (queryResult.getData().size() != 1) {
log.error("The task label is not exist in palo, delete it");
taskRepository.delete(taskEntity);
}
HdfsImportTaskInfo taskInfo = new HdfsImportTaskInfo(queryResult);
if (!StringUtils.isEmpty(taskInfo.getState())
&& !taskEntity.getStatus().equals(taskInfo.getState())) {
log.debug("The task status has changed, update info");
taskEntity.updateByHdfsTaskInfo(taskInfo);
taskRepository.save(taskEntity);
}
}
DataImportTaskPageResp.DataImportTaskResp taskModel = taskEntity.tansToResponseModel(tableEntity.getName());
taskRespList.add(taskModel);
}
DataImportTaskPageResp result = new DataImportTaskPageResp();
result.setTaskRespList(taskRespList);
result.setPage(curPage);
result.setPageSize(pageSize);
result.setTotalSize(taskRepository.countByTableId(tableId));
return result;
}