o2a/o2a.py (102 lines of code) (raw):

# -*- coding: utf-8 -*-
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Main entry point for the Oozie to Airflow converter"""
import argparse
import logging
import os
import sys

from subprocess import CalledProcessError, check_call

try:
    # pylint: disable=no-name-in-module
    from distutils.spawn import find_executable
except ImportError:
    # distutils is no longer part of the standard library on Python 3.12+ (PEP 632).
    # shutil.which has the same contract used here: it returns the path of the
    # executable found on PATH, or None when there is none.
    from shutil import which as find_executable

from o2a.converter.mappers import ACTION_MAP
from o2a.converter.oozie_converter import OozieConverter
from o2a.converter.constants import HDFS_FOLDER
from o2a.converter.renderers import PythonRenderer, DotRenderer
from o2a.transformers.add_node_notificaton_transformer import AddNodeNotificationTransformer
from o2a.transformers.add_workflow_notificaton_transformer import AddWorkflowNotificationTransformer
from o2a.transformers.remove_end_transformer import RemoveEndTransformer
from o2a.transformers.remove_fork_transformer import RemoveForkTransformer
from o2a.transformers.remove_inaccessible_node_transformer import RemoveInaccessibleNodeTransformer
from o2a.transformers.remove_join_transformer import RemoveJoinTransformer
from o2a.transformers.remove_kill_transformer import RemoveKillTransformer
from o2a.transformers.remove_start_transformer import RemoveStartTransformer
from o2a.utils.constants import CONFIG, WORKFLOW_XML

INDENT = 4

PROJECT_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), os.path.pardir))


def get_o2a_validate_workflows_script():
    """
    Locate the ``o2a-validate-workflows`` script.

    The ``bin`` directory under ``PROJECT_PATH`` is checked first; if the script
    is not a file there, it is looked up on the PATH.

    :return: path to the script, or ``None`` when it cannot be found (in which
        case workflow validation is expected to be skipped by the caller).
    """
    script_name = "o2a-validate-workflows"

    validate_workflows_script = os.path.join(PROJECT_PATH, "bin", script_name)
    if not os.path.isfile(validate_workflows_script):
        validate_workflows_script = find_executable(script_name)

    # The PATH lookup returns None when nothing is found, and os.path.isfile(None)
    # raises TypeError - so the None case has to be handled before the file check.
    if validate_workflows_script is None or not os.path.isfile(validate_workflows_script):
        logging.info(f"Skipping workflow validation as the {script_name} script is missing")
        return None

    logging.info(f"Found o2a-validate-workflows script at {validate_workflows_script}. Validating workflow")
    return validate_workflows_script


# pylint: disable=missing-docstring
def main():
    """
    Run the conversion described by the command line arguments.

    Steps, in order:

    * parse ``sys.argv`` with :func:`parse_args`,
    * warn when the ``CONFIG`` file is not present in the input directory,
    * when the validation script is available, run it against the workflow XML
      and terminate the process with exit status 1 if it reports a failure,
    * create the output directory, build the renderer (DOT or Python) and the
      transformer list, then hand everything over to ``OozieConverter``.
    """
    args = parse_args(sys.argv[1:])
    input_directory_path = args.input_directory_path
    output_directory_path = args.output_directory_path
    start_days_ago = args.start_days_ago
    schedule_interval = args.schedule_interval

    dag_name = args.dag_name
    if not dag_name:
        # Normalise first: basename of a path with a trailing separator
        # ("workflows/demo/") is an empty string, which is not a usable DAG name.
        dag_name = os.path.basename(os.path.normpath(input_directory_path))

    conf_path = os.path.join(input_directory_path, CONFIG)
    if not os.path.isfile(conf_path):
        logging.warning(
            f"""
#################################### WARNING ###########################################

The '{CONFIG}' file was not detected in {input_directory_path}.
It may be necessary to provide input parameters for the workflow.

In case of any conversion errors make sure this configuration file is really not needed.
Otherwise please provide it.

########################################################################################
        """
        )

    validate_workflows_script = get_o2a_validate_workflows_script()
    if validate_workflows_script:
        try:
            check_call([validate_workflows_script, f"{input_directory_path}/{HDFS_FOLDER}/{WORKFLOW_XML}"])
        except CalledProcessError:
            logging.error(
                "Workflow failed schema validation. " "Please correct the workflow XML and try again."
            )
            # sys.exit rather than the builtin exit(): the latter is injected by the
            # ``site`` module for interactive use and is not guaranteed to exist.
            sys.exit(1)

    os.makedirs(output_directory_path, exist_ok=True)

    if args.dot:
        renderer_class = DotRenderer
    else:
        renderer_class = PythonRenderer

    renderer = renderer_class(
        output_directory_path=output_directory_path,
        schedule_interval=schedule_interval,
        start_days_ago=start_days_ago,
    )

    # The transformers are passed to the converter in exactly this order.
    transformers = [
        RemoveInaccessibleNodeTransformer(),
        RemoveEndTransformer(),
        RemoveKillTransformer(),
        RemoveStartTransformer(),
        RemoveJoinTransformer(),
        RemoveForkTransformer(),
        AddWorkflowNotificationTransformer(),
        AddNodeNotificationTransformer(),
    ]

    converter = OozieConverter(
        dag_name=dag_name,
        input_directory_path=input_directory_path,
        output_directory_path=output_directory_path,
        action_mapper=ACTION_MAP,
        renderer=renderer,
        transformers=transformers,
        user=args.user,
    )
    converter.recreate_output_directory()
    converter.convert()


def parse_args(args):
    """
    Parse the converter's command line arguments.

    :param args: list of argument strings (without the program name).
    :return: the ``argparse.Namespace`` with ``input_directory_path``,
        ``output_directory_path``, ``dag_name``, ``user``, ``start_days_ago``,
        ``schedule_interval`` and ``dot`` attributes. ``start_days_ago`` and
        ``schedule_interval`` are the integer ``0`` by default, but are kept as
        the raw strings when given on the command line.
    """
    parser = argparse.ArgumentParser(
        description="Convert Apache Oozie workflows to Apache Airflow workflows."
    )
    parser.add_argument("-i", "--input-directory-path", help="Path to input directory", required=True)
    parser.add_argument("-o", "--output-directory-path", help="Desired output directory", required=True)
    parser.add_argument("-n", "--dag-name", help="Desired DAG name [defaults to input directory name]")
    parser.add_argument(
        "-u",
        "--user",
        help="The user to be used in place of all " "${user.name} [defaults to user who ran the conversion]",
    )
    parser.add_argument("-s", "--start-days-ago", help="Desired DAG start as number of days ago", default=0)
    parser.add_argument(
        "-v", "--schedule-interval", help="Desired DAG schedule interval as number of days", default=0
    )
    parser.add_argument("-d", "--dot", help="Renders workflow files in DOT format", action="store_true")
    return parser.parse_args(args)