public Map importSqlWorkflowDefinition()

in dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDefinitionServiceImpl.java [1249:1401]


    /**
     * Imports a workflow definition from a ZIP archive of SQL files, one SQL task per file.
     *
     * <p>The workflow is named after the uploaded file (extension stripped, then passed through
     * {@code getNewName(..., IMPORT_SUFFIX)}). Every non-directory entry of the archive becomes one SQL
     * task. Inside a file, comment lines of the form {@code -- name: xxx}, {@code -- upstream: a, b}
     * and {@code -- datasource: xxx} set the task name, its upstream task names and its datasource;
     * the comment part of such a line is removed and lines that end up empty are skipped. Without a
     * {@code name} comment the task is named after the entry ({@code import/sql1.sql -> sql1}). A task
     * without upstreams, or whose only upstream is {@code root} while no task is called {@code root},
     * is attached to the start of the DAG (pre-task code 0).
     *
     * <p>The archive is guarded against ZIP bombs: the number of entries, the total uncompressed size
     * and the per-entry compression ratio are bounded.
     *
     * @param loginUser   user performing the import
     * @param projectCode code of the target project
     * @param file        uploaded ZIP archive
     * @return the result of the project/permission check when it fails; a result carrying
     *         {@code DATASOURCE_NAME_ILLEGAL} when a datasource cannot be resolved; a result carrying
     *         {@code IMPORT_WORKFLOW_DEFINE_ERROR} when the archive cannot be read, exceeds a ZIP bomb
     *         limit, contains an empty SQL file, a duplicate task name or an unknown upstream task;
     *         otherwise the result of {@code createDagDefine}
     */
    public Map<String, Object> importSqlWorkflowDefinition(User loginUser, long projectCode, MultipartFile file) {
        Map<String, Object> result;
        Project project = projectMapper.queryByCode(projectCode);
        result = projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_IMPORT);
        if (result.get(Constants.STATUS) != Status.SUCCESS) {
            return result;
        }
        String workflowDefinitionName =
                file.getOriginalFilename() == null ? file.getName() : file.getOriginalFilename();
        int index = workflowDefinitionName.lastIndexOf(".");
        if (index > 0) {
            workflowDefinitionName = workflowDefinitionName.substring(0, index);
        }
        workflowDefinitionName = getNewName(workflowDefinitionName, IMPORT_SUFFIX);

        WorkflowDefinition workflowDefinition;
        List<TaskDefinitionLog> taskDefinitionList = new ArrayList<>();
        List<WorkflowTaskRelationLog> workflowTaskRelationLogList = new ArrayList<>();

        // Limits against ZIP bomb attacks.
        final int THRESHOLD_ENTRIES = 10000;
        final long THRESHOLD_SIZE = 1_000_000_000L; // 1 GB of uncompressed data for the whole archive
        final double THRESHOLD_RATIO = 10;
        int totalEntryArchive = 0;
        // Uncompressed bytes read so far across all entries; long so the running sum cannot overflow.
        long totalSizeArchive = 0;
        // In most cases, there will be only one data source
        Map<String, DataSource> dataSourceCache = new HashMap<>(1);
        Map<String, Long> taskNameToCode = new HashMap<>(16);
        Map<String, List<String>> taskNameToUpstream = new HashMap<>(16);
        // A single reader serves every entry: zIn reports end-of-stream at the end of the current entry
        // and continues with the next one after getNextEntry(). Decode explicitly as UTF-8 (the same
        // charset used for byte accounting below) rather than the platform default.
        try (
                ZipInputStream zIn = new ZipInputStream(file.getInputStream());
                BufferedReader bufferedReader =
                        new BufferedReader(new InputStreamReader(zIn, StandardCharsets.UTF_8))) {
            // build workflow definition
            workflowDefinition = new WorkflowDefinition(projectCode,
                    workflowDefinitionName,
                    CodeGenerateUtils.genCode(),
                    "",
                    "[]", null,
                    0, loginUser.getId());

            ZipEntry entry;
            while ((entry = zIn.getNextEntry()) != null) {
                totalEntryArchive++;
                if (totalEntryArchive > THRESHOLD_ENTRIES) {
                    throw new IllegalStateException(
                            "too much entries in this archive, can lead to inodes exhaustion of the system");
                }
                if (entry.isDirectory()) {
                    continue;
                }

                StringBuilder sql = new StringBuilder();
                String taskName = null;
                String datasourceName = null;
                List<String> upstreams = Collections.emptyList();
                // Uncompressed bytes read so far from the current entry only.
                long totalSizeEntry = 0;
                String line;
                while ((line = bufferedReader.readLine()) != null) {
                    int nBytes = line.getBytes(StandardCharsets.UTF_8).length;
                    totalSizeEntry += nBytes;
                    totalSizeArchive += nBytes;
                    // Checked per line so an oversized archive is rejected before it is fully buffered.
                    if (totalSizeArchive > THRESHOLD_SIZE) {
                        throw new IllegalStateException(
                                "the uncompressed data size is too much for the application resource capacity");
                    }
                    checkZipEntryCompressionRatio(entry, totalSizeEntry, THRESHOLD_RATIO);

                    int commentIndex = line.indexOf("-- ");
                    if (commentIndex >= 0) {
                        int colonIndex = line.indexOf(":", commentIndex);
                        if (colonIndex > 0) {
                            String key = line.substring(commentIndex + 3, colonIndex).trim()
                                    .toLowerCase(java.util.Locale.ROOT);
                            String value = line.substring(colonIndex + 1).trim();
                            switch (key) {
                                case "name":
                                    taskName = value;
                                    line = line.substring(0, commentIndex);
                                    break;
                                case "upstream":
                                    upstreams = Arrays.stream(value.split(",")).map(String::trim)
                                            .filter(s -> !"".equals(s)).collect(Collectors.toList());
                                    line = line.substring(0, commentIndex);
                                    break;
                                case "datasource":
                                    datasourceName = value;
                                    line = line.substring(0, commentIndex);
                                    break;
                                default:
                                    break;
                            }
                        }
                    }
                    if (!"".equals(line)) {
                        sql.append(line).append("\n");
                    }
                }
                // Entries written with a data descriptor only expose their compressed size once they
                // have been read to the end, so the ratio is checked again here.
                checkZipEntryCompressionRatio(entry, totalSizeEntry, THRESHOLD_RATIO);

                if (sql.length() == 0) {
                    throw new IllegalArgumentException(
                            "SQL file " + entry.getName() + " does not contain any SQL statement");
                }

                // import/sql1.sql -> sql1
                if (taskName == null) {
                    taskName = entry.getName();
                    index = taskName.indexOf("/");
                    if (index > 0) {
                        taskName = taskName.substring(index + 1);
                    }
                    index = taskName.lastIndexOf(".");
                    if (index > 0) {
                        taskName = taskName.substring(0, index);
                    }
                }
                DataSource dataSource = dataSourceCache.get(datasourceName);
                if (dataSource == null) {
                    dataSource = queryDatasourceByNameAndUser(datasourceName, loginUser);
                }
                if (dataSource == null) {
                    log.error("Datasource does not found, may be its name is illegal.");
                    putMsg(result, Status.DATASOURCE_NAME_ILLEGAL);
                    return result;
                }
                dataSourceCache.put(datasourceName, dataSource);

                // Drop the trailing "\n" appended after the last SQL line.
                TaskDefinitionLog taskDefinition =
                        buildNormalSqlTaskDefinition(taskName, dataSource, sql.substring(0, sql.length() - 1));
                // The name-keyed maps below would otherwise silently keep only one of two equally named
                // tasks, leaving the other one in the DAG without any relation.
                if (taskNameToCode.containsKey(taskDefinition.getName())) {
                    throw new IllegalArgumentException("Duplicate task name " + taskDefinition.getName()
                            + " in SQL file " + entry.getName());
                }

                taskDefinitionList.add(taskDefinition);
                taskNameToCode.put(taskDefinition.getName(), taskDefinition.getCode());
                taskNameToUpstream.put(taskDefinition.getName(), upstreams);
            }
        } catch (Exception e) {
            log.error("Import workflow definition error.", e);
            putMsg(result, Status.IMPORT_WORKFLOW_DEFINE_ERROR);
            return result;
        }

        // build task relation
        for (Map.Entry<String, Long> entry : taskNameToCode.entrySet()) {
            List<String> upstreams = taskNameToUpstream.get(entry.getKey());
            if (CollectionUtils.isEmpty(upstreams)
                    || (upstreams.size() == 1 && upstreams.contains("root") && !taskNameToCode.containsKey("root"))) {
                WorkflowTaskRelationLog workflowTaskRelationLog = buildNormalTaskRelation(0, entry.getValue());
                workflowTaskRelationLogList.add(workflowTaskRelationLog);
                continue;
            }
            for (String upstream : upstreams) {
                Long preTaskCode = taskNameToCode.get(upstream);
                if (preTaskCode == null) {
                    // An unknown upstream would otherwise reach buildNormalTaskRelation as null; this
                    // loop runs outside the try/catch above, so report it as an import error instead.
                    log.error("Upstream task {} of task {} does not exist.", upstream, entry.getKey());
                    putMsg(result, Status.IMPORT_WORKFLOW_DEFINE_ERROR);
                    return result;
                }
                WorkflowTaskRelationLog workflowTaskRelationLog =
                        buildNormalTaskRelation(preTaskCode, entry.getValue());
                workflowTaskRelationLogList.add(workflowTaskRelationLog);
            }
        }

        return createDagDefine(loginUser, workflowTaskRelationLogList, workflowDefinition, taskDefinitionList);
    }

    /**
     * Rejects a ZIP entry whose uncompressed data is suspiciously large compared to its compressed size.
     *
     * <p>The check is skipped while the compressed size is unknown ({@link ZipEntry#getCompressedSize()}
     * returns -1, as happens for streamed entries before their data descriptor has been read) or zero,
     * which would otherwise yield a meaningless negative ratio or a division by zero.
     *
     * @param entry            entry currently being read
     * @param uncompressedSize uncompressed bytes read so far from {@code entry}
     * @param maxRatio         highest accepted uncompressed/compressed ratio
     * @throws IllegalStateException if the ratio exceeds {@code maxRatio}
     */
    private static void checkZipEntryCompressionRatio(ZipEntry entry, long uncompressedSize, double maxRatio) {
        long compressedSize = entry.getCompressedSize();
        if (compressedSize > 0 && (double) uncompressedSize / compressedSize > maxRatio) {
            throw new IllegalStateException(
                    "Ratio between compressed and uncompressed data is highly suspicious, looks like a Zip Bomb Attack.");
        }
    }