in tools/cloud-composer-stress-testing/cloud-composer-workload-simulator/main.py [0:0]
def main(argv):
    """Read configuration, generate DAG files, and optionally upload them.

    Args:
        argv: Command-line arguments (``sys.argv[1:]``).

    Recognized options:
        -h, --help             Print usage and exit.
        -o, --output-dir=DIR   Directory for generated DAGs (default ``dags/``).
        --config-file=FILE     Workload configuration file to load.
        --upload-to-composer   Upload generated DAGs to the Composer bucket.
    """
    usage = (
        "main.py --config-file=<configfile> --output-dir=<outputdir>"
        " --upload-to-composer"
    )
    config_file = ""
    output_dir = ""
    upload = False
    try:
        opts, _args = getopt.getopt(
            argv, "ho:v", ["help", "config-file=", "output-dir=", "upload-to-composer"]
        )
    except getopt.GetoptError:
        print(usage)
        sys.exit(2)
    for opt, arg in opts:
        # NOTE: the original used `opt in ("--config-file")` — parentheses around a
        # single string are not a tuple, so that performed a substring test. Fixed
        # here with exact comparisons / real tuples.
        if opt in ("-h", "--help"):
            print(usage)
            sys.exit()
        elif opt == "--config-file":
            config_file = arg
            print("-- Using config file:", config_file)
        elif opt in ("-o", "--output-dir"):
            output_dir = arg
            print("-- Generating output in:", output_dir)
        elif opt == "--upload-to-composer":
            upload = True
            print("-- Uploading generated dags to Composer environment.")

    # Load and validate the workload configuration.
    load_config = helper_functions.load_config_from_file(config_file)
    validated = helper_functions.validate_config(load_config)
    if not validated:
        # Previously an invalid config fell through silently; say why we stop.
        print("-- Configuration failed validation; no DAGs generated.")
        return

    num_dags = load_config["number_of_dags"]
    min_tasks_per_dag = load_config["min_tasks_per_dag"]
    experiment_id = load_config["experiment_id"]

    # Merge every taskflow collection into a single {taskflow: weight} map,
    # remembering which collections contributed.
    taskflows = {}
    taskflow_collections = []
    for collection_name, nested_dict in load_config["taskflows"].items():
        taskflow_collections.append(collection_name)
        taskflows.update(nested_dict)

    # Probability that a generated DAG starts paused (default 50/50).
    paused_weight = load_config.get("paused", 0.5)

    # Resolve the output directory once, before the loop (the original
    # re-checked the default and re-created the directory every iteration).
    if not output_dir:
        output_dir = "dags/"
    out_path = Path(output_dir) / experiment_id
    out_path.mkdir(parents=True, exist_ok=True)

    for i in range(num_dags):
        dag_id = f"{experiment_id}_dag_{i}".replace("-", "_")
        # Pick a schedule and a start date according to their configured weights.
        schedule = random.choices(
            list(load_config["schedules"].keys()),
            weights=list(load_config["schedules"].values()),
        )[0]
        start_date = random.choices(
            list(load_config["start_dates"].keys()),
            weights=list(load_config["start_dates"].values()),
        )[0]
        default_settings = load_config["default_settings"].copy()
        default_settings["owner"] = "airflow"
        # Force-paused when settings demand it; otherwise roll against the weight.
        if default_settings["is_paused_upon_creation"]:
            is_paused = True
        else:
            is_paused = random.random() < paused_weight
        dag = generate_dag_string(
            experiment_id=experiment_id,
            dag_id=dag_id,
            start_date=start_date,
            schedule=schedule,
            default_settings=default_settings,
            taskflow_collections=taskflow_collections,
            taskflows=taskflows,
            num_tasks=min_tasks_per_dag,
            is_paused=is_paused,
        )
        (out_path / f"dag_{i}.py").write_text(dag)

    # Upload the generated DAGs to the Composer environment bucket if requested.
    # NOTE(review): relies on `default_settings` from the last loop iteration,
    # as the original did — assumes num_dags >= 1 when uploading.
    if upload:
        dag_folder = helper_functions.get_composer_environment_bucket(
            default_settings["project_id"],
            default_settings["region"],
            default_settings["composer_environment"],
        )
        helper_functions.upload_directory(
            source_folder=f"{output_dir}/{experiment_id}/",
            target_gcs_path=f"{dag_folder}/{experiment_id}",
        )

    print(
        f"> Generated {num_dags} dags with at least {min_tasks_per_dag} tasks per dag"
    )
    print(f"> Check dags/{experiment_id} directory for generated output")
    if upload:
        print(
            f"> Uploaded dags/{experiment_id} contents to {dag_folder}/{experiment_id}"
        )