o2a/converter/workflow_xml_parser.py (171 lines of code) (raw):
# -*- coding: utf-8 -*-
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Parsing module """
import logging
import os
import uuid

# noinspection PyPep8Naming
import xml.etree.ElementTree as ET
from typing import Dict, List, Optional, Type

# noinspection PyPackageRequirements
from airflow.utils.trigger_rule import TriggerRule

from o2a.converter.constants import HDFS_FOLDER
from o2a.converter.oozie_node import OozieActionNode, OozieControlNode
from o2a.converter.renderers import BaseRenderer
from o2a.converter.workflow import Workflow
from o2a.mappers.action_mapper import ActionMapper
from o2a.mappers.base_mapper import BaseMapper
from o2a.mappers.decision_mapper import DecisionMapper
from o2a.mappers.dummy_mapper import DummyMapper
from o2a.mappers.end_mapper import EndMapper
from o2a.mappers.fork_mapper import ForkMapper
from o2a.mappers.join_mapper import JoinMapper
from o2a.mappers.kill_mapper import KillMapper
from o2a.mappers.start_mapper import StartMapper
from o2a.o2a_libs.src.o2a_lib.property_utils import PropertySet
from o2a.transformers.base_transformer import BaseWorkflowTransformer
from o2a.utils import xml_utils
# noinspection PyDefaultArgument
class WorkflowXmlParser:
    """
    Parses the XML of an Oozie workflow into ``self.workflow.nodes``.

    Every recognised top-level node (start, kill, end, fork, join, decision,
    action) is wrapped in an ``OozieControlNode`` / ``OozieActionNode`` built
    around the matching mapper and stored in ``self.workflow.nodes``.
    """

    def __init__(
        self,
        props: PropertySet,
        action_mapper: Dict[str, Type[ActionMapper]],
        renderer: BaseRenderer,
        workflow: Workflow,
        transformers: Optional[List[BaseWorkflowTransformer]] = None,
    ):
        """
        :param props: property set handed to the mappers that accept ``props``.
        :param action_mapper: maps the tag of the element nested inside an
            ``<action>`` to the mapper class used for it; tags missing from
            this map fall back to ``DummyMapper``.
        :param renderer: renderer handed to every action mapper.
        :param workflow: workflow whose ``nodes`` dict is filled while parsing;
            its ``input_directory_path`` is used to locate ``workflow.xml``.
        :param transformers: optional transformers handed unchanged (``None``
            included) to every action mapper.
        """
        self.workflow = workflow
        # <input_directory_path>/<HDFS_FOLDER>/workflow.xml
        self.workflow_file = os.path.join(workflow.input_directory_path, HDFS_FOLDER, "workflow.xml")
        self.props = props
        self.action_map = action_mapper
        self.renderer = renderer
        self.transformers = transformers

    def parse_kill_node(self, kill_node: ET.Element):
        """
        When a workflow node reaches the `kill` node, it finishes in an error.
        A workflow definition may have zero or more kill nodes.

        The node is mapped with ``KillMapper`` using the ``ONE_FAILED`` trigger
        rule and registered under its ``name`` attribute.
        """
        mapper = KillMapper(
            oozie_node=kill_node,
            name=kill_node.attrib["name"],
            dag_name=self.workflow.dag_name,
            trigger_rule=TriggerRule.ONE_FAILED,
            props=self.props,
        )
        oozie_control_node = OozieControlNode(mapper)
        mapper.on_parse_node()
        logging.info(f"Parsed {mapper.name} as Kill Node.")
        self.workflow.nodes[kill_node.attrib["name"]] = oozie_control_node

    def parse_end_node(self, end_node: ET.Element):
        """
        Upon reaching the end node, the workflow is considered completed successfully.
        The node is mapped with ``EndMapper`` and registered under its ``name``
        attribute.
        """
        mapper = EndMapper(oozie_node=end_node, name=end_node.attrib["name"], dag_name=self.workflow.dag_name)
        oozie_control_node = OozieControlNode(mapper)
        mapper.on_parse_node()
        logging.info(f"Parsed {mapper.name} as End Node.")
        self.workflow.nodes[end_node.attrib["name"]] = oozie_control_node

    def parse_fork_node(self, root: ET.Element, fork_node: ET.Element):
        """
        Fork nodes become control nodes with multiple parallel downstream tasks.

        This parses the fork node itself and then every node referenced by its
        ``<path start="...">`` children; each referenced node is added as a
        downstream of the fork. This only handles well-formed workflows where
        all paths end at the join node.

        :param root: root element of the workflow XML; used to look up the
            path targets by name, and a target that was not turned into a node
            is removed from it.
        :param fork_node: the ``<fork>`` element.
        """
        fork_name = fork_node.attrib["name"]
        mapper = ForkMapper(oozie_node=fork_node, name=fork_name, dag_name=self.workflow.dag_name)
        oozie_control_node = OozieControlNode(mapper)
        mapper.on_parse_node()
        logging.info(f"Parsed {mapper.name} as Fork Node.")
        # Resolve every <path start="..."/> child to the element it names.
        paths = []
        for node in fork_node:
            if "path" in node.tag:
                curr_name = node.attrib["start"]
                paths.append(xml_utils.find_node_by_name(root, curr_name))
        # Register the fork before parsing its paths so it is already present
        # in the nodes dict while the downstream nodes are being parsed.
        self.workflow.nodes[fork_name] = oozie_control_node
        for path in paths:
            oozie_control_node.downstream_names.append(path.attrib["name"])
            logging.info(f"Added {mapper.name}'s downstream: {path.attrib['name']}")
            # The path targets are usually action nodes, but any node kind is
            # dispatched through parse_node.
            # NOTE(review): if the target is also a direct child of root,
            # parse_workflow parses it a second time and the second result
            # overwrites this one - confirm mapper.on_parse_node is idempotent.
            self.parse_node(root, path)
            # parse_node registers every kind it recognises under the element's
            # name, so a missing entry means the tag was not recognised; drop
            # it from the tree.
            if path.attrib["name"] not in self.workflow.nodes:
                root.remove(path)

    def parse_join_node(self, join_node: ET.Element):
        """
        Join nodes wait for the corresponding beginning fork node paths to
        finish. The parser assumes the Oozie workflow follows the schema.

        The ``to`` attribute becomes the single downstream of the node.
        """
        mapper = JoinMapper(oozie_node=join_node, name=join_node.attrib["name"], dag_name=self.workflow.dag_name)
        oozie_control_node = OozieControlNode(mapper)
        oozie_control_node.downstream_names.append(join_node.attrib["to"])
        mapper.on_parse_node()
        logging.info(f"Parsed {mapper.name} as Join Node.")
        self.workflow.nodes[join_node.attrib["name"]] = oozie_control_node

    def parse_decision_node(self, decision_node: ET.Element):
        """
        A decision node enables a workflow to make a selection on the execution
        path to follow.

        The behavior of a decision node can be seen as a switch-case statement.
        A decision node consists of a list of predicates-transition pairs plus
        a default transition. Predicates are evaluated in order of appearance
        until one of them evaluates to true and the corresponding transition is
        taken. If none of the predicates evaluates to true the default
        transition is taken.

        example oozie wf decision node:

        <decision name="[NODE-NAME]">
            <switch>
                <case to="[NODE_NAME]">[PREDICATE]</case>
                ...
                <case to="[NODE_NAME]">[PREDICATE]</case>
                <default to="[NODE_NAME]"/>
            </switch>
        </decision>

        Every ``to`` target of the first child (the ``<switch>``), cases and
        default alike, is added as a downstream of the node in document order.
        """
        mapper = DecisionMapper(
            oozie_node=decision_node,
            name=decision_node.attrib["name"],
            dag_name=self.workflow.dag_name,
            props=self.props,
        )
        oozie_control_node = OozieControlNode(mapper)
        # decision_node[0] is expected to be the <switch> element.
        for case in decision_node[0]:
            oozie_control_node.downstream_names.append(case.attrib["to"])
        mapper.on_parse_node()
        logging.info(f"Parsed {mapper.name} as Decision Node.")
        self.workflow.nodes[decision_node.attrib["name"]] = oozie_control_node

    def parse_action_node(self, action_node: ET.Element):
        """
        Action nodes are the mechanism by which a workflow triggers the
        execution of a computation/processing task.

        Action nodes are required to have an action-choice (map-reduce, etc.),
        ok, and error node in the xml. The action-choice tag selects the mapper
        class from ``self.action_map``; unknown tags are mapped with
        ``DummyMapper``.

        :raises Exception: if the ``<ok>`` or ``<error>`` child is missing.
        """
        # The first child of <action> is the action-choice element
        # (e.g. <shell>, <pig>); its tag selects the mapper.
        action_operation_node = action_node[0]
        action_name = action_operation_node.tag
        mapper: BaseMapper
        if action_name not in self.action_map:
            action_name = "unknown"
            mapper = DummyMapper(
                oozie_node=action_operation_node,
                name=action_node.attrib["name"],
                dag_name=self.workflow.dag_name,
                props=self.props,
            )
        else:
            map_class = self.action_map[action_name]
            mapper = map_class(
                oozie_node=action_operation_node,
                name=action_node.attrib["name"],
                props=self.props,
                dag_name=self.workflow.dag_name,
                action_mapper=self.action_map,
                renderer=self.renderer,
                input_directory_path=self.workflow.input_directory_path,
                output_directory_path=self.workflow.output_directory_path,
                jar_files=self.workflow.jar_files,
                transformers=self.transformers,
            )
        oozie_action_node = OozieActionNode(mapper)

        # <ok to="..."/> is the success transition.
        ok_node = action_node.find("ok")
        if ok_node is None:
            raise Exception(f"Missing ok node in {action_node}")
        oozie_action_node.downstream_names.append(ok_node.attrib["to"])

        # <error to="..."/> is the failure transition.
        error_node = action_node.find("error")
        if error_node is None:
            raise Exception(f"Missing error node in {action_node}")
        oozie_action_node.error_downstream_name = error_node.attrib["to"]

        mapper.on_parse_node()
        logging.info(f"Parsed {mapper.name} as Action Node of type {action_name}.")
        self.workflow.nodes[mapper.name] = oozie_action_node

    def parse_start_node(self, start_node: ET.Element):
        """
        The start node is the entry point for a workflow job, it indicates the
        first workflow node the workflow job must transition to.

        When a workflow is started, it automatically transitions to the
        node specified in the start. A workflow definition must have one start
        node.

        The start element has no name, so one is generated as ``start_node_``
        followed by the first 4 characters of a random UUID4. The node uses
        the ``ALWAYS`` trigger rule and its ``to`` attribute as its downstream.
        """
        # 4 hex characters give 65536 possible suffixes; a clash with a real
        # node name is theoretically possible but very unlikely.
        start_name = "start_node_" + str(uuid.uuid4())[:4]
        mapper = StartMapper(
            oozie_node=start_node,
            name=start_name,
            dag_name=self.workflow.dag_name,
            props=self.props,
            trigger_rule=TriggerRule.ALWAYS,
        )
        oozie_control_node = OozieControlNode(mapper)
        oozie_control_node.downstream_names.append(start_node.attrib["to"])
        mapper.on_parse_node()
        logging.info(f"Parsed {mapper.name} as Start Node.")
        self.workflow.nodes[start_name] = oozie_control_node

    def parse_node(self, root: ET.Element, node: ET.Element):
        """
        Given a node, determines its tag, and then passes it to the correct
        parser. Tags matching none of the known kinds are ignored.

        Matching is by substring and the first match wins, so the order of the
        checks below is significant.

        :param root: The root node of the XML tree.
        :param node: The node to parse.
        """
        tag = node.tag
        if "action" in tag:
            self.parse_action_node(node)
        elif "start" in tag:
            self.parse_start_node(node)
        elif "kill" in tag:
            self.parse_kill_node(node)
        elif "end" in tag:
            self.parse_end_node(node)
        elif "fork" in tag:
            self.parse_fork_node(root, node)
        elif "join" in tag:
            self.parse_join_node(node)
        elif "decision" in tag:
            self.parse_decision_node(node)

    def parse_workflow(self):
        """
        Parses ``self.workflow_file`` and populates ``self.workflow.nodes``.

        XML namespaces are first stripped from every element tag, so the node
        parsers can match on plain local names such as ``action`` or ``fork``.
        Then every direct child of the root element is dispatched to
        :meth:`parse_node`.
        """
        tree = ET.parse(self.workflow_file)
        root = tree.getroot()
        for node in tree.iter():
            # Strip namespaces: "{uri:oozie:workflow:0.5}action" -> "action".
            # Tags without a namespace are kept as-is (the old split("}")[1]
            # raised IndexError on them). Non-string tags (comments/processing
            # instructions, if the parser keeps them) are left alone.
            if isinstance(node.tag, str):
                node.tag = node.tag.rpartition("}")[2]
        # NOTE(review): no character replacement happens here despite the
        # wording of this log message.
        logging.info("Stripped namespaces, and replaced invalid characters.")
        # Iterate over a snapshot: parse_fork_node may call root.remove(), and
        # removing a child from an Element during iteration shifts the
        # remaining children so that one of them can be skipped.
        for node in list(root):
            logging.debug(f"Parsing node: {node}")
            self.parse_node(root, node)