o2a/mappers/shell_mapper.py (45 lines of code) (raw):
# -*- coding: utf-8 -*-
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Maps Shell action into Airflow's DAG"""
from typing import List, Set
from xml.etree.ElementTree import Element
from o2a.converter.task import Task
from o2a.converter.relation import Relation
from o2a.mappers.action_mapper import ActionMapper
from o2a.mappers.extensions.prepare_mapper_extension import PrepareMapperExtension
from o2a.o2a_libs.src.o2a_lib.property_utils import PropertySet
from o2a.utils.xml_utils import get_tag_el_text, get_tags_el_array_from_text
from o2a.o2a_libs.src.o2a_lib import el_parser
TAG_RESOURCE = "resource-manager"
TAG_NAME = "name-node"
TAG_CMD = "exec"
TAG_ARG = "argument"
class ShellMapper(ActionMapper):
"""
Converts a Shell Oozie action to an Airflow task.
"""
def __init__(self, oozie_node: Element, name: str, props: PropertySet, **kwargs):
ActionMapper.__init__(self, oozie_node=oozie_node, name=name, props=props, **kwargs)
self._parse_oozie_node()
self.prepare_extension: PrepareMapperExtension = PrepareMapperExtension(self)
def _parse_oozie_node(self):
self.resource_manager = get_tag_el_text(self.oozie_node, TAG_RESOURCE)
self.name_node = get_tag_el_text(self.oozie_node, TAG_NAME)
cmd_txt = get_tag_el_text(self.oozie_node, TAG_CMD)
args = get_tags_el_array_from_text(self.oozie_node, TAG_ARG)
cmd = " ".join([cmd_txt] + [x for x in args])
self.bash_command = el_parser.translate(cmd, quote=False)
self.pig_command = f"sh {self.bash_command}"
def to_tasks_and_relations(self):
action_task = Task(
task_id=self.name,
template_name="shell.tpl",
template_params=dict(
pig_command=self.pig_command, action_node_properties=self.props.action_node_properties
),
)
tasks = [action_task]
relations: List[Relation] = []
prepare_task = self.prepare_extension.get_prepare_task()
if prepare_task:
tasks, relations = self.prepend_task(prepare_task, tasks, relations)
return tasks, relations
def required_imports(self) -> Set[str]:
return {
"from airflow.utils import dates",
"from airflow.providers.google.cloud.operators.dataproc import DataprocSubmitJobOperator"
}