o2a/mappers/extensions/prepare_mapper_extension.py (41 lines of code) (raw):
# -*- coding: utf-8 -*-
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Prepare node mixin"""
from typing import List, Tuple, Optional
from o2a.converter.task import Task
from o2a.mappers.base_mapper import BaseMapper
from o2a.utils import xml_utils
from o2a.utils.el_utils import normalize_path
class PrepareMapperExtension:
"""Extension of mapper used to add Prepare node capability to a node in composable way"""
def __init__(self, mapper: BaseMapper):
self.mapper: BaseMapper = mapper
def has_prepare(self):
prepare_node = xml_utils.find_node_by_tag(self.mapper.oozie_node, "prepare")
if prepare_node:
delete_nodes = xml_utils.find_nodes_by_tag(prepare_node, "delete")
mkdir_nodes = xml_utils.find_node_by_tag(prepare_node, "mkdir")
if delete_nodes or mkdir_nodes:
return True
return False
def get_prepare_task(self) -> Optional[Task]:
delete_paths, mkdir_paths = self.parse_prepare_node()
if not delete_paths and not mkdir_paths:
return None
delete = " ".join(delete_paths) if delete_paths else None
mkdir = " ".join(mkdir_paths) if mkdir_paths else None
return Task(
task_id=self.mapper.name + "_prepare",
template_name="prepare.tpl",
template_params=dict(delete=delete, mkdir=mkdir),
)
def parse_prepare_node(self) -> Tuple[List[str], List[str]]:
"""
<prepare>
<delete path="[PATH]"/>
...
<mkdir path="[PATH]"/>
...
</prepare>
"""
delete_paths = []
mkdir_paths = []
prepare_node = xml_utils.find_node_by_tag(self.mapper.oozie_node, "prepare")
if prepare_node:
# If there exists a prepare node, there will only be one, according
# to oozie xml schema
for node in prepare_node:
node_path = normalize_path(node.attrib["path"], props=self.mapper.props)
if node.tag == "delete":
delete_paths.append(node_path)
elif node.tag == "mkdir":
mkdir_paths.append(node_path)
else:
raise Exception(f"Unknown XML node in prepare: {node.tag}")
return delete_paths, mkdir_paths