tools/cloud-composer-stress-testing/cloud-composer-workload-simulator/local_utils/helper_functions.py:

""" """ import subprocess import yaml from cerberus import Validator from google.cloud.orchestration.airflow import service_v1 as composer def load_config_from_file(filepath): """ Load YAML file into dictionary. """ load_config = {} try: with open(filepath, "r") as f: load_config = yaml.safe_load(f) except FileNotFoundError: print("Error: config.yaml not found.") return load_config def get_composer_environment_bucket(project_id, location, environment_name): """Gets the GCS bucket associated with a Cloud Composer environment. Args: project_id: The ID of the Google Cloud project that the service belongs to. location: The ID of the Google Cloud region that the service belongs to. environment_name: The name of the Cloud Composer environment. Returns: The GCS bucket associated with the Cloud Composer environment. """ client = composer.EnvironmentsClient() name = f"projects/{project_id}/locations/{location}/environments/{environment_name}" response = client.get_environment(name=name) return response.config.dag_gcs_prefix def upload_directory(source_folder, target_gcs_path): """Uploads a directory to GCS using gsutil with streaming output. Args: bucket_name (str): The name of the GCS bucket. source_folder (str): The path to the local directory to upload. target_prefix (str, optional): The prefix/folder in the GCS bucket where files should be uploaded. Defaults to None (root of the bucket). """ command = ["gsutil", "-m", "cp", "-r", source_folder, target_gcs_path] process = subprocess.Popen( command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True, ) for line in process.stdout: print(line, end="") # Print   each line of gsutil's output as it comes process.stdout.close() return_code = process.wait() if return_code: raise subprocess.CalledProcessError(return_code, command) def validate_config(config): """ Validates the config against a predefined schema. Args: config: The workload generation config Returns: True if the config is valid, False otherwise. Also prints any validation errors encountered. 
""" # Define the schema for validation schema = { "experiment_id": { "type": "string", "minlength": 1, "maxlength": 50, "required": True, }, "number_of_dags": {"type": "integer", "min": 1, "required": True}, "min_tasks_per_dag": {"type": "integer", "min": 1, "required": True}, "schedules": {"type": "dict", "schema": {}, "required": True}, "start_dates": {"type": "dict", "schema": {}, "required": True}, "taskflows": { "type": "dict", "schema": { "base": {"type": "dict", "schema": {}}, "google_cloud": {"type": "dict", "schema": {}}, }, "required": True, }, "default_settings": { "type": "dict", "schema": { "project_id": { "type": "string", "minlength": 1, "maxlength": 100, "required": True, }, "region": { "type": "string", "minlength": 1, "maxlength": 50, "required": True, }, "composer_environment": { "type": "string", "minlength": 1, "maxlength": 100, "required": True, }, "deferrable": {"type": "boolean"}, "retries": {"type": "integer", "min": 0}, "retry_delay": {"type": "integer", "min": 0}, "catchup": {"type": "boolean"}, "is_paused_upon_creation": {"type": "boolean"}, "dagrun_timeout": {"type": "integer", "min": 0}, "execution_timeout": {"type": "integer", "min": 0}, "sla": {"type": "integer", "min": 0}, "mode": {"type": "string", "minlength": 1, "maxlength": 20}, "poke_interval": {"type": "integer", "min": 0}, }, "required": True, }, } # Validate the YAML against the schema v = Validator(schema) v.allow_unknown = True if v.validate(config): return True else: print("Validation errors:") for field, error in v.errors.items(): print(f"- {field}: {error}") return False