tools/cloud-composer-migration-complexity-assessment/airflow-v1-to-v2-migration/run_mig.py (69 lines of code) (raw):

import argparse import os import logging import sys import traceback import airflow_migration.migration as migration def convert_to_bool(arg): ua = str(arg).upper() if 'TRUE'.startswith(ua): return True elif 'FALSE'.startswith(ua): return False else: return False def validate_folder_access(input_folder): if os.access(input_folder, os.F_OK) and os.access(input_folder, os.R_OK) and os.access(input_folder, os.W_OK): return True else: return False # Main class to check the inputs for rw_access and location, then call the migration clss def main(input_dag, output_dag, rules_file, add_comments, comments, report_generation): try: invalid_folders = [folder for folder in [input_dag, output_dag, rules_file] if not validate_folder_access(folder)] if invalid_folders: invalid_folder_names = ','.join(invalid_folders) msg = f"The following folder may have invalid permissions or may not exist: {invalid_folder_names}" logging.error(msg) else: add_comments = convert_to_bool(add_comments) report_generation = convert_to_bool(report_generation) migration.run_migration(input_dag, output_dag, rules_file, add_comments, comments, report_generation) except: logging.error("Oops! Ran into an error. Kindly check the trace log") traceback.print_exception(*sys.exc_info()) if __name__ == '__main__': # Set up a handler for printing INFO logs to the console console = logging.StreamHandler() console.setLevel(logging.INFO) formatter = logging.Formatter('%(levelname)-8s %(message)s') console.setFormatter(formatter) logging.getLogger('').addHandler(console) parser = argparse.ArgumentParser( description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter) parser.add_argument( '--input_dag_folder', dest='input_dag_folder', required=True, help='[REQUIRED]The path to the input DAG folder location' ) parser.add_argument( '--output_dag_folder', dest='output_dag_folder', required=True, help='[REQUIRED]The path to the output DAG folder location' ) parser.add_argument( '--rules_file', dest='rules_file', help='[OPTIONAL]The path to optional rules.csv folder when custom rules are to be used', default="./migration_rules/rules.csv" ) parser.add_argument( '--add_comments', dest='add_comments', help='[OPTIONAL]If client wants to see Migration Utility generated comments in the output files', default="True") parser.add_argument( '--comments', dest='comment', help='[OPTIONAL]The path to optional rules.csv folder when custom rules are to be used', default="") parser.add_argument( '--report_req', dest='report_generation', help='[OPTIONAL]True or False to determine the generation of final output report', default=True) args = parser.parse_args() main(args.input_dag_folder, args.output_dag_folder, args.rules_file, args.add_comments, args.comment, args.report_generation)