def main()

in workflows-generator/orchestration_generator.py


# Standard-library imports used by main(); the helper functions and generator
# classes referenced below (usage, process_config_key_values, write_result,
# WorkflowsGenerator, ComposerDagGenerator) are defined elsewhere in the module.
import json
import os
import sys


def main():
    """
    Main function for workflows generator

    :param workflow_file: Json definition file in the form of THREADS and STEPS
    :param config_file: Json parameters file
    :param output_file: Cloud Workflows generated file
    :param generate_for_pipeline: Boolean to identify if the run is part of a CICD pipeline
    :return: NA
    """
    encoding = "utf-8"
    workflow_file = sys.argv[1]
    with open(workflow_file, encoding=encoding) as json_file:
        workflow_config = json.load(json_file)
    # Validate the command-line arguments (4 expected); the second argument to
    # usage() presumably selects the expected output file extension.
    if workflow_config.get("engine") == 'cloud_workflows':
        usage(4, 'json')
    else:
        usage(4, 'py')
    # Base name of the workflow file (no directory, no extension), later passed
    # to the Composer DAG generator.
    json_file_name = workflow_file.split("/")[-1].split(".")[0]
    config_file = sys.argv[2]
    output_file = sys.argv[3]
    # bool() of any non-empty string is True (even "False"), so compare the
    # argument against the literal string instead.
    generate_for_pipeline = sys.argv[4].lower() == 'true'

    # Inside the CI/CD pipeline the config file is resolved relative to this
    # script; in a local run it is resolved relative to the current directory.
    if generate_for_pipeline:
        with open(os.path.join(os.path.dirname(__file__), config_file), encoding=encoding) as json_file:
            exec_config = json.load(json_file)
    else:
        with open(os.path.join(os.getcwd(), config_file), encoding=encoding) as json_file:
            exec_config = json.load(json_file)
    exec_config = process_config_key_values(exec_config)
    # Select the generator implementation based on the requested engine.
    if workflow_config.get("engine") == 'cloud_workflows':
        workflow_config = workflow_config.get("definition")
        generator = WorkflowsGenerator(workflow_config, exec_config, generate_for_pipeline, config_file)
    elif workflow_config.get("engine") == 'composer':
        workflow_config = workflow_config.get("definition")
        generator = ComposerDagGenerator(workflow_config, exec_config,
                                         generate_for_pipeline, config_file, json_file_name)
    else:
        # Fail fast on an unknown engine instead of hitting an AttributeError
        # on a None generator further down.
        raise ValueError(f"Unsupported engine: {workflow_config.get('engine')}")
    generator.load_templates()
    workflow_body = generator.generate_workflows_body()
    write_result(output_file, workflow_body)
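
For context, a minimal sketch of an invocation and of the top-level workflow file shape that main() expects. The command line, file names, and the THREADS/STEPS contents are illustrative assumptions; only the "engine" and "definition" keys are implied by the code above.

# Hypothetical invocation (argument order follows the sys.argv handling above):
#   python orchestration_generator.py my_workflow.json config.json output.yaml true

# Expected workflow file, expressed as the Python dict that json.load() would return.
example_workflow_config = {
    "engine": "cloud_workflows",  # or "composer"
    "definition": {
        "THREADS": {},  # thread layout (assumed structure)
        "STEPS": {},    # step definitions (assumed structure)
    },
}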