o2a/utils/file_archive_extractors.py (73 lines of code) (raw):

# -*- coding: utf-8 -*-
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Mapper for File and Archive nodes"""
from typing import List
from xml.etree.ElementTree import Element

from o2a.o2a_libs.src.o2a_lib import el_parser
from o2a.o2a_libs.src.o2a_lib.property_utils import PropertySet


class HdfsPathProcessor:
    """
    Validates paths and prefixes them using values from the merged properties.
    """

    def __init__(self, props: PropertySet):
        self.props = props

    @staticmethod
    def check_path_for_comma(path: str) -> None:
        """
        Rejects any path that contains a comma.

        :param path: path to check
        :return: None
        :raises Exception: when ',' occurs anywhere in the path
        """
        if "," not in path:
            return
        raise Exception(f"There should not be ',' in the path {path}")

    def preprocess_path_to_hdfs(self, path: str):
        """
        Prefixes the given path with a value taken from ``props.merged``.

        A path starting with "/" is appended directly to the ``nameNode``
        property. Any other path is appended to the
        ``oozie.wf.application.path`` property, separated by "/".

        :param path: path to prefix
        :return: the prefixed path
        """
        merged = self.props.merged
        if not path.startswith("/"):
            return merged["oozie.wf.application.path"] + "/" + path
        return merged["nameNode"] + path


def split_by_hash_sign(path: str) -> List[str]:
    """
    Splits the path on '#', allowing at most one '#' in it.

    :param path: path to check
    :return: a one-element list holding the path when it has no '#',
        otherwise the two parts around the single '#'
    :raises Exception: when the path contains more than one '#'
    """
    if "#" not in path:
        return [path]
    pieces = path.split("#")
    if len(pieces) <= 2:
        return pieces
    raise Exception(f"There should be maximum one '#' in the path {path}")


class FileExtractor:
    """
    Collects the paths of all "file" child elements of an Oozie node.
    """

    def __init__(self, oozie_node: Element, props: PropertySet):
        self.files: List[str] = []
        self.hdfs_files: List[str] = []
        self.file_path_processor = HdfsPathProcessor(props=props)
        self.oozie_node = oozie_node
        self.props = props

    def parse_node(self):
        """
        Translates and registers the text of every "file" child element.

        :return: tuple of (files, hdfs_files) accumulated on this instance
        """
        for file_node in self.oozie_node.findall("file"):
            self.add_file(el_parser.translate(file_node.text))
        return self.files, self.hdfs_files

    def add_file(self, oozie_file_path: str) -> None:
        """
        Validates a file path and records it together with its prefixed form.

        :param oozie_file_path: oozie file path to add
        :return: None
        """
        processor = self.file_path_processor
        processor.check_path_for_comma(oozie_file_path)
        # Called only for validation: it raises when there is more than one '#'.
        split_by_hash_sign(oozie_file_path)
        # The raw path is recorded before the prefixed one is computed.
        self.files.append(oozie_file_path)
        hdfs_path = processor.preprocess_path_to_hdfs(oozie_file_path)
        self.hdfs_files.append(hdfs_path)


class ArchiveExtractor:
    """
    Collects the paths of all "archive" child elements of an Oozie node.
    """

    ALLOWED_EXTENSIONS = [".zip", ".gz", ".tar.gz", ".tar", ".jar"]

    def __init__(self, oozie_node: Element, props: PropertySet):
        self.archives: List[str] = []
        self.hdfs_archives: List[str] = []
        self.archive_path_processor = HdfsPathProcessor(props=props)
        self.oozie_node = oozie_node
        self.props = props

    def parse_node(self):
        """
        Translates and registers the text of every "archive" child element.

        :return: tuple of (archives, hdfs_archives) accumulated on this instance
        """
        for archive_node in self.oozie_node.findall("archive"):
            self.add_archive(el_parser.translate(archive_node.text))
        return self.archives, self.hdfs_archives

    def _check_archive_extensions(self, oozie_archive_path: str) -> List[str]:
        """
        Verifies that the part of the path before '#' has an allowed extension.

        :param oozie_archive_path: path to check
        :return: path split on hash
        :raises Exception: when the part before '#' ends with none of
            ALLOWED_EXTENSIONS, or when the path has more than one '#'
        """
        split_path = split_by_hash_sign(oozie_archive_path)
        archive_path = split_path[0]
        if any(archive_path.endswith(suffix) for suffix in self.ALLOWED_EXTENSIONS):
            return split_path
        raise Exception(
            "The path {} cannot be accepted as archive as it does not have one "
            "of the extensions: {}".format(archive_path, self.ALLOWED_EXTENSIONS)
        )

    def add_archive(self, oozie_archive_path: str):
        """
        Validates an archive path and records it together with its prefixed form.

        :param oozie_archive_path: oozie archive path to add
        :return: None
        """
        processor = self.archive_path_processor
        processor.check_path_for_comma(oozie_archive_path)
        self._check_archive_extensions(oozie_archive_path)
        # The raw path is recorded before the prefixed one is computed.
        self.archives.append(oozie_archive_path)
        hdfs_path = processor.preprocess_path_to_hdfs(oozie_archive_path)
        self.hdfs_archives.append(hdfs_path)