cloud-composer/dags/sample-create-data-fusion.py

####################################################################################
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
####################################################################################

# Author:  Adam Paternostro
# Summary: Creates a Data Fusion instance and waits for the instance to come online

# [START dag]
from datetime import datetime, timedelta
from airflow.operators import bash_operator
from airflow.utils import trigger_rule
from airflow.operators.python_operator import PythonOperator
import requests
import sys
import os
import logging
import airflow
import google.auth
import google.auth.transport.requests
import time

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': None,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
    'dagrun_timeout': timedelta(minutes=60),
}

project_id = os.environ['ENV_PROJECT_ID']
region = os.environ['ENV_DATAFUSION_REGION']
datafusion_name = "data-fusion-prod-01"


# Creates a Data Fusion instance (name hardcoded)
# NOTE: There is a Data Fusion Airflow operator; this is designed to show the
#       REST API call so the same approach can be used from other tooling as well
def create_data_fusion(project_id, region, datafusion_name):
    # Get auth (default service account running the Composer worker node)
    creds, project = google.auth.default()
    auth_req = google.auth.transport.requests.Request()  # required to obtain an access token
    creds.refresh(auth_req)
    access_token = creds.token
    auth_header = {
        'Authorization': "Bearer " + access_token,
        'Accept': 'application/json',
        'Content-Type': 'application/json'
    }

    # Call the REST API with the bearer token
    createUri = "https://datafusion.googleapis.com/v1beta1/projects/{}/locations/{}/instances?instanceId={}".format(project_id, region, datafusion_name)

    # https://cloud.google.com/data-fusion/docs/reference/rest/v1beta1/projects.locations.instances#Instance
    request_body = '{ "labels" : { "env" : "prod" } }'

    try:
        response = requests.post(createUri, headers=auth_header, data=request_body)
        response.raise_for_status()
        print("Create Data Fusion")
    except requests.exceptions.RequestException as err:
        print(err)
        raise err


# Waits for the instance to come online
def wait_for_data_fusion_provisioning(project_id, region, datafusion_name):
    # Get auth (default service account running the Composer worker node)
    creds, project = google.auth.default()
    auth_req = google.auth.transport.requests.Request()  # required to obtain an access token
    creds.refresh(auth_req)
    access_token = creds.token
    auth_header = {'Authorization': "Bearer " + access_token}

    instanceState = "CREATING"
    i = 1
    while instanceState == "CREATING":
        time.sleep(30)
        # Check the state
        stateDataFusion = "https://datafusion.googleapis.com/v1beta1/projects/{}/locations/{}/instances/{}".format(project_id, region, datafusion_name)
        try:
            response = requests.get(stateDataFusion, headers=auth_header)
            response.raise_for_status()
            print("Checking State of Data Fusion Deployment")
        except requests.exceptions.RequestException as err:
            print(err)
            raise err

        instanceState = response.json()['state']
        print("Checking Data Fusion State ({}) value: {}".format(i, instanceState))
        print("JSON: ", response.json())
        i = i + 1

    """ Sample Response
    {
        "name": "projects/data-analytics-demo-z4x77nbcc3/locations/us-central1/instances/qadatafusion",
        "type": "BASIC",
        "networkConfig": {},
        "createTime": "2022-06-29T13:21:24.874523833Z",
        "updateTime": "2022-06-29T13:32:28.762387202Z",
        "state": "RUNNING",
        "serviceEndpoint": "https://qadatafusion-data-analytics-demo-z4x77nbcc3-dot-usw1.datafusion.googleusercontent.com",
        "version": "6.6.0",
        "serviceAccount": "cloud-datafusion-management-sa@wcb6888d249fbda6c-tp.iam.gserviceaccount.com",
        "displayName": "QADataFusion",
        "availableVersion": [
            {
                "versionNumber": "6.7.0"
            }
        ],
        "apiEndpoint": "https://qadatafusion-data-analytics-demo-z4x77nbcc3-dot-usw1.datafusion.googleusercontent.com/api",
        "gcsBucket": "gs://df-2432185209294608530-52ppjphxvyi6zaykaizbbqaaaa",
        "p4ServiceAccount": "service-1074332558384@gcp-sa-datafusion.iam.gserviceaccount.com",
        "tenantProjectId": "wcb6888d240fbda7c-tp",
        "dataprocServiceAccount": "1074332558384-compute@developer.gserviceaccount.com"
    }
    """


with airflow.DAG('sample-create-data-fusion',
                 default_args=default_args,
                 start_date=datetime(2021, 1, 1),
                 # Add the Composer "Data" directory which will hold the SQL scripts for deployment
                 template_searchpath=['/home/airflow/gcs/data'],
                 # Not scheduled, trigger only
                 schedule_interval=None) as dag:

    # NOTE: The service account of the Composer worker node must have access to run these commands

    # Show creating Data Fusion via the REST API
    create_data_fusion = PythonOperator(
        task_id='create_data_fusion',
        python_callable=create_data_fusion,
        op_kwargs={
            "project_id": project_id,
            "region": region,
            "datafusion_name": datafusion_name,
        },
        execution_timeout=timedelta(minutes=10),
        dag=dag,
    )

    # Wait for the instance to come online
    wait_for_data_fusion_provisioning = PythonOperator(
        task_id='wait_for_data_fusion_provisioning',
        python_callable=wait_for_data_fusion_provisioning,
        op_kwargs={
            "project_id": project_id,
            "region": region,
            "datafusion_name": datafusion_name,
        },
        execution_timeout=timedelta(minutes=60),
        dag=dag,
    )

    # DAG Graph
    create_data_fusion >> wait_for_data_fusion_provisioning

# [END dag]
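
# ----------------------------------------------------------------------------------
# Hedged sketch (not wired into this DAG): as the NOTE above mentions, Airflow also
# ships a Data Fusion operator in the Google provider package. Assuming
# apache-airflow-providers-google is installed, the create step could instead be
# declared inside the DAG context as below; the task_id is illustrative. It is left
# commented out so this sample keeps demonstrating the raw REST calls.
#
# from airflow.providers.google.cloud.operators.datafusion import CloudDataFusionCreateInstanceOperator
#
# create_data_fusion_via_operator = CloudDataFusionCreateInstanceOperator(
#     task_id='create_data_fusion_via_operator',
#     instance_name=datafusion_name,
#     instance={"type": "BASIC", "labels": {"env": "prod"}},
#     location=region,
#     project_id=project_id,
#     dag=dag,
# )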
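
# Because schedule_interval=None, this DAG only runs when triggered manually. A
# hedged example of triggering it from the gcloud CLI (assuming an Airflow 2 based
# Composer environment; the environment name and location are illustrative):
#
#   gcloud composer environments run my-composer-env \
#       --location us-central1 \
#       dags trigger -- sample-create-data-fusion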