def parse_action_node()

in o2a/converter/workflow_xml_parser.py [0:0]


    def parse_action_node(self, action_node: ET.Element):
        """
        Action nodes are the mechanism by which a workflow triggers the
        execution of a computation/processing task.

        Action nodes are required to have an action-choice (map-reduce, etc.),
        ok, and error node in the xml.
        """
        # The 0th element of the node is the actual action tag.
        # In the form of 'action'
        action_operation_node = action_node[0]
        action_name = action_operation_node.tag

        mapper: BaseMapper
        if action_name not in self.action_map:
            action_name = "unknown"
            mapper = DummyMapper(
                oozie_node=action_operation_node,
                name=action_node.attrib["name"],
                dag_name=self.workflow.dag_name,
                props=self.props,
            )
        else:
            map_class = self.action_map[action_name]
            mapper = map_class(
                oozie_node=action_operation_node,
                name=action_node.attrib["name"],
                props=self.props,
                dag_name=self.workflow.dag_name,
                action_mapper=self.action_map,
                renderer=self.renderer,
                input_directory_path=self.workflow.input_directory_path,
                output_directory_path=self.workflow.output_directory_path,
                jar_files=self.workflow.jar_files,
                transformers=self.transformers,
            )

        oozie_action_node = OozieActionNode(mapper)
        ok_node = action_node.find("ok")
        if ok_node is None:
            raise Exception(f"Missing ok node in {action_node}")
        oozie_action_node.downstream_names.append(ok_node.attrib["to"])
        error_node = action_node.find("error")
        if error_node is None:
            raise Exception(f"Missing error node in {action_node}")
        oozie_action_node.error_downstream_name = error_node.attrib["to"]

        mapper.on_parse_node()

        logging.info(f"Parsed {mapper.name} as Action Node of type {action_name}.")

        self.workflow.nodes[mapper.name] = oozie_action_node