o2a/mappers/distcp_mapper.py (52 lines of code) (raw):

# -*- coding: utf-8 -*- # Copyright 2019 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """ DistCp Mapper module """ import shlex from typing import Dict, List, Set from xml.etree.ElementTree import Element from o2a.mappers.extensions.prepare_mapper_extension import PrepareMapperExtension from o2a.converter.relation import Relation from o2a.converter.task import Task from o2a.mappers.action_mapper import ActionMapper from o2a.o2a_libs.src.o2a_lib.property_utils import PropertySet from o2a.utils import xml_utils, el_utils from o2a.utils.file_archive_extractors import ArchiveExtractor, FileExtractor class DistCpMapper(ActionMapper): """ Converts a Pig Oozie node to an Airflow task. """ def __init__(self, oozie_node: Element, name: str, dag_name: str, props: PropertySet, **kwargs): ActionMapper.__init__( self, oozie_node=oozie_node, dag_name=dag_name, name=name, props=props, **kwargs ) self.props = props self.params_dict: Dict[str, str] = {} self.file_extractor = FileExtractor(oozie_node=oozie_node, props=props) self.archive_extractor = ArchiveExtractor(oozie_node=oozie_node, props=props) self.oozie_node = oozie_node self.args: str = "" self.prepare_extension: PrepareMapperExtension = PrepareMapperExtension(self) def _parse_args(self): args = [] arg_nodes = xml_utils.find_nodes_by_tag(self.oozie_node, "arg") if arg_nodes: for node in arg_nodes: value: str = node.text if "/" in value: # If an argument contains a forward slash then it's a URL. # The full URL should be preserved when replacing the EL (and not just the path) # to enable copying files between two different clusters. value = el_utils.replace_url_el(value, props=self.props) value = shlex.quote(value) args.append(value) return " ".join(args) def _get_distcp_command(self): return f"--class=org.apache.hadoop.tools.DistCp -- {self.args}" def on_parse_node(self): super().on_parse_node() self.args = self._parse_args() def to_tasks_and_relations(self): action_task = Task( task_id=self.name, template_name="distcp.tpl", template_params=dict(props=self.props, distcp_command=self._get_distcp_command()), ) tasks = [action_task] relations: List[Relation] = [] prepare_task = self.prepare_extension.get_prepare_task() if prepare_task: tasks, relations = self.prepend_task(prepare_task, tasks, relations) return tasks, relations def required_imports(self) -> Set[str]: return {"import shlex", "from airflow.operators import bash"}