# main() — entry point for the Cloud Composer workload simulator.
# Source: tools/cloud-composer-stress-testing/cloud-composer-workload-simulator/main.py

def main(argv):
    """Read a workload config, generate DAG files, and optionally upload them.

    Command line:
        main.py --config-file=<configfile> --output-dir=<outputdir> --upload-to-composer

    Args:
        argv: Command-line arguments, typically ``sys.argv[1:]``.

    Side effects:
        Writes one generated DAG file per DAG under
        ``<output_dir>/<experiment_id>/`` and, with ``--upload-to-composer``,
        uploads that directory to the Composer environment's DAG bucket.
    """
    usage = (
        "main.py --config-file=<configfile> --output-dir=<outputdir> "
        "--upload-to-composer"
    )

    config_file = ""
    output_dir = ""
    upload = False

    try:
        opts, _args = getopt.getopt(
            argv, "ho:v", ["help", "config-file=", "output-dir=", "upload-to-composer"]
        )
    except getopt.GetoptError:
        print(usage)
        sys.exit(2)
    for opt, arg in opts:
        if opt in ("-h", "--help"):
            print(usage)
            sys.exit()
        # BUG FIX: the original used `opt in ("--config-file")`, which is a
        # substring test against a plain string (the parentheses do not make a
        # tuple). Use equality instead.
        elif opt == "--config-file":
            config_file = arg
            print("-- Using config file:", config_file)
        elif opt == "--output-dir":
            output_dir = arg
            print("-- Generating output in:", output_dir)
        elif opt == "--upload-to-composer":
            upload = True
            print("-- Uploading generated dags to Composer environment.")

    # A config file is mandatory; fail early with the usage string instead of
    # attempting to load an empty path.
    if not config_file:
        print(usage)
        sys.exit(2)

    # Load and validate configuration before generating anything.
    load_config = helper_functions.load_config_from_file(config_file)
    if not helper_functions.validate_config(load_config):
        # Keep the original behavior: an invalid config is a silent no-op.
        return

    num_dags = load_config["number_of_dags"]
    min_tasks_per_dag = load_config["min_tasks_per_dag"]
    experiment_id = load_config["experiment_id"]

    # Merge taskflow collections into a single map of taskflows and weights.
    taskflows = {}
    taskflow_collections = []
    for key in load_config["taskflows"]:
        taskflow_collections.append(key)
        taskflows.update(load_config["taskflows"][key])

    # Paused-weight configuration (default to 50/50 if not provided).
    paused_weight = load_config.get("paused", 0.5)

    # Base settings are identical for every DAG; each iteration still gets its
    # own copy in case generate_dag_string mutates it.
    base_settings = load_config["default_settings"].copy()
    base_settings["owner"] = "airflow"

    if not output_dir:
        output_dir = "dags/"

    # Generate DAGs.
    for i in range(num_dags):
        dag_id = f"{experiment_id}_dag_{i}".replace("-", "_")
        # Pick a schedule and a start date according to their configured weights.
        schedule = random.choices(
            list(load_config["schedules"].keys()),
            weights=list(load_config["schedules"].values()),
        )[0]
        start_date = random.choices(
            list(load_config["start_dates"].keys()),
            weights=list(load_config["start_dates"].values()),
        )[0]
        default_settings = base_settings.copy()

        # Pause every DAG when forced by config; otherwise pause randomly
        # according to the configured weight. (Removed a leftover debug
        # `print(is_paused)` from the original.)
        if default_settings["is_paused_upon_creation"]:
            is_paused = True
        else:
            is_paused = random.random() < paused_weight

        dag = generate_dag_string(
            experiment_id=experiment_id,
            dag_id=dag_id,
            start_date=start_date,
            schedule=schedule,
            default_settings=default_settings,
            taskflow_collections=taskflow_collections,
            taskflows=taskflows,
            num_tasks=min_tasks_per_dag,
            is_paused=is_paused,
        )

        Path(f"{output_dir}/{experiment_id}").mkdir(parents=True, exist_ok=True)
        with open(f"{output_dir}/{experiment_id}/dag_{i}.py", "w") as file:
            file.write(dag)

    # Upload DAGs to the Composer environment if requested. Uses base_settings
    # computed above, so this no longer raises NameError when num_dags == 0
    # (the original read loop-local variables here).
    if upload:
        dag_folder = helper_functions.get_composer_environment_bucket(
            base_settings["project_id"],
            base_settings["region"],
            base_settings["composer_environment"],
        )
        helper_functions.upload_directory(
            source_folder=f"{output_dir}/{experiment_id}/",
            target_gcs_path=f"{dag_folder}/{experiment_id}",
        )

    print(
        f"> Generated {num_dags} dags with at least {min_tasks_per_dag} tasks per dag"
    )
    print(f"> Check dags/{experiment_id} directory for generated output")
    if upload:
        print(
            f"> Uploaded dags/{experiment_id} contents to {dag_folder}/{experiment_id}"
        )