in o2a/o2a.py [0:0]
def main():
args = parse_args(sys.argv[1:])
input_directory_path = args.input_directory_path
output_directory_path = args.output_directory_path
start_days_ago = args.start_days_ago
schedule_interval = args.schedule_interval
dag_name = args.dag_name
if not dag_name:
dag_name = os.path.basename(input_directory_path)
conf_path = os.path.join(input_directory_path, CONFIG)
if not os.path.isfile(conf_path):
logging.warning(
f"""
#################################### WARNING ###########################################
The '{CONFIG}' file was not detected in {input_directory_path}.
It may be necessary to provide input parameters for the workflow.
In case of any conversion errors make sure this configuration file is really not needed.
Otherwise please provide it.
########################################################################################
"""
)
validate_workflows_script = get_o2a_validate_workflows_script()
if validate_workflows_script:
try:
check_call([validate_workflows_script, f"{input_directory_path}/{HDFS_FOLDER}/{WORKFLOW_XML}"])
except CalledProcessError:
logging.error(
"Workflow failed schema validation. " "Please correct the workflow XML and try again."
)
exit(1)
os.makedirs(output_directory_path, exist_ok=True)
if args.dot:
renderer_class = DotRenderer
else:
renderer_class = PythonRenderer
renderer = renderer_class(
output_directory_path=output_directory_path,
schedule_interval=schedule_interval,
start_days_ago=start_days_ago,
)
transformers = [
RemoveInaccessibleNodeTransformer(),
RemoveEndTransformer(),
RemoveKillTransformer(),
RemoveStartTransformer(),
RemoveJoinTransformer(),
RemoveForkTransformer(),
AddWorkflowNotificationTransformer(),
AddNodeNotificationTransformer(),
]
converter = OozieConverter(
dag_name=dag_name,
input_directory_path=input_directory_path,
output_directory_path=output_directory_path,
action_mapper=ACTION_MAP,
renderer=renderer,
transformers=transformers,
user=args.user,
)
converter.recreate_output_directory()
converter.convert()