deprecated-code/dags/step-04-create-biglake-connection.py

####################################################################################
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
####################################################################################

# Author: Adam Paternostro
# Summary: Creates a BigLake connection (a BigQuery CLOUD_RESOURCE connection) so that
#          BigQuery can query data stored in Cloud Storage. The DAG then exports the
#          connection metadata, extracts the connection's service account, relaxes the
#          iam.allowedPolicyMemberDomains org policy so that account can be granted
#          access, and finally grants it the Storage Object Viewer role.
# NOTE: You only need to perform these steps once. You "should" test for the
#       connection's existence before creating it; the "|| true" appended to the
#       "bq mk" command below ignores the "already exists" error so the DAG can be
#       triggered more than once without failing.

# [START dag]
from google.cloud import storage
from datetime import datetime, timedelta
import requests
import sys
import os
import logging
import airflow
from airflow.operators import bash_operator
from airflow.utils import trigger_rule
from airflow.operators.python_operator import PythonOperator
import google.auth
import google.auth.transport.requests
import json
from airflow.models import TaskInstance
from airflow.operators.python import get_current_context

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': None,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
    'dagrun_timeout': timedelta(minutes=60),
}

project_id      = os.environ['ENV_PROJECT_ID']
bigquery_region = os.environ['ENV_BIGQUERY_REGION']

params_list = {
    "project_id": project_id,
    "bigquery_region": bigquery_region,
}

# Create the BigLake connection ("|| true" keeps the task from failing if it already exists)
bq_create_connection = \
    "bq mk --connection " + \
    "--location=\"" + bigquery_region + "\" " + \
    "--project_id=\"" + project_id + "\" " + \
    "--connection_type=CLOUD_RESOURCE " + \
    "biglake-connection " + \
    "|| true"

# Export the connection metadata as JSON so its service account can be extracted
bq_show_connections = \
    "bq show --connection " + \
    "--location=\"" + bigquery_region + "\" " + \
    "--project_id=\"" + project_id + "\" " + \
    "--format=json " + \
    "biglake-connection > /home/airflow/gcs/data/bq-connection.json"
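# For reference, the exported file should contain JSON roughly like the following
# (illustrative; the real output includes additional fields such as "name" and
# creation/modification timestamps):
#
#   {
#     "cloudResource": {
#       "serviceAccountId": "bqcx-...@gcp-sa-bigquery-condel.iam.gserviceaccount.com"
#     }
#   }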
>">
# Allow accounts from other domains to be granted IAM roles in this project
change_org_policy = \
    "echo \"name: projects/" + project_id + "/policies/iam.allowedPolicyMemberDomains\" > iam_allowedPolicyMemberDomains.yaml; " + \
    "echo \"spec:\" >> iam_allowedPolicyMemberDomains.yaml; " + \
    "echo \" rules:\" >> iam_allowedPolicyMemberDomains.yaml; " + \
    "echo \" - allow_all: true\" >> iam_allowedPolicyMemberDomains.yaml; " + \
    "cat iam_allowedPolicyMemberDomains.yaml; " + \
    "gcloud org-policies --impersonate-service-account \"" + project_id + "@" + project_id + ".iam.gserviceaccount.com\" set-policy iam_allowedPolicyMemberDomains.yaml; "

# Grant the connection's service account read access to Cloud Storage
grant_iam = \
    "serviceAccount=$(cat /home/airflow/gcs/data/serviceAccountId.txt); " + \
    "echo \"serviceAccount: ${serviceAccount}\"; " + \
    "gcloud projects add-iam-policy-binding \"" + project_id + "\" " + \
    "--member=\"serviceAccount:${serviceAccount}\" " + \
    "--role='roles/storage.objectViewer'"

# Get the connection's service principal from the exported JSON and save it
# for the grant_iam task
def parse_json():
    with open('/home/airflow/gcs/data/bq-connection.json') as f:
        data = json.load(f)
    print(data)
    service_account = data["cloudResource"]["serviceAccountId"]
    with open('/home/airflow/gcs/data/serviceAccountId.txt', 'w') as f:
        f.write(service_account)

with airflow.DAG('step-04-create-biglake-connection',
                 default_args=default_args,
                 start_date=datetime(2021, 1, 1),
                 # Add the Composer "Data" directory which will hold the SQL scripts for deployment
                 template_searchpath=['/home/airflow/gcs/data'],
                 # Not scheduled, trigger only
                 schedule_interval=None) as dag:

    # bq_create_connection = bash_operator.BashOperator(
    #     task_id='bq_create_connection',
    #     bash_command='bash_create_biglake_connection.sh',
    #     params=params_list,
    #     dag=dag
    # )

    bq_create_connection = bash_operator.BashOperator(
        task_id="bq_create_connection",
        bash_command=bq_create_connection,
    )

    bq_show_connections = bash_operator.BashOperator(
        task_id="bq_show_connections",
        bash_command=bq_show_connections,
    )

    parse_connections = PythonOperator(
        task_id='parse_connections',
        python_callable=parse_json,
        execution_timeout=timedelta(minutes=1),
        dag=dag,
    )

    change_org_policy = bash_operator.BashOperator(
        task_id="change_org_policy",
        bash_command=change_org_policy
    )

    # Wait for the org policy change to take effect
    sleep_2_minutes = bash_operator.BashOperator(
        task_id="sleep_2_minutes",
        bash_command="sleep 120"
    )

    grant_iam = bash_operator.BashOperator(
        task_id="grant_iam",
        bash_command=grant_iam
    )

    # DAG Graph
    bq_create_connection >> bq_show_connections >> parse_connections >> change_org_policy >> sleep_2_minutes >> grant_iam

# [END dag]
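>">
# Example usage (illustrative, not part of this DAG): once the connection exists and
# the IAM grant has propagated, a BigLake table can reference it. The project, region,
# dataset, table, and bucket names below are hypothetical:
#
#   CREATE OR REPLACE EXTERNAL TABLE my_dataset.my_biglake_table
#   WITH CONNECTION `my-project.us.biglake-connection`
#   OPTIONS (format = 'PARQUET', uris = ['gs://my-bucket/path/*.parquet']);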