def parse_args()

in client/migrationx-transformer/src/main/python/airflow_dag_parser/parser.py [0:0]


def parse_args():
    """Parse command-line options and apply them to the converter configuration.

    Recognised options: ``-d/--dag_folder`` (required), ``-o/--output``
    (required), ``-c/--connections``, ``-p/--prefix``, ``-m/--mapping`` and
    ``-x/--modules``.

    Side effects:
        * Sets the module-level globals ``dag_folder`` and ``output``.
        * Creates the output folder (including missing parents) if the path
          does not exist yet.
        * When a mapping file is given, merges its ``typeMapping`` and
          ``settings`` entries into ``Config.typeMapping`` / ``Config.settings``
          and uses its ``workflowPathPrefix`` for ``Config.workflowPathPrefix``.
        * ``--prefix`` is applied after the mapping file, so it takes
          precedence over the file's ``workflowPathPrefix``.
        * Calls ``dump_python_path`` for every comma-separated entry of
          ``--modules``; failures there are logged and do not abort parsing.

    Returns:
        A ``(dag_folder, output)`` tuple holding the two required option values.

    Raises:
        SystemExit: with code -1 if ``dag_folder`` is missing, or -2 if
            ``output`` is missing.
    """
    global dag_folder, output
    parser = optparse.OptionParser(
        "airflow-workflow", description="converting Airflow dag to DataWorks workflow", version="1.0")
    parser.add_option("-d", "--dag_folder", action="store", dest="dag_folder", help="airflow dag folder")
    parser.add_option("-o", "--output", action="store", dest="output", help="workflow storing folder")
    parser.add_option("-c", "--connections", action="store", dest="connections", help="connection csv file")
    parser.add_option("-p", "--prefix", action="store", dest="prefix", help="workflow location prefix in IDE")
    parser.add_option("-m", "--mapping", action="store", dest="mapping",
                      help='type mapping json file, e.g. /path/to/conf/FUTU_flowspec-airflowV2-transformer-config.json')
    parser.add_option("-x", "--modules", action="store", dest="modules", help="modules to dump out")
    # The full argv (program name included) is handed over on purpose to keep
    # the historical behaviour; the leftover positional arguments are unused.
    opts, _ = parser.parse_args(sys.argv)

    if not opts.dag_folder:
        logger.error("dag_folder not set")
        sys.exit(-1)
    dag_folder = opts.dag_folder

    if not opts.output:
        logger.error("output folder not set")
        sys.exit(-2)
    if not os.path.exists(opts.output):
        # makedirs also creates missing parent folders, and exist_ok avoids a
        # crash if the folder appears between the check above and this call.
        os.makedirs(opts.output, exist_ok=True)
    output = opts.output

    # NOTE: -c/--connections is accepted but currently not used by this function.

    customer_config = None
    if opts.mapping:
        logger.info(f"Overwriting type mapping with file: {opts.mapping}")
        # Read the JSON explicitly as UTF-8 instead of the platform default.
        with open(opts.mapping, encoding="utf-8") as fd:
            customer_config = json.load(fd)
        if customer_config is not None and 'typeMapping' in customer_config:
            Config.typeMapping.update(customer_config['typeMapping'])
        mapping_lines = ''.join(f"\t{k}: {v}\n" for k, v in Config.typeMapping.items())
        logger.info(f"Effective type mappings are: \n{mapping_lines}")

    if customer_config is not None:
        if 'workflowPathPrefix' in customer_config:
            cc_prefix = customer_config['workflowPathPrefix']
            logger.info(f"Also setting workflow path prefix to {cc_prefix}")
            Config.workflowPathPrefix = cc_prefix
        if 'settings' in customer_config:
            cc_settings = customer_config['settings']
            logger.info(f"Also overwriting settings with {cc_settings}")
            Config.settings.update(cc_settings)

    # The command-line prefix is applied last so it overrides the mapping file.
    if opts.prefix:
        logger.info(f"Setting workflow path prefix to {opts.prefix}")
        Config.workflowPathPrefix = opts.prefix
    logger.info(f"Effective workflow path prefix is: {Config.workflowPathPrefix}")

    # Dumping modules is best-effort: log any failure and carry on.
    try:
        dump_modules = opts.modules.split(',') if opts.modules else []
        for mod in dump_modules:
            dump_python_path(mod)
            print('\n')
    except Exception as e:
        logger.error(e)
    return dag_folder, output