# deprecated-code/dags/sample-bigquery-external-cloud-function.py (141 lines of code) (raw):

####################################################################################
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
####################################################################################

# Author:  Adam Paternostro
# Summary: Creates a BigQuery connection to call an external Cloud Function
#          Deploys the Cloud Function code (in data directory of Composer)
#          Sets some org policies for deployment
#          Grants the BigQuery connection (service principal) invoker access to the Cloud Function
#
# Run the stored procedure: sp_demo_external_function to do the demo

# [START dag]
from google.cloud import storage
from datetime import datetime, timedelta
import requests
import sys
import os
import logging
import airflow
from airflow.operators import bash_operator
from airflow.utils import trigger_rule
from airflow.operators.python_operator import PythonOperator
import google.auth
import google.auth.transport.requests
import json
from airflow.models import TaskInstance
from airflow.operators.python import get_current_context


# Operator-level defaults only.  `dagrun_timeout` is a DAG argument, not an
# operator argument, so it is passed to airflow.DAG(...) below instead of
# living here where it had no effect.
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': None,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
}

# Required environment variables (a missing one raises KeyError at parse time)
project_id = os.environ['ENV_PROJECT_ID']
bigquery_region = os.environ['ENV_BIGQUERY_REGION']
processed_bucket_name = "gs://" + os.environ['ENV_PROCESSED_BUCKET']

params_list = {
    "project_id": project_id,
    "bigquery_region": bigquery_region,
}

# Files used to hand data from one task to the next.  Defined once so the
# shell command that writes a file and the code that reads it cannot drift.
CONNECTION_JSON_PATH = '/home/airflow/gcs/data/bq-connection-cf.json'
SERVICE_ACCOUNT_ID_PATH = '/home/airflow/gcs/data/serviceAccountId-cf.txt'


def _allow_all_org_policy_command(constraint, yaml_file):
    """Build the shell command that sets an org policy to "allow all".

    The command writes a small policy YAML file with ``echo``, prints it with
    ``cat`` and applies it with ``gcloud org-policies set-policy`` while
    impersonating the ``<project_id>@<project_id>.iam.gserviceaccount.com``
    service account.

    Args:
        constraint: Policy name without the ``projects/<id>/policies/`` prefix,
            e.g. ``iam.allowedPolicyMemberDomains``.
        yaml_file: Name of the YAML file the command writes and then applies.

    Returns:
        The complete shell command as a single string.
    """
    impersonated_account = f"{project_id}@{project_id}.iam.gserviceaccount.com"
    return (
        f'echo "name: projects/{project_id}/policies/{constraint}" > {yaml_file}; '
        f'echo "spec:" >> {yaml_file}; '
        f'echo " rules:" >> {yaml_file}; '
        f'echo " - allow_all: true" >> {yaml_file}; '
        f"cat {yaml_file};"
        f'gcloud org-policies --impersonate-service-account "{impersonated_account}" '
        f"set-policy {yaml_file}; "
    )


# "|| true" keeps the task green when the connection already exists
bq_create_connection_command = (
    "bq mk --connection "
    f'--location="{bigquery_region}" '
    f'--project_id="{project_id}" '
    "--connection_type=CLOUD_RESOURCE "
    "cloud-function"
    "|| true"
)

# Dump the connection details as JSON so parse_json() can read them
bq_show_connections_command = (
    "bq show --connection "
    f'--location="{bigquery_region}" '
    f'--project_id="{project_id}" '
    "--format=json "
    f"cloud-function > {CONNECTION_JSON_PATH}"
)

# Allow accounts from other domains
change_org_policy_allowedPolicyMemberDomains_command = _allow_all_org_policy_command(
    "iam.allowedPolicyMemberDomains",
    "iam_allowedPolicyMemberDomains.yaml",
)

# BigQuery connection service principal access to call function
# (plain strings where "${serviceAccount}" appears: it is a shell variable,
# not a Python placeholder)
grant_iam_function_invoker_command = (
    f"serviceAccount=$(cat {SERVICE_ACCOUNT_ID_PATH}); "
    'echo "serviceAccount: ${serviceAccount}" ; '
    "gcloud functions add-iam-policy-binding bigquery_external_function "
    f'--project="{project_id}" '
    '--region="REPLACE-REGION" '
    '--member="serviceAccount:${serviceAccount}" '
    "--role='roles/cloudfunctions.invoker'"
)

# SET: constraints/cloudfunctions.allowedIngressSettings
# TO:  ALLOW_ALL
change_org_policy_allowedIngressSettings_command = _allow_all_org_policy_command(
    "cloudfunctions.allowedIngressSettings",
    "iam_allowedIngressSettings.yaml",
)

# Public calls (No anonymous calls, we will add the service principal)
deploy_cloud_function_command = (
    "cd /home/airflow/gcs/data/bigquery-external-function; "
    "gcloud functions deploy bigquery_external_function "
    f'--project="{project_id}" '
    '--region="REPLACE-REGION" '
    '--runtime="python310" '
    '--ingress-settings="all" '
    "--no-allow-unauthenticated "
    "--trigger-http"
)

# Cloud function access to read bucket
grant_bucket_iam_command = (
    f'gsutil iam ch "serviceAccount:{project_id}@appspot.gserviceaccount.com:objectViewer" '
    f"{processed_bucket_name}"
)

# Sample call from project (not used by any task; kept for reference)
# NOTE: You must be in the project to call (run from Cloud Shell)
#       The function has an external IP address, but requires authentication
curl_command = (
    f"curl -m 70 -X POST https://REPLACE-REGION-{project_id}.cloudfunctions.net/bigquery_external_function "
    '-H "Authorization: bearer $(gcloud auth print-identity-token)" '
    '-H "Content-Type: application/json" '
    "-d '{ "
    ' "userDefinedContext": {"mode":"detect_labels_uri" }, '
    ' "calls":[ ["gs://cloud-samples-data/vision/label/setagaya.jpeg"]] '
    "}'"
)

# The same sample call in copy/paste form:
#
# curl -m 70 -X POST https://REPLACE-REGION-${project}.cloudfunctions.net/bigquery_external_function \
#     -H "Authorization: bearer $(gcloud auth print-identity-token)" \
#     -H "Content-Type: application/json" \
#     -d '{ "userDefinedContext": {"mode":"detect_labels_uri" }, "calls":[ ["gs://cloud-samples-data/vision/label/setagaya.jpeg"]] }'


# Get the connection service principal
def parse_json():
    """Extract the connection's service account id into a text file.

    Reads the JSON file written by the ``bq_show_connections`` task, prints
    the parsed document to the task log, and writes the value found at
    ``["cloudResource"]["serviceAccountId"]`` to the file that the
    ``grant_iam_function_invoker`` command reads with ``cat``.

    Raises:
        FileNotFoundError: If the JSON file has not been written.
        json.JSONDecodeError: If the file does not contain valid JSON.
        KeyError: If the expected keys are missing from the document.
    """
    with open(CONNECTION_JSON_PATH) as connection_file:
        connection = json.load(connection_file)
    print(connection)

    service_account = connection["cloudResource"]["serviceAccountId"]
    with open(SERVICE_ACCOUNT_ID_PATH, 'w') as service_account_file:
        service_account_file.write(service_account)


with airflow.DAG('sample-bigquery-external-cloud-function',
                 default_args=default_args,
                 start_date=datetime(2021, 1, 1),
                 # Add the Composer "Data" directory which will hold the SQL scripts for deployment
                 template_searchpath=['/home/airflow/gcs/data'],
                 # Fail the whole run if it is still going after an hour
                 dagrun_timeout=timedelta(minutes=60),
                 # Not scheduled, trigger only
                 schedule_interval=None) as dag:

    # For function to deploy with "All" parameter
    change_org_policy_allowedIngressSettings = bash_operator.BashOperator(
        task_id="change_org_policy_allowedIngressSettings",
        bash_command=change_org_policy_allowedIngressSettings_command,
    )

    # To allow the BQ service connection to be added
    # (sometimes service connections are considered external)
    change_org_policy_allowedPolicyMemberDomains = bash_operator.BashOperator(
        task_id="change_org_policy_allowedPolicyMemberDomains",
        bash_command=change_org_policy_allowedPolicyMemberDomains_command,
    )

    # Deploy the function
    deploy_cloud_function = bash_operator.BashOperator(
        task_id="deploy_cloud_function",
        bash_command=deploy_cloud_function_command,
    )

    # Wait for policies to take effect
    sleep_2_minutes = bash_operator.BashOperator(
        task_id="sleep_2_minutes",
        bash_command="sleep 120",
    )

    # Create the connection (to the function)
    bq_create_connection = bash_operator.BashOperator(
        task_id="bq_create_connection",
        bash_command=bq_create_connection_command,
    )

    # Extract the service principal name
    bq_show_connections = bash_operator.BashOperator(
        task_id="bq_show_connections",
        bash_command=bq_show_connections_command,
    )

    # Parse the JSON after the extraction
    # (the enclosing `with` block attaches the task to the DAG)
    parse_connections = PythonOperator(
        task_id='parse_connections',
        python_callable=parse_json,
        execution_timeout=timedelta(minutes=1),
    )

    # So the BQ connection can invoke the Cloud Function
    grant_iam_function_invoker = bash_operator.BashOperator(
        task_id="grant_iam_function_invoker",
        bash_command=grant_iam_function_invoker_command,
    )

    # So the cloud function can read from a bucket
    grant_bucket_iam = bash_operator.BashOperator(
        task_id="grant_bucket_iam",
        bash_command=grant_bucket_iam_command,
    )

    # DAG Graph
    change_org_policy_allowedIngressSettings >> change_org_policy_allowedPolicyMemberDomains >> \
        sleep_2_minutes >> \
        deploy_cloud_function >> \
        bq_create_connection >> bq_show_connections >> parse_connections >> \
        grant_iam_function_invoker >> \
        grant_bucket_iam

# [END dag]