private List parseFlowNodes()

in client/migrationx/migrationx-transformer/src/main/java/com/aliyun/dataworks/migrationx/transformer/dataworks/converter/emr/AliyunEmrWorkflowConverter.java [328:424]


    /**
     * Converts the node definitions of one EMR flow into DataWorks nodes and links each node's
     * first output to the inputs of its downstream nodes.
     *
     * <p>The conversion runs in two passes:
     * <ol>
     *   <li>Every {@link FlowNodeDef} in {@code app} is converted to a {@code DwNode}. Definitions
     *       whose type equals {@code FLOW_NODE_DEF_TYPE_ACTION} are built from their EMR job; all
     *       other definitions become {@code VIRTUAL} nodes. Any exception raised while converting
     *       a single definition is recorded as an {@code ERROR} {@code ReportItem}, logged, and
     *       that definition is left out of the result; the remaining definitions are still
     *       processed.</li>
     *   <li>For every definition that has transitions, the first output of the converted node is
     *       appended to the inputs of each converted downstream node. Transitions that point to a
     *       key missing from {@code app}, or to a definition that was not converted, are
     *       skipped.</li>
     * </ol>
     *
     * @param project  EMR project used to look up jobs and flow details (note: this parameter
     *                 shadows the {@code this.project} field, which is only used for the owner)
     * @param flowItem the flow whose nodes are being converted
     * @param app      node definitions of the flow, keyed by the node key used in transitions
     * @param workflow target workflow, used for node type resolution and error reporting
     * @return the successfully converted nodes, in the iteration order of {@code app.values()}
     */
    private List<Node> parseFlowNodes(AliyunEmrProject project,
            ListFlowResponse.FlowItem flowItem,
            Map<String, FlowNodeDef> app,
            Workflow workflow) {
        List<Node> nodeList = new ArrayList<>();
        // Definitions grouped by job id; passed to getNodeName for every job-backed node.
        Map<String, List<FlowNodeDef>> groupByJobId = app.values().stream()
                .filter(nd -> Objects.nonNull(nd.getJobId()))
                .collect(Collectors.groupingBy(FlowNodeDef::getJobId));

        // Pass 1: convert each definition; a failure only drops that single node.
        for (FlowNodeDef nodeDef : app.values()) {
            DwNode node = new DwNode();
            try {
                node.setStartRightNow(supplyStartRightNow());
                // A blank cron expression falls back to "day" before normalization.
                node.setCronExpress(
                        CronExpressUtil.normalize(StringUtils.defaultIfBlank(flowItem.getCronExpr(), "day")));
                DescribeFlowResponse flowDetail = project.getFlows().get(flowItem);
                if (FLOW_NODE_DEF_TYPE_ACTION.equals(nodeDef.getType())) {
                    ListFlowJobResponse.Job job = project.getJobById(nodeDef.getJobId());
                    node.setName(getNodeName(groupByJobId, job, nodeDef));
                    node.setType(getNodeType(workflow, node, job));
                    node.setCode(convertCode(job));
                    node.setRawNodeType(RawNodeType.valueOf(job.getType()).name());
                    node.setTaskRerunTime(job.getMaxRetry());
                    node.setTaskRerunInterval((int) (job.getRetryInterval() * 1000));
                    node.setParameter(getNodeParameter(node, job));
                    NodeIo output = new NodeIo();
                    output.setData(getNodeDefaultOutput(project, flowItem, node.getName()));
                    output.setParseType(1);
                    node.getOutputs().add(output);
                    node.setDescription(job.getDescription());
                    // The code is set a second time here, after the node has been populated;
                    // handleJobCode receives the node itself, so the order is kept as-is.
                    node.setCode(handleJobCode(flowItem, node, job));
                    node.setResourceGroup(
                            properties.getProperty(Constants.CONVERTER_TARGET_SCHEDULE_RES_GROUP_IDENTIFIER, null));
                } else {
                    node.setName(getStartEndNodeName(flowItem, nodeDef));
                    node.setType(CodeProgramType.VIRTUAL.name());
                    node.setRawNodeType(getStartEndRawNodeName(nodeDef));
                    NodeIo output = new NodeIo();
                    output.setData(getNodeDefaultOutput(project, flowItem, getStartEndRawNodeName(nodeDef)));
                    output.setParseType(1);
                    node.getOutputs().add(output);
                }
                node.setPauseSchedule(StringUtils.equalsIgnoreCase("STOP_SCHEDULE", flowItem.getStatus()));
                // A missing flow detail raises here and is reported through the catch block below.
                if (flowDetail.getStartSchedule() != null) {
                    node.setStartEffectDate(DateUtils.convertLongToDate(flowDetail.getStartSchedule()));
                }
                if (flowDetail.getEndSchedule() != null) {
                    node.setEndEffectDate(DateUtils.convertLongToDate(flowDetail.getEndSchedule()));
                }
                node.setNodeUseType(NodeUseType.SCHEDULED);
                node.setOwner(this.project.getOpUser());
                node.setIsAutoParse(0);
                nodeList.add(node);
            } catch (Exception e) {
                ReportItem reportItem = new ReportItem();
                reportItem.setMessage(e.getMessage());
                reportItem.setException(ExceptionUtils.getStackTrace(e));
                reportItem.setRiskLevel(ReportRiskLevel.ERROR);
                reportItem.setNode(node);
                reportItem.setWorkflow(workflow);
                reportItem.setType(ReportItemType.EMR_JOB_TO_DATAWORKS_NODE.name());
                reportItem.setName(workflow.getName() + "/" + node.getName());
                reportItems.add(reportItem);
                LOGGER.error("convert node error: ", e);
            }
        }

        // Pass 2: relations. Index the converted nodes by name once instead of scanning the list
        // for every transition; on duplicate names the first converted node wins.
        Map<String, Node> nodesByName = nodeList.stream()
                .collect(Collectors.toMap(Node::getName, n -> n, (first, duplicate) -> first));
        for (FlowNodeDef nodeDef : app.values()) {
            Node node = nodesByName.get(resolveFlowNodeName(project, flowItem, groupByJobId, nodeDef));
            if (node == null) {
                // The definition failed to convert in pass 1, so it has no output to link.
                continue;
            }

            if (nodeDef.getTransitions() == null) {
                continue;
            }

            for (String nodeKey : nodeDef.getTransitions()) {
                FlowNodeDef downstreamNodeDef = app.get(nodeKey);
                if (downstreamNodeDef == null) {
                    // Dangling transition: skip it rather than aborting the whole flow with an NPE.
                    LOGGER.warn("skip transition to unknown flow node key: " + nodeKey
                            + ", workflow: " + workflow.getName());
                    continue;
                }

                Node downstreamNode = nodesByName.get(
                        resolveFlowNodeName(project, flowItem, groupByJobId, downstreamNodeDef));
                if (downstreamNode == null) {
                    continue;
                }

                downstreamNode.getInputs().add(node.getOutputs().get(0));
            }
        }
        return nodeList;
    }

    /**
     * Resolves the name under which a node definition is looked up when wiring relations: the
     * job-based name when {@code project} has a job for the definition's job id, otherwise the
     * start/end node name.
     */
    private String resolveFlowNodeName(AliyunEmrProject project,
            ListFlowResponse.FlowItem flowItem,
            Map<String, List<FlowNodeDef>> groupByJobId,
            FlowNodeDef nodeDef) {
        ListFlowJobResponse.Job job = project.getJobById(nodeDef.getJobId());
        return job != null
                ? getNodeName(groupByJobId, job, nodeDef)
                : getStartEndNodeName(flowItem, nodeDef);
    }