in cloud-composer/dags/sample-create-data-fusion.py [0:0]
def create_data_fusion(project_id, region, datafusion_name, request_timeout=60):
    """Submit a create request for a Cloud Data Fusion instance.

    Calls the Data Fusion v1beta1 REST endpoint with a bearer token obtained
    from Application Default Credentials. The function only submits the
    request and checks the HTTP status; it does not wait for the instance to
    become ready.

    Args:
        project_id: Google Cloud project that will own the instance.
        region: Location segment of the resource path (e.g. a region name).
        datafusion_name: Value sent as the ``instanceId`` query parameter.
        request_timeout: Seconds to wait for the HTTP call before giving up.
            Without a timeout ``requests`` can block forever on a stalled
            connection, which would hang the calling task.

    Raises:
        requests.exceptions.RequestException: If the request fails, times
            out, or the API answers with a 4xx/5xx status.
    """
    # Get auth (default credentials; presumably the service account running
    # the Composer worker node -- verify against the deployment).
    creds, _ = google.auth.default()
    # A refresh is required before the access token is populated.
    creds.refresh(google.auth.transport.requests.Request())
    auth_header = {
        'Authorization': "Bearer " + creds.token,
        'Accept': 'application/json',
        'Content-Type': 'application/json',
    }

    # Call the REST API with the bearer token. The instance id goes through
    # ``params`` so that requests URL-encodes it instead of it being spliced
    # into the query string verbatim.
    create_uri = "https://datafusion.googleapis.com/v1beta1/projects/{}/locations/{}/instances".format(
        project_id, region
    )
    query = {"instanceId": datafusion_name}
    # https://cloud.google.com/data-fusion/docs/reference/rest/v1beta1/projects.locations.instances#Instance
    request_body = '{ "labels" : { "env" : "prod" } }'

    try:
        response = requests.post(
            create_uri,
            params=query,
            headers=auth_header,
            data=request_body,
            timeout=request_timeout,
        )
        response.raise_for_status()
        print("Create Data Fusion")
    except requests.exceptions.RequestException as err:
        print(err)
        # Bare raise re-raises the same exception with its original traceback.
        raise