o2a/converter/oozie_converter.py (130 lines of code) (raw):

# -*- coding: utf-8 -*- # Copyright 2019 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """ Converts Oozie application workflow into Airflow's DAG """ import shutil from typing import Dict, Type, List import os import logging from o2a.converter import workflow_xml_parser from o2a.converter.constants import HDFS_FOLDER from o2a.converter.oozie_node import OozieNode, OozieControlNode, OozieActionNode from o2a.converter.property_parser import PropertyParser from o2a.converter.relation import Relation from o2a.converter.renderers import BaseRenderer from o2a.converter.task_group import TaskGroup, ControlTaskGroup, ActionTaskGroup from o2a.converter.workflow import Workflow from o2a.utils.file_utils import get_lib_files from o2a.mappers.action_mapper import ActionMapper from o2a.transformers.base_transformer import BaseWorkflowTransformer from o2a.o2a_libs.src.o2a_lib.property_utils import PropertySet # pylint: disable=too-many-instance-attributes class OozieConverter: """ Converts Oozie Workflow app to Airflow's DAG Each WorkflowXmlParser class corresponds to one workflow, where one can get the workflow's required dependencies (imports), operator relations, and operator execution sequence. :param dag_name: Desired output DAG name. :param input_directory_path: Oozie workflow directory. :param output_directory_path: Desired output directory. :param action_mapper: List of charters that support action nodes :param renderer: Renderer that will be used for the output file :param transformers: List of transformers that will transform a workflow :param user: Username. # TODO remove me and use real ${user} EL :param initial_props: Initial PropertySet object """ def __init__( self, dag_name: str, input_directory_path: str, output_directory_path: str, action_mapper: Dict[str, Type[ActionMapper]], renderer: BaseRenderer, transformers: List[BaseWorkflowTransformer] = None, user: str = None, initial_props: PropertySet = None, ): self.workflow = Workflow( dag_name=dag_name, input_directory_path=input_directory_path, output_directory_path=output_directory_path, ) self.renderer = renderer self.transformers = transformers or [] # Propagate the configuration in case initial property set is passed job_properties = {} if not initial_props else initial_props.job_properties job_properties["user.name"] = user or os.environ["USER"] self.props = PropertySet(job_properties=job_properties) self.property_parser = PropertyParser(props=self.props, workflow=self.workflow) self.parser = workflow_xml_parser.WorkflowXmlParser( props=self.props, action_mapper=action_mapper, renderer=self.renderer, workflow=self.workflow, transformers=self.transformers, ) def retrieve_lib_jar_libraries(self): logging.info(f"Looking for jar libraries for the workflow in {self.workflow.library_folder}.") self.workflow.jar_files = get_lib_files(self.workflow.library_folder, extension=".jar") def recreate_output_directory(self): shutil.rmtree(self.workflow.output_directory_path, ignore_errors=True) os.makedirs(self.workflow.output_directory_path, exist_ok=True) def convert(self, as_subworkflow=False): """ Starts the process of converting the workflow. """ self.retrieve_lib_jar_libraries() self.property_parser.parse_property() self.parser.parse_workflow() self.apply_preconvert_transformers() self.convert_nodes() self.apply_postconvert_transformers() self.add_state_handlers() self.convert_relations() self.convert_dependencies() if as_subworkflow: self.renderer.create_subworkflow_file(workflow=self.workflow, props=self.props) else: self.renderer.create_workflow_file(workflow=self.workflow, props=self.props) self.copy_extra_assets(self.workflow.nodes) def convert_nodes(self): """ For each Oozie node, converts it into relations and internal relations. It uses the mapper, which is stored in ParsedActionNode. The result is saved in ParsedActionNode.tasks and ParsedActionNode.relations """ logging.info("Converting nodes to tasks and inner relations") for name, oozie_node in self.workflow.nodes.copy().items(): tasks, relations = oozie_node.mapper.to_tasks_and_relations() dependencies = oozie_node.mapper.required_imports() oozie_node.tasks = tasks oozie_node.relations = relations self.workflow.task_groups[name] = self._get_task_group_type(oozie_node)( name=name, tasks=tasks, relations=relations, dependencies=dependencies, downstream_names=oozie_node.downstream_names, error_downstream_name=oozie_node.error_downstream_name, ) del self.workflow.nodes[name] @staticmethod def _get_task_group_type(oozie_node: OozieNode) -> Type[TaskGroup]: if isinstance(oozie_node, OozieControlNode): return ControlTaskGroup if isinstance(oozie_node, OozieActionNode): return ActionTaskGroup return TaskGroup def convert_dependencies(self) -> None: logging.info("Converting dependencies.") for task_group in self.workflow.task_groups.values(): self.workflow.dependencies.update(task_group.dependencies) def convert_relations(self) -> None: logging.info("Converting relations between tasks groups.") for task_group in self.workflow.task_groups.values(): for downstream in task_group.downstream_names: relation = Relation( from_task_id=task_group.last_task_id_of_ok_flow, to_task_id=self.workflow.task_groups[downstream].first_task_id, ) self.workflow.task_group_relations.add(relation) error_downstream = task_group.error_downstream_name if error_downstream: relation = Relation( from_task_id=task_group.last_task_id_of_error_flow, to_task_id=self.workflow.task_groups[error_downstream].first_task_id, is_error=True, ) self.workflow.task_group_relations.add(relation) def add_state_handlers(self) -> None: logging.info("Adding error handlers") for node in self.workflow.task_groups.values(): node.add_state_handler_if_needed() def copy_extra_assets(self, nodes: Dict[str, OozieNode]): """ Copies additional assets needed to execute a workflow, eg. Pig scripts. """ for node in nodes.values(): logging.info(f"Copies additional assets for the node: {node.name}") node.mapper.copy_extra_assets( input_directory_path=os.path.join(self.workflow.input_directory_path, HDFS_FOLDER), output_directory_path=self.workflow.output_directory_path, ) def apply_preconvert_transformers(self): logging.info("Applying pre-convert transformers") for transformer in self.transformers: transformer.process_workflow_after_parse_workflow_xml(self.workflow) def apply_postconvert_transformers(self): logging.info("Applying post-convert transformers") for transformer in self.transformers: transformer.process_workflow_after_convert_nodes(self.workflow, props=self.props)