o2a/mappers/fs_mapper.py (99 lines of code) (raw):
# -*- coding: utf-8 -*-
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Maps FS node to Airflow's DAG"""
import shlex
from typing import List, Set
from xml.etree.ElementTree import Element
from o2a.converter.task import Task
from o2a.mappers.action_mapper import ActionMapper
from o2a.o2a_libs.src.o2a_lib.property_utils import PropertySet
from o2a.utils.relation_utils import chain
from o2a.utils.el_utils import normalize_path
ACTION_TYPE = "fs"
FS_OP_MKDIR = "mkdir"
FS_OP_DELETE = "delete"
FS_OP_MOVE = "move"
FS_OP_CHMOD = "chmod"
FS_OP_TOUCHZ = "touchz"
FS_OP_CHGRP = "chgrp"
FS_TAG_PATH = "path"
FS_TAG_SOURCE = "source"
FS_TAG_TARGET = "target"
FS_TAG_RECURSIVE = "recursive"
FS_TAG_DIRFILES = "dir-files"
FS_TAG_PERMISSIONS = "permissions"
FS_TAG_GROUP = "group"
def prepare_mkdir_command(node: Element, props: PropertySet):
path = normalize_path(node.attrib[FS_TAG_PATH], props=props)
command = "fs -mkdir -p {path}".format(path=shlex.quote(path))
return command
def prepare_delete_command(node: Element, props: PropertySet):
path = normalize_path(node.attrib[FS_TAG_PATH], props=props)
command = "fs -rm -f -r {path}".format(path=shlex.quote(path))
return command
def prepare_move_command(node: Element, props: PropertySet):
source = normalize_path(node.attrib[FS_TAG_SOURCE], props=props)
target = normalize_path(node.attrib[FS_TAG_TARGET], props=props, allow_no_schema=True)
command = "fs -mv {source} {target}".format(source=shlex.quote(source), target=shlex.quote(target))
return command
def prepare_chmod_command(node: Element, props: PropertySet):
path = normalize_path(node.attrib[FS_TAG_PATH], props=props)
permission = node.attrib[FS_TAG_PERMISSIONS]
# TODO: Add support for dirFiles Reference: GH issues #80
# dirFiles = bool_value(node, FS_TAG _DIRFILES)
recursive = node.find(FS_TAG_RECURSIVE) is not None
extra_param = "-R" if recursive else ""
command = "fs -chmod {extra} {permission} {path}".format(
extra=extra_param, path=shlex.quote(path), permission=shlex.quote(permission)
)
return command
def prepare_touchz_command(node: Element, props: PropertySet):
path = normalize_path(node.attrib[FS_TAG_PATH], props=props)
command = "fs -touchz {path}".format(path=shlex.quote(path))
return command
def prepare_chgrp_command(node: Element, props: PropertySet):
path = normalize_path(node.attrib[FS_TAG_PATH], props=props)
group = node.attrib[FS_TAG_GROUP]
recursive = node.find(FS_TAG_RECURSIVE) is not None
extra_param = "-R" if recursive else ""
command = "fs -chgrp {extra} {group} {path}".format(
extra=extra_param, group=shlex.quote(group), path=shlex.quote(path)
)
return command
FS_OPERATION_MAPPERS = {
FS_OP_MKDIR: prepare_mkdir_command,
FS_OP_DELETE: prepare_delete_command,
FS_OP_MOVE: prepare_move_command,
FS_OP_CHMOD: prepare_chmod_command,
FS_OP_TOUCHZ: prepare_touchz_command,
FS_OP_CHGRP: prepare_chgrp_command,
}
class FsMapper(ActionMapper):
"""
Converts a FS Oozie node to an Airflow task.
"""
def __init__(self, oozie_node: Element, name: str, dag_name: str, props: PropertySet, **kwargs):
super().__init__(oozie_node=oozie_node, name=name, props=props, dag_name=dag_name, **kwargs)
self.oozie_node = oozie_node
self.tasks: List[Task] = []
def on_parse_node(self):
super().on_parse_node()
self.tasks = self.parse_tasks()
def parse_tasks(self) -> List[Task]:
"""
Processes the nodes responsible for determining what operations are performed on the filesystem and
returns the tasks that suit them.
"""
tasks: List[Task] = []
operation_nodes = [node for node in self.oozie_node if node.tag in FS_OPERATION_MAPPERS.keys()]
operation_nodes_count = len(operation_nodes)
for index, node in enumerate(operation_nodes):
task = self.parse_fs_operation(index, node, operation_nodes_count)
tasks.append(task)
if not tasks:
# Each mapper must return at least one task
return [Task(task_id=self.name, template_name="dummy.tpl")]
return tasks
def to_tasks_and_relations(self):
return self.tasks, chain(self.tasks)
def required_imports(self) -> Set[str]:
return {"from airflow.operators import empty", "from airflow.operators import bash"}
def parse_fs_operation(self, index: int, node: Element, operation_nodes_count: int) -> Task:
tag_name = node.tag
task_id = self.name if operation_nodes_count == 1 else f"{self.name}_fs_{index}_{tag_name}"
mapper_fn = FS_OPERATION_MAPPERS[tag_name]
pig_command = mapper_fn(node, props=self.props)
return Task(
task_id=task_id,
template_name="fs_op.tpl",
template_params=dict(
pig_command=pig_command, action_node_properties=self.props.action_node_properties
),
)