in client/migrationx-transformer/src/main/python/airflow_dag_parser/parser.py [0:0]
def parse_args():
global dag_folder, output
parser = optparse.OptionParser(
"airflow-workflow", description="converting Airflow dag to DataWorks workflow", version="1.0")
parser.add_option("-d", "--dag_folder", action="store", dest="dag_folder", help="airflow dag folder")
parser.add_option("-o", "--output", action="store", dest="output", help="workflow storing folder")
parser.add_option("-c", "--connections", action="store", dest="connections", help="connection csv file")
parser.add_option("-p", "--prefix", action="store", dest="prefix", help="workflow location prefix in IDE")
parser.add_option("-m", "--mapping", action="store", dest="mapping",
help='type mapping json file, e.g. /path/to/conf/FUTU_flowspec-airflowV2-transformer-config.json')
parser.add_option("-x", "--modules", action="store", dest="modules", help="modules to dump out")
opts, args = parser.parse_args(sys.argv)
if not opts.dag_folder:
logger.error("dag_folder not set")
sys.exit(-1)
dag_folder = opts.dag_folder
if not opts.output:
logger.error("output folder not set")
sys.exit(-2)
if not os.path.exists(opts.output):
os.mkdir(opts.output)
output = opts.output
# if opts.connections:
# os.environ['connections_csv'] = connections
customer_config = None
if opts.mapping:
logger.info(f"Overwriting type mapping with file: {opts.mapping}")
with open(opts.mapping) as fd:
customer_config = json.load(fd)
if customer_config is not None and 'typeMapping' in customer_config:
customized_mapping = customer_config['typeMapping']
Config.typeMapping.update(customized_mapping)
str_content = list()
for k, v in Config.typeMapping.items():
str_content.append(f"\t{k}: {v}\n")
logger.info(f"Effective type mappings are: \n{''.join(str_content)}")
if customer_config is not None and 'workflowPathPrefix' in customer_config:
cc_prefix = customer_config['workflowPathPrefix']
logger.info(f"Also setting workflow path prefix to {cc_prefix}")
Config.workflowPathPrefix = cc_prefix
if customer_config is not None and 'settings' in customer_config:
cc_settings = customer_config['settings']
logger.info(f"Also overwriting settings with {cc_settings}")
Config.settings.update(cc_settings)
if opts.prefix:
logger.info(f"Setting workflow path prefix to {opts.prefix}")
Config.workflowPathPrefix = opts.prefix
logger.info(f"Effective workflow path prefix is: {Config.workflowPathPrefix}")
try:
dump_modules = []
if opts.modules:
dump_modules = opts.modules.split(',')
for mod in dump_modules:
dump_python_path(mod)
print('\n')
except Exception as e:
logger.error(e)
return dag_folder, output