in o2a/converter/workflow_xml_parser.py [0:0]
def parse_action_node(self, action_node: ET.Element):
    """
    Turn one workflow action element into an ``OozieActionNode`` and
    register it on the workflow.

    The tag of the element's first child selects the mapper class from
    ``self.action_map``. When the tag is not present in that map, a
    ``DummyMapper`` is used instead and the action type is reported as
    "unknown".

    The "to" attributes of the ``ok`` and ``error`` children become the
    node's downstream name and error downstream name respectively.

    :param action_node: the XML element describing the action; its first
        child is treated as the action-choice tag (map-reduce, etc.).
    :raises Exception: when the element has no ``ok`` child or no
        ``error`` child.
    """
    # The action-choice tag is expected to be the very first child.
    operation_node = action_node[0]
    action_name = operation_node.tag

    mapper: BaseMapper
    if action_name in self.action_map:
        mapper_cls = self.action_map[action_name]
        mapper = mapper_cls(
            oozie_node=operation_node,
            name=action_node.attrib["name"],
            props=self.props,
            dag_name=self.workflow.dag_name,
            action_mapper=self.action_map,
            renderer=self.renderer,
            input_directory_path=self.workflow.input_directory_path,
            output_directory_path=self.workflow.output_directory_path,
            jar_files=self.workflow.jar_files,
            transformers=self.transformers,
        )
    else:
        # No mapper registered for this tag: fall back to a placeholder
        # mapper and report the action type as "unknown" in the log line.
        action_name = "unknown"
        mapper = DummyMapper(
            oozie_node=operation_node,
            name=action_node.attrib["name"],
            dag_name=self.workflow.dag_name,
            props=self.props,
        )

    parsed_node = OozieActionNode(mapper)

    # Success transition.
    ok_element = action_node.find("ok")
    if ok_element is None:
        raise Exception(f"Missing ok node in {action_node}")
    parsed_node.downstream_names.append(ok_element.attrib["to"])

    # Failure transition.
    error_element = action_node.find("error")
    if error_element is None:
        raise Exception(f"Missing error node in {action_node}")
    parsed_node.error_downstream_name = error_element.attrib["to"]

    # The mapper hook runs only after both transitions have been wired.
    mapper.on_parse_node()

    logging.info(f"Parsed {mapper.name} as Action Node of type {action_name}.")
    self.workflow.nodes[mapper.name] = parsed_node