o2a/mappers/subworkflow_mapper.py (78 lines of code) (raw):

# -*- coding: utf-8 -*-
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Maps subworkflow of Oozie to Airflow's sub-dag"""
import logging
import os
from typing import Dict, List, Optional, Set, Type
from xml.etree.ElementTree import Element

from o2a.converter.oozie_converter import OozieConverter
from o2a.converter.relation import Relation
from o2a.converter.renderers import BaseRenderer
from o2a.converter.task import Task
from o2a.definitions import EXAMPLES_PATH
from o2a.mappers.action_mapper import ActionMapper
from o2a.o2a_libs.src.o2a_lib.property_utils import PropertySet
from o2a.transformers.base_transformer import BaseWorkflowTransformer
from o2a.utils import xml_utils

# Name of the child element that holds the path of the sub-workflow application.
TAG_APP = "app-path"


# pylint: disable=too-many-instance-attributes
class SubworkflowMapper(ActionMapper):
    """
    Converts a Sub-workflow Oozie node to an Airflow task.

    Constructing the mapper has a side effect: the referenced child
    application is converted immediately by a nested ``OozieConverter``
    (see ``_parse_oozie_node``).
    """

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        oozie_node: Element,
        name: str,
        dag_name: str,
        input_directory_path: str,
        output_directory_path: str,
        props: PropertySet,
        action_mapper: Dict[str, Type[ActionMapper]],
        renderer: BaseRenderer,
        transformers: Optional[List[BaseWorkflowTransformer]] = None,
        **kwargs,
    ):
        """
        Store the conversion settings and convert the child workflow.

        :param oozie_node: XML element of the sub-workflow action.
        :param name: name of the action; also used as the task id.
        :param dag_name: name of the parent DAG.
        :param input_directory_path: input directory of the parent workflow.
        :param output_directory_path: directory handed to the nested converter
            as its output directory.
        :param props: properties of the parent workflow.
        :param action_mapper: mapping from action name to mapper class,
            forwarded to the nested converter.
        :param renderer: renderer forwarded to the nested converter.
        :param transformers: workflow transformers forwarded to the nested
            converter; ``None`` is treated as an empty list.
        :param kwargs: extra keyword arguments forwarded to ``ActionMapper``.
        :raises ValueError: if the node has no usable ``<app-path>`` value.
        """
        super().__init__(
            oozie_node=oozie_node,
            name=name,
            dag_name=dag_name,
            props=props,
            input_directory_path=input_directory_path,
            **kwargs,
        )
        self.task_id = name
        self.input_directory_path = input_directory_path
        self.output_directory_path = output_directory_path
        self.dag_name = dag_name
        self.action_mapper = action_mapper
        self.renderer = renderer
        self.transformers = transformers or []
        self._parse_oozie_node()

    def _parse_oozie_node(self):
        """
        Read ``<app-path>``, derive ``self.app_name`` and convert the child app.

        The application name is the last path segment of ``<app-path>``.
        Trailing slashes are ignored so that ``.../apps/child/`` resolves to
        ``child`` instead of an empty name.

        :raises ValueError: if ``<app-path>`` is missing, empty, or contains
            no path segment to use as the application name.
        """
        app_path = xml_utils.get_tag_el_text(self.oozie_node, TAG_APP)
        if not app_path:
            raise ValueError(f"Sub-workflow action '{self.task_id}' has no <{TAG_APP}> value.")

        _, _, self.app_name = app_path.rstrip("/").rpartition("/")
        if not self.app_name:
            raise ValueError(
                f"Cannot derive an application name from <{TAG_APP}> '{app_path}' "
                f"in sub-workflow action '{self.task_id}'."
            )

        # TODO: hacky: we should calculate it deriving from input_directory_path and comparing app-path
        # TODO: but for now we assume app is in "examples"
        app_path = os.path.join(EXAMPLES_PATH, self.app_name)
        logging.info("Converting subworkflow from %s", app_path)
        converter = OozieConverter(
            input_directory_path=app_path,
            output_directory_path=self.output_directory_path,
            renderer=self.renderer,
            action_mapper=self.action_mapper,
            dag_name=self.app_name,
            initial_props=self.get_child_props(),
            transformers=self.transformers,
        )
        converter.convert(as_subworkflow=True)

    def get_child_props(self) -> PropertySet:
        """
        Return the properties handed to the child workflow.

        :return: the parent's ``props`` when the node contains a
            ``<propagate-configuration>`` element, otherwise an empty
            ``PropertySet``.
        """
        propagate_configuration = self.oozie_node.find("propagate-configuration")
        # Below the `is not None` is necessary due to Element's __bool__() return value:
        # `len(self._children) != 0`,
        # and `propagate_configuration` is an empty node so __bool__() will always return False.
        if propagate_configuration is not None:
            return self.props
        return PropertySet(config={}, job_properties={})

    def to_tasks_and_relations(self):
        """
        Build the task that represents this sub-workflow.

        :return: a ``(tasks, relations)`` pair with one task rendered from
            ``subwf.tpl`` and no relations.
        """
        tasks: List[Task] = [
            Task(
                task_id=self.name,
                template_name="subwf.tpl",
                template_params={"app_name": self.app_name},
            )
        ]
        relations: List[Relation] = []
        return tasks, relations

    def required_imports(self) -> Set[str]:
        """
        Return the import statements required by this mapper's task.

        :return: set of import lines, including the import of the
            ``subdag_<app_name>`` module.
        """
        return {
            "from airflow.utils import dates",
            "from airflow.providers.google.cloud.operators import dataproc",
            "from airflow.operators.subdag import SubDagOperator",
            f"import subdag_{self.app_name}",
        }