scripts/deploy_dag.py (426 lines of code):

# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import json
import pathlib
import subprocess
import typing

import click
from google.cloud.orchestration.airflow import service_v1beta1
from ruamel import yaml

yaml = yaml.YAML(typ="safe")

CURRENT_PATH = pathlib.Path(__file__).resolve().parent
PROJECT_ROOT = CURRENT_PATH.parent
DATASETS_PATH = PROJECT_ROOT / "datasets"
DEFAULT_AIRFLOW_VERSION = 2


class IncompatibilityError(Exception):
    pass


def main(
    env_path: pathlib.Path,
    dataset_id: str,
    composer_env: str,
    composer_bucket: typing.Union[str, None],
    composer_region: str,
    pipeline: typing.Union[str, None],
):
    if composer_bucket is None:
        composer_bucket = get_composer_bucket(composer_env, composer_region)

    print("\n========== AIRFLOW VARIABLES ==========")
    check_and_configure_airflow_variables(
        env_path, dataset_id, composer_env, composer_bucket, composer_region
    )

    print("========== AIRFLOW DAGS ==========")
    if pipeline:
        pipelines = [env_path / "datasets" / dataset_id / "pipelines" / pipeline]
    else:
        pipelines = list_subdirs(env_path / "datasets" / dataset_id / "pipelines")

    runtime_airflow_version = composer_airflow_version(composer_env, composer_region)

    for pipeline_path in pipelines:
        check_airflow_version_compatibility(pipeline_path, runtime_airflow_version)

        data_folder = (
            DATASETS_PATH / dataset_id / "pipelines" / pipeline_path.name / "data"
        )
        if data_folder.exists() and data_folder.is_dir() and any(data_folder.iterdir()):
            copy_data_folder_to_composer_bucket(
                dataset_id,
                data_folder,
                pipeline_path.name,
                composer_bucket,
            )

        copy_custom_callables_to_airflow_dags_folder(
            env_path,
            dataset_id,
            pipeline_path.name,
            composer_bucket,
        )

        copy_generated_dag_to_airflow_dags_folder(
            env_path,
            dataset_id,
            pipeline_path.name,
            composer_bucket,
        )


def get_gcp_project() -> str:
    return subprocess.run(
        ["gcloud", "config", "get-value", "project"], text=True, capture_output=True
    ).stdout.strip()


def get_composer_bucket(
    composer_env: str,
    composer_region: str,
) -> str:
    project_id = get_gcp_project()

    # Create a client
    client = service_v1beta1.EnvironmentsClient()

    # Initialize request argument(s)
    request = service_v1beta1.GetEnvironmentRequest(
        name=f"projects/{project_id}/locations/{composer_region}/environments/{composer_env}"
    )

    # Make the request
    response = client.get_environment(request=request)

    # Handle the response
    composer_bucket = response.config.dag_gcs_prefix.replace("/dags", "").replace(
        "gs://", ""
    )
    return composer_bucket


def run_gsutil_cmd(args: typing.List[str], cwd: pathlib.Path):
    subprocess.check_call(["gsutil"] + args, cwd=cwd)


def check_and_configure_airflow_variables(
    env_path: pathlib.Path,
    dataset_id: str,
    composer_env: str,
    composer_bucket: str,
    composer_region: str,
):
    """First checks if a `.vars.[ENV].yaml` file exists in the dataset folder and
    if the `pipelines` key exists in that file. If so, copy the JSON object
    equivalent of `pipelines` into the variables file at
    `.[ENV]/datasets/pipelines/[DATASET]_variables.json`.

    Finally, upload the pipeline variables file to the Composer bucket.
    """
    cwd = env_path / "datasets" / dataset_id / "pipelines"
    vars_json_path = cwd / f"{dataset_id}_variables.json"

    env_vars_file = DATASETS_PATH / dataset_id / f".vars{env_path.name}.yaml"
    env_vars = yaml.load(open(env_vars_file)) if env_vars_file.exists() else None

    if isinstance(env_vars, dict) and "pipelines" in env_vars:
        local_vars = env_vars["pipelines"]
    elif vars_json_path.exists() and vars_json_path.stat().st_size > 0:
        with open(vars_json_path) as file_:
            local_vars = json.load(file_)
    else:
        print("No local pipeline variables found.")
        local_vars = None

    overwrite_remote_vars = compare_and_set_airflow_variables(
        local_vars,
        composer_env,
        composer_region,
        dataset_id,
        vars_json_path,
    )

    if overwrite_remote_vars:
        import_variables_to_cloud_composer(
            env_path, dataset_id, composer_env, composer_bucket, composer_region
        )


def get_airflow_var_from_composer_env(
    composer_env: str,
    composer_region: str,
    dataset_id: str,
) -> typing.Union[dict, None]:
    result = subprocess.run(
        [
            "gcloud",
            "composer",
            "environments",
            "run",
            composer_env,
            "--location",
            composer_region,
            "--project",
            get_gcp_project(),
            "variables",
            "--",
            "get",
            dataset_id,
        ],
        text=True,
        capture_output=True,
    )

    # The variable doesn't exist in the Composer environment
    if result.returncode == 1:
        print(
            f"Airflow variable `{dataset_id}` not found in Composer environment `{composer_env}`"
        )
        return
    else:
        print(
            f"Airflow variable `{dataset_id}` found in Composer environment `{composer_env}`"
        )
        return {dataset_id: json.loads(result.stdout.strip())}


def compare_and_set_airflow_variables(
    local_vars: typing.Union[dict, None],
    composer_env: str,
    composer_region: str,
    dataset_id: str,
    vars_json_path: pathlib.Path,
) -> bool:
    if not local_vars:
        print(
            f"Airflow variable `{dataset_id}` is not defined locally."
            " Checking Cloud Composer environment for this variable.."
        )

    remote_vars = get_airflow_var_from_composer_env(
        composer_env, composer_region, dataset_id
    )

    if remote_vars is None and local_vars is None:
        print(
            "Airflow variables not defined locally and remotely."
            " Cloud Composer variable import will be skipped.\n"
        )
        vars_to_use = None
        import_to_composer = False
    elif remote_vars is not None and local_vars is not None:
        print(
            "Remote value:\n"
            f"{json.dumps(remote_vars, indent=2)}\n\n"
            "Local value:\n"
            f"{json.dumps(local_vars, indent=2)}\n"
        )
        if remote_vars == local_vars:
            print(
                "Remote and local Airflow variables are the same."
                " Cloud Composer variable import will be skipped.\n"
            )
            vars_to_use = local_vars
            import_to_composer = False
        else:
            strategy = prompt_strategy_for_local_and_remote_vars()
            if strategy.lower() == "r":
                # use remote variable (default)
                vars_to_use = remote_vars
                import_to_composer = False
            elif strategy.lower() == "l":
                # use local variable
                vars_to_use = local_vars
                import_to_composer = True
            else:
                # merge local and remote variables
                vars_to_use = merge_nested_dicts(remote_vars, local_vars)
                import_to_composer = True

            print(
                f"Airflow variable `{dataset_id}` is now set to\n"
                f"{json.dumps(vars_to_use, indent=2)}\n"
            )
    elif remote_vars is None and local_vars is not None:
        vars_to_use = local_vars
        import_to_composer = True
    else:
        # remote vars exists and local vars is None
        print(
            f"Setting local variable `{dataset_id}` to\n"
            f"{json.dumps(remote_vars, indent=2)}\n\n"
        )
        vars_to_use = remote_vars
        import_to_composer = False

    if vars_to_use is not None:
        with open(vars_json_path, "w") as file_:
            file_.write(json.dumps(vars_to_use))

    return import_to_composer


def prompt_strategy_for_local_and_remote_vars() -> str:
    strategy = click.prompt(
        (
            "Remote and local Airflow variables are different.\n"
            "Select version to use: Merge (m), use local (l), use remote (r)?"
        ),
        type=click.Choice(["m", "l", "r"], case_sensitive=False),
        default="r",
    )
    return strategy


def merge_nested_dicts(a: dict, b: dict, path=None) -> dict:
    """Recursively merges dict `b` into dict `a`; values from `b` win on conflicting leaves."""
    if path is None:
        path = []
    for key in b:
        if key in a:
            if isinstance(a[key], dict) and isinstance(b[key], dict):
                merge_nested_dicts(a[key], b[key], path + [str(key)])
            elif a[key] == b[key]:
                pass  # same leaf value
            else:
                a[key] = b[key]
        else:
            a[key] = b[key]
    return a


def copy_data_folder_to_composer_bucket(
    dataset_id: str,
    data_folder: pathlib.Path,
    pipeline: str,
    composer_bucket: str,
):
    print(
        f"Data folder exists: {data_folder}.\nCopying contents into Composer bucket.."
    )
    gcs_uri = f"gs://{composer_bucket}/data/{dataset_id}/{pipeline}"
    run_gsutil_cmd(["-q", "cp", "-r", f"{data_folder}/*", gcs_uri], data_folder)
    print("Done. Files uploaded to GCS:")
    for file in data_folder.iterdir():
        print(f" - {gcs_uri}/{file.name}")


def run_cloud_composer_vars_import(
    composer_env: str,
    composer_region: str,
    airflow_path: str,
    cwd: pathlib.Path,
):
    subprocess.check_call(
        [
            "gcloud",
            "beta",
            "composer",
            "environments",
            "run",
            composer_env,
            "--location",
            composer_region,
            "variables",
            "--",
            "import",
            airflow_path,
        ],
        cwd=cwd,
    )


def import_variables_to_cloud_composer(
    env_path: pathlib.Path,
    dataset_id: str,
    composer_env: str,
    composer_bucket: str,
    composer_region: str,
):
    """Runs the command

        gcloud composer environments run COMPOSER_ENV --location COMPOSER_REGION variables -- import /home/airflow/gcs/data/variables/{DATASET_ID}_variables.json
    """
    cwd = env_path / "datasets" / dataset_id / "pipelines"
    filename = f"{dataset_id}_variables.json"
    gcs_uri = f"gs://{composer_bucket}/data/variables/{filename}"
    airflow_path = f"/home/airflow/gcs/data/variables/{filename}"

    print(
        "\nCopying variables JSON file into Cloud Composer data folder\n\n"
        f" Source:\n {cwd / filename}\n\n"
        f" Destination:\n {gcs_uri}\n"
    )
    run_gsutil_cmd(["cp", filename, gcs_uri], cwd=cwd)

    print(f"\nImporting Airflow variables from {gcs_uri} ({airflow_path})...\n")
    run_cloud_composer_vars_import(composer_env, composer_region, airflow_path, cwd=cwd)


def copy_generated_dag_to_airflow_dags_folder(
    env_path: pathlib.Path,
    dataset_id: str,
    pipeline_id: str,
    composer_bucket: str,
):
    """Runs the command

        gsutil cp {PIPELINE_ID}_dag.py gs://{COMPOSER_BUCKET}/dags/{DATASET_ID}__{PIPELINE_ID}_dag.py

    inside $DATASET/pipelines/$PIPELINE
    """
    cwd = env_path / "datasets" / dataset_id / "pipelines" / pipeline_id
    filename = f"{pipeline_id}_dag.py"

    target = f"gs://{composer_bucket}/dags/{dataset_id}__{pipeline_id}_dag.py"
    print(
        f"\nCopying DAG file for pipeline `{pipeline_id}` into Cloud Composer DAG folder\n\n"
        f" Source:\n {cwd / filename}\n\n"
        f" Destination:\n {target}\n"
    )
    run_gsutil_cmd(["cp", filename, target], cwd=cwd)


def copy_custom_callables_to_airflow_dags_folder(
    env_path: pathlib.Path,
    dataset_id: str,
    pipeline_id: str,
    composer_bucket: str,
):
    """Runs the command

        gsutil cp -r custom gs://$COMPOSER_BUCKET/dags/$DATASET/$PIPELINE_ID/

    inside $DATASET/pipelines/$PIPELINE.
    """
    cwd = env_path / "datasets" / dataset_id / "pipelines" / pipeline_id

    if not (cwd / "custom").exists():
        return

    target = f"gs://{composer_bucket}/dags/{dataset_id}/{pipeline_id}/"
    print(
        f"\nCopying custom callables folder for pipeline `{pipeline_id}` into Cloud Composer DAG folder\n\n"
        f" Source:\n {cwd / 'custom'}\n\n"
        f" Destination:\n {target}\n"
    )
    run_gsutil_cmd(["-m", "cp", "-r", "custom", target], cwd=cwd)


def check_existence_of_variables_file(file_path: pathlib.Path):
    if not file_path.exists():
        raise FileNotFoundError(f"Airflow variables file {file_path} does not exist.")


def list_subdirs(path: pathlib.Path) -> typing.List[pathlib.Path]:
    """Returns a list of subdirectories"""
    subdirs = [f for f in path.iterdir() if f.is_dir() and not f.name[0] in (".", "_")]
    return subdirs


def composer_airflow_version(
    composer_env: str, composer_region: str
) -> typing.Literal[1, 2]:
    config = json.loads(
        subprocess.run(
            [
                "gcloud",
                "composer",
                "environments",
                "describe",
                composer_env,
                "--location",
                composer_region,
                "--format",
                "json",
            ],
            stdout=subprocess.PIPE,
        ).stdout.decode("utf-8")
    )
    # Example image version: composer-1.17.0-preview.8-airflow-2.1.1
    image_version = config["config"]["softwareConfig"]["imageVersion"]

    airflow_version = image_version.split("-airflow-")[-1]
    return 2 if airflow_version.startswith("2") else 1


def get_dag_airflow_version(config: dict) -> int:
    return config["dag"].get("airflow_version", DEFAULT_AIRFLOW_VERSION)


def check_airflow_version_compatibility(
    pipeline_path: pathlib.Path, runtime_airflow_version: int
) -> None:
    """If a DAG uses Airflow 2 operators but the runtime version uses Airflow 1,
    raise a compatibility error. On the other hand, DAGs using Airflow 1.x
    operators can still run in an Airflow 2 runtime environment via backport
    providers.
    """
    dag_airflow_version = get_dag_airflow_version(
        yaml.load((pipeline_path / "pipeline.yaml").read_text())
    )

    if dag_airflow_version > runtime_airflow_version:
        raise IncompatibilityError(
            f"The DAG {pipeline_path.name} uses Airflow 2, but"
            " you are deploying to an Airflow 1.x environment."
        )


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Deploy DAGs and variables to an Airflow environment"
    )
    parser.add_argument(
        "-d",
        "--dataset",
        required=True,
        type=str,
        dest="dataset",
        help="The directory name of the dataset.",
    )
    parser.add_argument(
        "-e",
        "--env",
        type=str,
        default="dev",
        dest="env",
        help="The stage used for the resources: dev|staging|prod",
    )
    parser.add_argument(
        "-n",
        "--composer-env",
        required=True,
        type=str,
        dest="composer_env",
        help="The Google Cloud Composer environment name",
    )
    parser.add_argument(
        "-b",
        "--composer-bucket",
        required=False,
        type=str,
        dest="composer_bucket",
        help="The Google Cloud Composer bucket name",
    )
    parser.add_argument(
        "-r",
        "--composer-region",
        required=True,
        type=str,
        dest="composer_region",
        help="The region of the Google Cloud Composer environment",
    )
    parser.add_argument(
        "-p",
        "--pipeline",
        required=False,
        type=str,
        dest="pipeline",
        help="The directory name of the pipeline",
    )

    args = parser.parse_args()

    if not args.composer_env:
        raise ValueError(
            "Argument `-n|--composer-env` (Composer environment name) not specified"
        )

    if not args.composer_region:
        raise ValueError(
            "Argument `-r|--composer-region` (Composer environment region) not specified"
        )

    main(
        env_path=PROJECT_ROOT / f".{args.env}",
        dataset_id=args.dataset,
        pipeline=args.pipeline,
        composer_env=args.composer_env,
        composer_bucket=args.composer_bucket,
        composer_region=args.composer_region,
    )