in client/migrationx/migrationx-transformer/src/main/java/com/aliyun/dataworks/migrationx/transformer/dataworks/converter/emr/AliyunEmrWorkflowConverter.java [328:424]
/**
 * Converts the node definitions of one EMR flow into DataWorks nodes and wires their dependencies.
 *
 * <p>Pass 1 builds one {@link DwNode} per entry of {@code app}. Definitions whose type equals
 * {@code FLOW_NODE_DEF_TYPE_ACTION} are populated from the job returned by
 * {@code project.getJobById(...)}; every other definition becomes a {@code VIRTUAL} node. A failure while
 * converting a single definition is recorded as an {@code ERROR} {@link ReportItem}, logged, and the
 * node is left out of the result; the remaining definitions are still processed.
 *
 * <p>Pass 2 walks the transitions of every successfully converted definition and appends the first
 * output of the upstream node to the inputs of each downstream node. Nodes are resolved by their key in
 * {@code app} (the same key that transitions refer to), so a transition that points at an unknown key,
 * or at a definition that failed conversion, is skipped instead of raising an exception.
 *
 * @param project  EMR project used to look up jobs and flow details
 * @param flowItem the flow whose nodes are being converted
 * @param app      node definitions of the flow, keyed by the identifier used in transitions
 * @param workflow target workflow, attached to report items and passed to node type resolution
 * @return the successfully converted nodes, in the iteration order of {@code app}
 */
private List<Node> parseFlowNodes(AliyunEmrProject project,
    ListFlowResponse.FlowItem flowItem,
    Map<String, FlowNodeDef> app,
    Workflow workflow) {
    List<Node> nodeList = new ArrayList<>();
    // Converted nodes indexed by their key in `app`, so the relation pass can resolve transitions
    // directly. Fully-qualified because the file's import block is outside this method.
    Map<String, Node> nodeByKey = new java.util.HashMap<>();
    Map<String, List<FlowNodeDef>> groupByJobId = app.values().stream()
        .filter(nd -> Objects.nonNull(nd.getJobId()))
        .collect(Collectors.groupingBy(FlowNodeDef::getJobId));

    for (Map.Entry<String, FlowNodeDef> entry : app.entrySet()) {
        FlowNodeDef nodeDef = entry.getValue();
        DwNode node = new DwNode();
        try {
            node.setStartRightNow(supplyStartRightNow());
            node.setCronExpress(
                CronExpressUtil.normalize(StringUtils.defaultIfBlank(flowItem.getCronExpr(), "day")));
            DescribeFlowResponse flowDetail = project.getFlows().get(flowItem);
            if (FLOW_NODE_DEF_TYPE_ACTION.equals(nodeDef.getType())) {
                ListFlowJobResponse.Job job = project.getJobById(nodeDef.getJobId());
                node.setName(getNodeName(groupByJobId, job, nodeDef));
                node.setType(getNodeType(workflow, node, job));
                node.setCode(convertCode(job));
                node.setRawNodeType(RawNodeType.valueOf(job.getType()).name());
                node.setTaskRerunTime(job.getMaxRetry());
                node.setTaskRerunInterval((int) (job.getRetryInterval() * 1000));
                node.setParameter(getNodeParameter(node, job));
                NodeIo output = new NodeIo();
                output.setData(getNodeDefaultOutput(project, flowItem, node.getName()));
                output.setParseType(1);
                node.getOutputs().add(output);
                node.setDescription(job.getDescription());
                // Overrides the code set by convertCode above with the flow-aware version.
                node.setCode(handleJobCode(flowItem, node, job));
                node.setResourceGroup(
                    properties.getProperty(Constants.CONVERTER_TARGET_SCHEDULE_RES_GROUP_IDENTIFIER, null));
            } else {
                node.setName(getStartEndNodeName(flowItem, nodeDef));
                node.setType(CodeProgramType.VIRTUAL.name());
                node.setRawNodeType(getStartEndRawNodeName(nodeDef));
                NodeIo output = new NodeIo();
                output.setData(getNodeDefaultOutput(project, flowItem, getStartEndRawNodeName(nodeDef)));
                output.setParseType(1);
                node.getOutputs().add(output);
            }
            node.setPauseSchedule(StringUtils.equalsIgnoreCase("STOP_SCHEDULE", flowItem.getStatus()));
            if (flowDetail.getStartSchedule() != null) {
                node.setStartEffectDate(DateUtils.convertLongToDate(flowDetail.getStartSchedule()));
            }
            if (flowDetail.getEndSchedule() != null) {
                node.setEndEffectDate(DateUtils.convertLongToDate(flowDetail.getEndSchedule()));
            }
            node.setNodeUseType(NodeUseType.SCHEDULED);
            node.setOwner(this.project.getOpUser());
            node.setIsAutoParse(0);
            nodeList.add(node);
            // Only fully converted nodes are indexed; failed ones must not take part in relations.
            nodeByKey.put(entry.getKey(), node);
        } catch (Exception e) {
            ReportItem reportItem = new ReportItem();
            reportItem.setMessage(e.getMessage());
            reportItem.setException(ExceptionUtils.getStackTrace(e));
            reportItem.setRiskLevel(ReportRiskLevel.ERROR);
            reportItem.setNode(node);
            reportItem.setWorkflow(workflow);
            reportItem.setType(ReportItemType.EMR_JOB_TO_DATAWORKS_NODE.name());
            reportItem.setName(workflow.getName() + "/" + node.getName());
            reportItems.add(reportItem);
            LOGGER.error("convert node error: ", e);
        }
    }

    // for relations
    for (Map.Entry<String, FlowNodeDef> entry : app.entrySet()) {
        Node node = nodeByKey.get(entry.getKey());
        if (node == null) {
            // Conversion of this definition failed above; it has no node to link from.
            continue;
        }
        FlowNodeDef nodeDef = entry.getValue();
        if (nodeDef.getTransitions() == null) {
            continue;
        }
        for (String nodeKey : nodeDef.getTransitions()) {
            Node downstreamNode = nodeByKey.get(nodeKey);
            if (downstreamNode == null) {
                // Either the transition references a key that is not in `app`, or the target
                // definition failed conversion. Skip it rather than abort the whole flow.
                LOGGER.warn("skip transition from node " + entry.getKey() + " to " + nodeKey
                    + ": downstream node is missing or was not converted");
                continue;
            }
            // Every converted node received exactly one default output in the first pass.
            downstreamNode.getInputs().add(node.getOutputs().get(0));
        }
    }
    return nodeList;
}