cloud-composer/dags/sample-dataplex-with-hms-deploy.py [112:250]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    }


####################################################################################
# Common items
####################################################################################
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': None,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    'dagrun_timeout' : timedelta(minutes=60),
    }


# Write out a file that will log when we deployed
# This file can then be used for auto-delete
def write_deployment_file(deploy_or_destroy):
    print("BEGIN: write_deployment_file")
    if deploy_or_destroy == "deploy":
        run_datetime = datetime.now()
        data = {
            "deployment_datetime" : run_datetime.strftime("%m/%d/%Y %H:%M:%S"),
        }
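        # /home/airflow/gcs/data maps to the data/ folder of the Composer environment's
        # GCS bucket, so this marker file persists across worker restarts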
        with open('/home/airflow/gcs/data/' + dag_prefix_name + '.json', 'w') as f:
            json.dump(data, f)
        print("data: ", data)
    else:
        print("write_deployment_file is skipped since this DAG is not a deployment DAG.")
    print("END: write_deployment_file")


# Determine whether it is time to delete the environment
def delete_environment(deploy_or_destroy):
    print("BEGIN: delete_environment")
    delete_environment = False
    if deploy_or_destroy == "destroy":
        filePath = '/home/airflow/gcs/data/' + dag_prefix_name + '.json'
        if os.path.exists(filePath):
            with open(filePath) as f:
                data = json.load(f)
            
            print("deployment_datetime: ", data['deployment_datetime'])
            deployment_datetime = datetime.strptime(data['deployment_datetime'], "%m/%d/%Y %H:%M:%S")
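            # deployment_datetime is in the past, so this subtraction yields a negative
            # timedelta; abs() below converts it to elapsed seconds either way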
            difference = deployment_datetime - datetime.now()
            print("difference.total_seconds(): ", abs(difference.total_seconds()))
            # Test for auto_delete_hours hours
            if auto_delete_hours == 0:
                print("No auto delete set auto_delete_hours:",auto_delete_hours)
            else:
                if abs(difference.total_seconds()) > (auto_delete_hours * 60 * 60):
                    print("Deleting Environment >", auto_delete_hours, " hours")
                    delete_environment = True
        else:
            print("Json files does not exist (no environment deployed)")
    else:
        print("delete_environment is skipped since this DAG is not a destroy DAG.")
    
    # Return a string ("true"/"false") rather than a bool: the PythonOperator pushes
    # this return value to XCom, and downstream Jinja-templated fields consume it verbatim
    if delete_environment:
        return "true"
    else:
        return "false"


# Removes the deployment file so we do not keep re-running the destroy
def delete_deployment_file(delete_environment):
    print("BEGIN: delete_deployment_file")
    print("delete_environment:",delete_environment)
    if delete_environment == "true":
        print("Deleting file:", '/home/airflow/gcs/data/' + dag_prefix_name + '.json')
        os.remove('/home/airflow/gcs/data/' + dag_prefix_name + '.json')
    print("END: delete_deployment_file")


with airflow.DAG(dag_display_name,
                 default_args=default_args,
                 start_date=datetime(2021, 1, 1),
                 catchup=False,
                 # Add the Composer "Data" directory which will hold the SQL/Bash scripts for deployment
                 template_searchpath=['/home/airflow/gcs/data'],
                 # Either run manually or every 15 minutes (for auto delete)
                 schedule_interval=schedule_interval) as dag:

    # NOTE: The Composer service account will impersonate the Terraform service account

    # This will deploy the Terraform code if this DAG ends with "-deploy"
    execute_terraform_deploy = bash_operator.BashOperator(
        task_id='execute_terraform_deploy',
        bash_command=terraform_bash_file,
        params=params_list,
        execution_timeout=timedelta(minutes=60),
        env={"ENV_RUN_BASH": env_run_bash_deploy},
        append_env=True,
        dag=dag
        )

    # This will write out the deployment time to a file if this DAG ends with "-deploy"
    write_deployment_file = PythonOperator(
        task_id='write_deployment_file',
        python_callable=write_deployment_file,
        op_kwargs={"deploy_or_destroy": is_deploy_or_destroy},
        execution_timeout=timedelta(minutes=1),
        dag=dag,
        ) 
    
    # This determines whether it is time to delete the deployment if this DAG ends with "-destroy"
    delete_environment = PythonOperator(
        task_id='delete_environment',
        python_callable=delete_environment,
        op_kwargs={"deploy_or_destroy": is_deploy_or_destroy},
        execution_timeout=timedelta(minutes=1),
        dag=dag,
        ) 
    
    # This will delete the deployment if this DAG ends with "-destroy" and delete_environment = True (time to delete has passed)
    execute_terraform_destroy = bash_operator.BashOperator(
        task_id='execute_terraform_destroy',
        bash_command=terraform_bash_file,
        params=params_list,
        execution_timeout=timedelta(minutes=60),
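        # Resolved at runtime: Jinja pulls the "true"/"false" string returned (via XCom)
        # by the delete_environment task; the Terraform bash script is assumed to gate on it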
        env={"ENV_RUN_BASH": "{{ task_instance.xcom_pull(task_ids='delete_environment') }}"},
        append_env=True,
        dag=dag
        )

    # This will delete the deployment "file" if this DAG ends with "-destroy" and delete_environment = True (time to delete has passed)
    delete_deployment_file = PythonOperator(
        task_id='delete_deployment_file',
        python_callable=delete_deployment_file,
        op_kwargs={"delete_environment": "{{ task_instance.xcom_pull(task_ids='delete_environment') }}"},
        execution_timeout=timedelta(minutes=1),
        dag=dag,
        )        
    
    # DAG Graph
    execute_terraform_deploy >> write_deployment_file >> delete_environment >> execute_terraform_destroy >> delete_deployment_file
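    # NOTE: Every task runs in both the "-deploy" and "-destroy" variants of this DAG;
    # the deploy/destroy checks above turn the irrelevant half into a no-op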
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



cloud-composer/dags/sample-terraform-bucket-demo-deploy.py [87:225]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    }


####################################################################################
# Common items
####################################################################################
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': None,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    'dagrun_timeout' : timedelta(minutes=60),
    }


# Write out a file that will log when we deployed
# This file can then be used for auto-delete
def write_deployment_file(deploy_or_destroy):
    print("BEGIN: write_deployment_file")
    if deploy_or_destroy == "deploy":
        run_datetime = datetime.now()
        data = {
            "deployment_datetime" : run_datetime.strftime("%m/%d/%Y %H:%M:%S"),
        }
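        # /home/airflow/gcs/data maps to the data/ folder of the Composer environment's
        # GCS bucket, so this marker file persists across worker restarts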
        with open('/home/airflow/gcs/data/' + dag_prefix_name + '.json', 'w') as f:
            json.dump(data, f)
        print("data: ", data)
    else:
        print("write_deployment_file is skipped since this DAG is not a deployment DAG.")
    print("END: write_deployment_file")


# Determine whether it is time to delete the environment
def delete_environment(deploy_or_destroy):
    print("BEGIN: delete_environment")
    delete_environment = False
    if deploy_or_destroy == "destroy":
        filePath = '/home/airflow/gcs/data/' + dag_prefix_name + '.json'
        if os.path.exists(filePath):
            with open(filePath) as f:
                data = json.load(f)
            
            print("deployment_datetime: ", data['deployment_datetime'])
            deployment_datetime = datetime.strptime(data['deployment_datetime'], "%m/%d/%Y %H:%M:%S")
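            # deployment_datetime is in the past, so this subtraction yields a negative
            # timedelta; abs() below converts it to elapsed seconds either way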
            difference = deployment_datetime - datetime.now()
            print("difference.total_seconds(): ", abs(difference.total_seconds()))
            # Test for auto_delete_hours hours
            if auto_delete_hours == 0:
                print("No auto delete set auto_delete_hours:",auto_delete_hours)
            else:
                if abs(difference.total_seconds()) > (auto_delete_hours * 60 * 60):
                    print("Deleting Environment >", auto_delete_hours, " hours")
                    delete_environment = True
        else:
            print("Json files does not exist (no environment deployed)")
    else:
        print("delete_environment is skipped since this DAG is not a destroy DAG.")
    
    # Return a string ("true"/"false") rather than a bool: the PythonOperator pushes
    # this return value to XCom, and downstream Jinja-templated fields consume it verbatim
    if delete_environment:
        return "true"
    else:
        return "false"


# Removes the deployment file so we do not keep re-running the destroy
def delete_deployment_file(delete_environment):
    print("BEGIN: delete_deployment_file")
    print("delete_environment:",delete_environment)
    if delete_environment == "true":
        print("Deleting file:", '/home/airflow/gcs/data/' + dag_prefix_name + '.json')
        os.remove('/home/airflow/gcs/data/' + dag_prefix_name + '.json')
    print("END: delete_deployment_file")


with airflow.DAG(dag_display_name,
                 default_args=default_args,
                 start_date=datetime(2021, 1, 1),
                 catchup=False,
                 # Add the Composer "Data" directory which will hold the SQL/Bash scripts for deployment
                 template_searchpath=['/home/airflow/gcs/data'],
                 # Either run manually or every 15 minutes (for auto delete)
                 schedule_interval=schedule_interval) as dag:

    # NOTE: The Composer service account will impersonate the Terraform service account

    # This will deploy the Terraform code if this DAG ends with "-deploy"
    execute_terraform_deploy = bash_operator.BashOperator(
        task_id='execute_terraform_deploy',
        bash_command=terraform_bash_file,
        params=params_list,
        execution_timeout=timedelta(minutes=60),
        env={"ENV_RUN_BASH": env_run_bash_deploy},
        append_env=True,
        dag=dag
        )

    # This will write out the deployment time to a file if this DAG ends with "-deploy"
    write_deployment_file = PythonOperator(
        task_id='write_deployment_file',
        python_callable=write_deployment_file,
        op_kwargs={"deploy_or_destroy": is_deploy_or_destroy},
        execution_timeout=timedelta(minutes=1),
        dag=dag,
        ) 
    
    # This determines whether it is time to delete the deployment if this DAG ends with "-destroy"
    delete_environment = PythonOperator(
        task_id='delete_environment',
        python_callable=delete_environment,
        op_kwargs={"deploy_or_destroy": is_deploy_or_destroy},
        execution_timeout=timedelta(minutes=1),
        dag=dag,
        ) 
    
    # This will delete the deployment if this DAG ends with "-destroy" and delete_environment = True (time to delete has passed)
    execute_terraform_destroy = bash_operator.BashOperator(
        task_id='execute_terraform_destroy',
        bash_command=terraform_bash_file,
        params=params_list,
        execution_timeout=timedelta(minutes=60),
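        # Resolved at runtime: Jinja pulls the "true"/"false" string returned (via XCom)
        # by the delete_environment task; the Terraform bash script is assumed to gate on it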
        env={"ENV_RUN_BASH": "{{ task_instance.xcom_pull(task_ids='delete_environment') }}"},
        append_env=True,
        dag=dag
        )

    # This will delete the deployment "file" if this DAG ends with "-destroy" and delete_environment = True (time to delete has passed)
    delete_deployment_file = PythonOperator(
        task_id='delete_deployment_file',
        python_callable=delete_deployment_file,
        op_kwargs={"delete_environment": "{{ task_instance.xcom_pull(task_ids='delete_environment') }}"},
        execution_timeout=timedelta(minutes=1),
        dag=dag,
        )        
    
    # DAG Graph
    execute_terraform_deploy >> write_deployment_file >> delete_environment >> execute_terraform_destroy >> delete_deployment_file
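    # NOTE: Every task runs in both the "-deploy" and "-destroy" variants of this DAG;
    # the deploy/destroy checks above turn the irrelevant half into a no-op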
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



