cloud-composer/dags/sample-dataplex-deploy.py

####################################################################################
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
####################################################################################
# Author: Adam Paternostro
# Summary: Installs Terraform and executes the Terraform script
# This DAG performs both the Deploy and the Destroy
# Terraform deploys this file twice (once named xxx-deploy and once named xxx-destroy)
# The destroy DAG runs every 15 minutes and deletes the assets once auto_delete_hours has elapsed

# [START dag]
from datetime import datetime, timedelta
import os
import json
import airflow
from airflow.operators import bash_operator
from airflow.operators.python_operator import PythonOperator

####################################################################################
# Set these values (the DAG name and the auto-delete time)
####################################################################################
airflow_data_path_to_tf_script = "/home/airflow/gcs/data/terraform/dataplex"
dag_prefix_name = "sample-dataplex"
auto_delete_hours = 120  # 5 days; set to zero to never delete
terraform_bash_file = "sample_terraform_dataplex.sh"

# Required for deployment
project_id = os.environ['ENV_PROJECT_ID']
impersonate_service_account = os.environ['ENV_TERRAFORM_SERVICE_ACCOUNT']

# Parameters to Terraform
dataplex_region = os.environ['ENV_DATAPLEX_REGION']
raw_bucket_name = os.environ['ENV_RAW_BUCKET']
processed_bucket_name = os.environ['ENV_PROCESSED_BUCKET']
taxi_dataset_id = os.environ['ENV_TAXI_DATASET_ID']
thelook_dataset_id = os.environ['ENV_THELOOK_DATASET_ID']
random_extension = os.environ['ENV_RANDOM_EXTENSION']
rideshare_raw_bucket = os.environ['ENV_RIDESHARE_LAKEHOUSE_RAW_BUCKET']
rideshare_enriched_bucket = os.environ['ENV_RIDESHARE_LAKEHOUSE_ENRICHED_BUCKET']
rideshare_curated_bucket = os.environ['ENV_RIDESHARE_LAKEHOUSE_CURATED_BUCKET']
rideshare_raw_dataset = os.environ['ENV_RIDESHARE_LAKEHOUSE_RAW_DATASET']
rideshare_enriched_dataset = os.environ['ENV_RIDESHARE_LAKEHOUSE_ENRICHED_DATASET']
rideshare_curated_dataset = os.environ['ENV_RIDESHARE_LAKEHOUSE_CURATED_DATASET']
rideshare_llm_raw_dataset = os.environ['ENV_RIDESHARE_LLM_RAW_DATASET']
rideshare_llm_enriched_dataset = os.environ['ENV_RIDESHARE_LLM_ENRICHED_DATASET']
rideshare_llm_curated_dataset = os.environ['ENV_RIDESHARE_LLM_CURATED_DATASET']

####################################################################################
# Determine whether this is the deploy or the destroy script based on the Python file name (suffix)
# Based upon the filename, "terraform destroy" will be invoked
# This means sample-xxx-deploy and sample-xxx-destroy contain the SAME EXACT code
####################################################################################
dag_display_name = ""
terraform_destroy = ""
is_deploy_or_destroy = ""
terraform_bash_script_deploy = ""
terraform_bash_script_destroy = ""
env_run_bash_deploy = ""

print("os.path.basename(__file__):", os.path.basename(__file__))
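# Example (illustrative, assuming the two-copy deployment described in the header comment,
# presumably file names like sample-dataplex-deploy.py and sample-dataplex-destroy.py):
# when "destroy" appears in the file name, the branch below schedules the DAG every 15 minutes
# and passes "-destroy" to the Terraform script; otherwise the DAG stays an on-demand deploy.
# In both cases env_run_bash_deploy ("true"/"false") is what gates whether the deploy bash
# task actually runs the Terraform script.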
"destroy" in os.path.basename(__file__): env_run_bash_deploy="false" terraform_bash_script_deploy = "echo 'Skiping Deploy'" terraform_bash_script_destroy = terraform_bash_file is_deploy_or_destroy = "destroy" dag_display_name = dag_prefix_name + "-destroy" terraform_destroy = "-destroy" # parameter to terraform script schedule_interval=timedelta(minutes=15) else: env_run_bash_deploy="true" terraform_bash_script_deploy = terraform_bash_file terraform_bash_script_destroy = "echo 'Skiping Destroy'" is_deploy_or_destroy = "deploy" dag_display_name = dag_prefix_name + "-deploy" schedule_interval=None params_list = { 'airflow_data_path_to_tf_script' : airflow_data_path_to_tf_script, 'project_id' : project_id, 'impersonate_service_account' : impersonate_service_account, 'terraform_destroy' : terraform_destroy, 'dataplex_region' : dataplex_region, 'raw_bucket_name' : raw_bucket_name, 'processed_bucket_name' : processed_bucket_name, 'taxi_dataset_id' : taxi_dataset_id, 'thelook_dataset_id' : thelook_dataset_id, 'random_extension' : random_extension, 'rideshare_raw_bucket' : rideshare_raw_bucket, 'rideshare_enriched_bucket' : rideshare_enriched_bucket, 'rideshare_curated_bucket' : rideshare_curated_bucket, 'rideshare_raw_dataset' : rideshare_raw_dataset, 'rideshare_enriched_dataset' : rideshare_enriched_dataset, 'rideshare_curated_dataset' : rideshare_curated_dataset, 'rideshare_llm_raw_dataset' : rideshare_llm_raw_dataset, 'rideshare_llm_enriched_dataset' : rideshare_llm_enriched_dataset, 'rideshare_llm_curated_dataset' : rideshare_llm_curated_dataset } #################################################################################### # Common items #################################################################################### default_args = { 'owner': 'airflow', 'depends_on_past': False, 'email': None, 'email_on_failure': False, 'email_on_retry': False, 'retries': 2, 'retry_delay': timedelta(minutes=5), 'dagrun_timeout' : timedelta(minutes=60), } # Write out a file that will log when we deployed # This file can then be used for auto-delete def write_deployment_file(deploy_or_destroy): print("BEGIN: write_deployment_file") if deploy_or_destroy == "deploy": run_datetime = datetime.now() data = { "deployment_datetime" : run_datetime.strftime("%m/%d/%Y %H:%M:%S"), } with open('/home/airflow/gcs/data/' + dag_prefix_name + '.json', 'w') as f: json.dump(data, f) print("data: ", data) else: print("write_deployment_file is skipped since this DAG is not a deployment DAG.") print("END: write_deployment_file") # Determine if it is tiem to delete the environment if necessary def delete_environment(deploy_or_destroy): print("BEGIN: delete_environment") delete_environment = False if deploy_or_destroy == "destroy": filePath = '/home/airflow/gcs/data/' + dag_prefix_name + '.json' if os.path.exists(filePath): with open(filePath) as f: data = json.load(f) print("deployment_datetime: ", data['deployment_datetime']) deployment_datetime = datetime.strptime(data['deployment_datetime'], "%m/%d/%Y %H:%M:%S") difference = deployment_datetime - datetime.now() print("difference.total_seconds(): ", abs(difference.total_seconds())) # Test for auto_delete_hours hours if auto_delete_hours == 0: print("No auto delete set auto_delete_hours:",auto_delete_hours) else: if abs(difference.total_seconds()) > (auto_delete_hours * 60 * 60): print("Deleting Environment >", auto_delete_hours, " hours") delete_environment = True else: print("Json files does not exist (no environment deployed)") else: print("delete_environment is 
# Remove the deployment file so we do not keep re-deleting (re-destroying)
def delete_deployment_file(delete_environment):
    print("BEGIN: delete_deployment_file")
    print("delete_environment:", delete_environment)
    if delete_environment == "true":
        print("Deleting file:", '/home/airflow/gcs/data/' + dag_prefix_name + '.json')
        os.remove('/home/airflow/gcs/data/' + dag_prefix_name + '.json')
    print("END: delete_deployment_file")


with airflow.DAG(dag_display_name,
                 default_args=default_args,
                 start_date=datetime(2021, 1, 1),
                 catchup=False,
                 # Add the Composer "Data" directory which will hold the SQL/Bash scripts for deployment
                 template_searchpath=['/home/airflow/gcs/data'],
                 # Either run manually or every 15 minutes (for auto delete)
                 schedule_interval=schedule_interval) as dag:

    # NOTE: The Composer Service Account will impersonate the Terraform service account

    # This will deploy the Terraform code if this DAG ends with "-deploy"
    execute_terraform_deploy = bash_operator.BashOperator(
        task_id='execute_terraform_deploy',
        bash_command=terraform_bash_file,
        params=params_list,
        execution_timeout=timedelta(minutes=60),
        env={"ENV_RUN_BASH": env_run_bash_deploy},
        append_env=True,
        dag=dag
    )

    # This will write out the deployment time to a file if this DAG ends with "-deploy"
    write_deployment_file = PythonOperator(
        task_id='write_deployment_file',
        python_callable=write_deployment_file,
        op_kwargs={"deploy_or_destroy": is_deploy_or_destroy},
        execution_timeout=timedelta(minutes=1),
        dag=dag,
    )

    # This determines whether it is time to delete the deployment if this DAG ends with "-destroy"
    delete_environment = PythonOperator(
        task_id='delete_environment',
        python_callable=delete_environment,
        op_kwargs={"deploy_or_destroy": is_deploy_or_destroy},
        execution_timeout=timedelta(minutes=1),
        dag=dag,
    )

    # This will delete the deployment if this DAG ends with "-destroy" and delete_environment returned "true" (time to delete has passed)
    execute_terraform_destroy = bash_operator.BashOperator(
        task_id='execute_terraform_destroy',
        bash_command=terraform_bash_file,
        params=params_list,
        execution_timeout=timedelta(minutes=60),
        env={"ENV_RUN_BASH": "{{ task_instance.xcom_pull(task_ids='delete_environment') }}"},
        append_env=True,
        dag=dag
    )

    # This will delete the deployment "file" if this DAG ends with "-destroy" and delete_environment returned "true" (time to delete has passed)
    delete_deployment_file = PythonOperator(
        task_id='delete_deployment_file',
        python_callable=delete_deployment_file,
        op_kwargs={"delete_environment": "{{ task_instance.xcom_pull(task_ids='delete_environment') }}"},
        execution_timeout=timedelta(minutes=1),
        dag=dag,
    )

    # DAG Graph
    execute_terraform_deploy >> write_deployment_file >> delete_environment >> execute_terraform_destroy >> delete_deployment_file

# [END dag]