o2a/mappers/java_mapper.py (88 lines of code) (raw):

# -*- coding: utf-8 -*-
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Maps Java action into Airflow's DAG"""
from typing import List, Optional, Set
from xml.etree.ElementTree import Element

from o2a.converter.constants import LIB_FOLDER
from o2a.converter.relation import Relation
from o2a.converter.task import Task
from o2a.mappers.action_mapper import ActionMapper
from o2a.mappers.extensions.prepare_mapper_extension import PrepareMapperExtension
from o2a.o2a_libs.src.o2a_lib.property_utils import PropertySet
from o2a.utils import xml_utils
from o2a.utils.file_archive_extractors import FileExtractor, ArchiveExtractor
from o2a.utils.xml_utils import get_tags_el_array_from_text

TAG_MAIN_CLASS = "main-class"
TAG_JAVA_OPTS = "java-opts"
TAG_JAVA_OPT = "java-opt"
TAG_ARG = "arg"


class JavaMapper(ActionMapper):
    """
    Converts a Java Oozie action node to an Airflow task.
    """

    def __init__(
        self,
        oozie_node: Element,
        name: str,
        dag_name: str,
        props: PropertySet,
        jar_files: List[str],
        **kwargs,
    ):
        ActionMapper.__init__(
            self, oozie_node=oozie_node, dag_name=dag_name, name=name, props=props, **kwargs
        )
        self.file_extractor = FileExtractor(oozie_node=oozie_node, props=self.props)
        self.archive_extractor = ArchiveExtractor(oozie_node=oozie_node, props=self.props)
        self.main_class: Optional[str] = None
        self.java_opts: List[str] = []
        self.args: Optional[List[str]] = None
        self.hdfs_files: Optional[List[str]] = None
        self.hdfs_archives: Optional[List[str]] = None
        self.prepare_extension: PrepareMapperExtension = PrepareMapperExtension(self)
        self.jar_files: List[str] = jar_files if jar_files else []
        self.jar_files_in_hdfs: List[str] = []
        self._get_jar_files_in_hdfs_full_paths()

    def on_parse_node(self):
        super().on_parse_node()
        _, self.hdfs_files = self.file_extractor.parse_node()
        _, self.hdfs_archives = self.archive_extractor.parse_node()
        self._extract_java_data()

    def to_tasks_and_relations(self):
        action_task = Task(
            task_id=self.name,
            template_name="java.tpl",
            template_params=dict(
                props=self.props,
                hadoop_job=dict(
                    args=self.args,
                    jar_file_uris=self.jar_files_in_hdfs,
                    file_uris=self.hdfs_files,
                    archive_uris=self.hdfs_archives,
                    main_class=self.main_class,
                ),
            ),
        )
        tasks = [action_task]
        relations: List[Relation] = []
        prepare_task = self.prepare_extension.get_prepare_task()
        if prepare_task:
            tasks, relations = self.prepend_task(prepare_task, tasks, relations)
        return tasks, relations

    def required_imports(self) -> Set[str]:
        return {
            "from airflow.utils import dates",
            "from airflow.providers.google.cloud.operators.dataproc import DataprocSubmitJobOperator",
        }

    def _get_jar_files_in_hdfs_full_paths(self):
        hdfs_app_prefix = self.props.job_properties["oozie.wf.application.path"]
        for file in self.jar_files:
            self.jar_files_in_hdfs.append(hdfs_app_prefix + "/" + LIB_FOLDER + "/" + file)

    def _extract_java_data(self):
        """Extracts Java node data."""
        root = self.oozie_node
        props = self.props
        if "mapred.child.java.opts" in props.merged:
            self.java_opts.extend(props.merged["mapred.child.java.opts"].split(" "))
        if "mapreduce.map.java.opts" in props.merged:
            self.java_opts.extend(props.merged["mapreduce.map.java.opts"].split(" "))
        self.main_class = xml_utils.get_tag_el_text(root=root, tag=TAG_MAIN_CLASS)
        java_opts_string = xml_utils.get_tag_el_text(root=root, tag=TAG_JAVA_OPTS)
        if java_opts_string:
            self.java_opts.extend(java_opts_string.split(" "))
        else:
            self.java_opts.extend(get_tags_el_array_from_text(root=root, tag=TAG_JAVA_OPT))
        self.args = get_tags_el_array_from_text(root=root, tag=TAG_ARG)