private Job parseJobFile()

in client/migrationx/migrationx-domain/migrationx-domain-azkaban/src/main/java/com/aliyun/dataworks/migrationx/domain/dataworks/azkaban/service/AzkabanPackageParser.java [203:311]


    private Job parseJobFile(FlowLoader flowLoader, File flowFile, Flow flow, Node node) {
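        // Resolve the node's properties from the flow definition and cache them per flow id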
        Props props = getNodeProps(flowLoader, flowFile, flow, node.getId());
        Map<String, Props> nodeProps = jobProperties.computeIfAbsent(flow.getId(), flowName -> new HashMap<>());
        nodeProps.put(node.getId(), props);

        Properties properties = props.toAllProperties();

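        // Look up the job type from the "type" property and instantiate the matching Job subclass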
        String type = properties.getProperty("type");
        JobType jobType = JobType.getByName(type);
        Job job = JobType.newJobInstance(jobType);
        job.setName(node.getId());
        if (StringUtils.isNotBlank(props.getSource())) {
            job.setJobFile(new File(projectDir, props.getSource()));
        }
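        // Collect the job's property-backed fields via reflection;
        // drop the base Job "type" field so the binding loop below does not overwrite it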
        List<Field> fields = ReflectUtils.getPropertyFields(job);
        fields.removeIf(
            field -> field.getDeclaringClass().equals(Job.class) && "type".equalsIgnoreCase(field.getName()));

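        // Snapshot all property keys so the pattern-based bindings below can scan them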
        Enumeration<?> e = properties.propertyNames();
        Set<String> keys = new HashSet<>();
        while (e.hasMoreElements()) {
            String key = (String)e.nextElement();
            keys.add(key);
        }

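        // Bind each @ConfigProperty-annotated field: pattern() collects every matching property value into a list,
        // value() maps a single named property through convertType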
        fields.forEach(field -> {
            ConfigProperty anno = field.getDeclaredAnnotation(ConfigProperty.class);
            if (anno == null) {
                return;
            }
            try {
                field.setAccessible(true);

                if (StringUtils.isNotBlank(anno.pattern())) {
                    Pattern pattern = Pattern.compile(anno.pattern());
                    List<String> values = keys.stream()
                        .filter(key -> pattern.matcher(key).matches())
                        .map(properties::getProperty)
                        .collect(Collectors.toList());
                    field.set(job, values);
                }

                if (StringUtils.isNotBlank(anno.value())) {
                    String value = properties.getProperty(anno.value(), null);
                    Object objectValue = convertType(field, anno, value);
                    field.set(job, objectValue);
                }
            } catch (IllegalAccessException illegalAccessException) {
                throw new RuntimeException(illegalAccessException);
            }
        });
        job.processJobRelativeFiles();

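        // If the job file declares no dependencies, derive them from the flow edges that point at this node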
        if (CollectionUtils.isEmpty(job.getDependencies())) {
            job.setDependencies(CollectionUtils.emptyIfNull(flow.getEdges()).stream()
                .filter(edge -> StringUtils.equals(edge.getTargetId(), node.getId()))
                .map(Edge::getSourceId).collect(Collectors.toList()));
            LOGGER.info("flow: {}, job: {}, depends: {}", flow.getId(), job.getName(), job.getDependencies());
        }

        // If this node is itself a flow (sub-flow) node
        if (JobType.flow.equals(job.getType())) {
            List<String> depends = ListUtils.emptyIfNull(job.getDependencies());
            // Make the virtual flow node depend on the end nodes of its embedded flow:
            // look the embedded flow up by flow id and job name joined with PATH_DELIMITER, then fall back to the node's embeddedFlowId
            List<String> flowDepends = Optional.ofNullable(Optional.ofNullable(
                        MapUtils.emptyIfNull(flowLoader.getFlowMap())
                            .get(Joiner.on(Constants.PATH_DELIMITER).join(flow.getId(), job.getName())))
                    .orElse(flowLoader.getFlowMap().values().stream()
                        .filter(f -> StringUtils.equals(f.getId(), node.getEmbeddedFlowId()))
                        .findFirst().orElse(null)))
                .map(f -> ListUtils.emptyIfNull(f.getEndNodes()).stream()
                    .map(n -> Joiner.on(FLOW_NODE_DELIMITER).join(FileNameUtils.normalizedFileName(f.getId()), n.getId()))
                    .collect(Collectors.toList()))
                .orElse(new ArrayList<>());

            // All start nodes inside the embedded flow depend on the upstream of the virtual flow node
            // Optional.ofNullable(MapUtils.emptyIfNull(flowLoader.getFlowMap()).get(node.getEmbeddedFlowId()))
            // .ifPresent(embeddedFlow -> {
            //    ListUtils.emptyIfNull(depends).forEach(dep -> {
            //        embeddedFlow.getStartNodes().stream().forEach(n -> {
            //            Edge edge = new Edge(Joiner.on(FLOW_NODE_DELIMITER).join(flow.getId(), dep), n.getId());
            //            embeddedFlow.addEdge(edge);
            //            //LOGGER.info("add edge: {} for flow: {}, inner node: {}",
            //            //    Joiner.on(">>").join(edge.getSourceId(), edge.getTargetId()),
            //            //    embeddedFlow.getId(), n.getId());
            //        });
            //    });
            //});

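            // Flag the referenced flow as an embedded flow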
            Optional.ofNullable(MapUtils.emptyIfNull(flowLoader.getFlowMap()).get(node.getEmbeddedFlowId()))
                .ifPresent(f -> f.setEmbeddedFlow(true));
            job.setDependencies(new ArrayList<>(CollectionUtils.union(depends, flowDepends)));
            LOGGER.info("flow: {}, job: {}, depends: {}", flow.getId(), job.getName(), job.getDependencies());
        } else {
            // Start nodes inside the embedded flow depend on the upstream of the flow node
            // if (CollectionUtils.isEmpty(job.getDependencies()) && BooleanUtils.isTrue(flow.isEmbeddedFlow())) {
            //    // Find the enclosing flow node
            //    flowLoader.getFlowMap().values().stream()
            //        .filter(f -> f.getNodes().stream().anyMatch(n -> flow.getId().equals(n.getEmbeddedFlowId())))
            //        .forEach(f ->
            //            f.getNodes().stream().filter(n -> flow.getId().equals(n.getEmbeddedFlowId()))
            //                .findFirst()
            //                .ifPresent(flowNode -> job.setDependencies(
            //                    Collections.singletonList(Joiner.on(FLOW_NODE_DELIMITER).join(f.getId(), flowNode
            //                    .getId())))));
            //}
        }
        return job;
    }
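
Below is a minimal, hypothetical sketch of a Job subclass whose fields would be populated by the reflection loop above. It assumes ConfigProperty declares String elements value() and pattern() with blank defaults, as the isNotBlank checks suggest; the class name EchoJob, the "command" property names, and the regex are illustrative only, and imports of the repository's Job and ConfigProperty types are omitted.

// Hypothetical sketch, not part of the repository.
import java.util.List;

public class EchoJob extends Job {

    // Bound from the single "command" property via anno.value() and convertType(...)
    @ConfigProperty(value = "command")
    private String command;

    // Bound from every property whose key matches the regex via anno.pattern(),
    // e.g. command.1, command.2, ... collected as a List<String> of their values
    @ConfigProperty(pattern = "command\\.\\d+")
    private List<String> extraCommands;
}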