def create_data_fusion()

in cloud-composer/dags/sample-create-data-fusion.py [0:0]


def create_data_fusion(project_id, region, datafusion_name):

    # Get auth (default service account running composer worker node)
    creds, project = google.auth.default()
    auth_req = google.auth.transport.requests.Request() # required to acess access token
    creds.refresh(auth_req)
    access_token=creds.token
    auth_header = { 'Authorization'   : "Bearer " + access_token ,
                    'Accept'          : 'application/json',
                    'Content-Type'    : 'application/json'
    }

    # call rest api with bearer token
    createUri="https://datafusion.googleapis.com/v1beta1/projects/{}/locations/{}/instances?instanceId={}".format(project_id,region,datafusion_name)
    # https://cloud.google.com/data-fusion/docs/reference/rest/v1beta1/projects.locations.instances#Instance
    request_body= '{ "labels" : { "env" : "prod" } }'


    try:
        response = requests.post(createUri, headers=auth_header, data=request_body)
        response.raise_for_status()
        print("Create Data Fusion")
    except requests.exceptions.RequestException as err:
        print(err)
        raise err