in cloud-composer/dags/sample-create-data-fusion.py [0:0]
def wait_for_data_fusion_provisioning(project_id, region, datafusion_name,
                                      poll_interval_seconds=30,
                                      request_timeout_seconds=60):
    """Block until the Data Fusion instance is no longer in the "CREATING" state.

    Polls the Data Fusion v1beta1 REST endpoint for the instance, sleeping
    ``poll_interval_seconds`` before every check, and returns as soon as the
    reported ``state`` is anything other than "CREATING".

    Args:
        project_id: Project id used to build the instance URL.
        region: Location (region) used to build the instance URL.
        datafusion_name: Instance name used to build the instance URL.
        poll_interval_seconds: Seconds to sleep before each status check.
        request_timeout_seconds: Timeout passed to each HTTP GET so a stalled
            connection cannot hang the task indefinitely.

    Returns:
        None.

    Raises:
        requests.exceptions.RequestException: If the status request fails,
            times out, or returns an HTTP error status.
        KeyError: If the JSON response has no "state" key.
    """
    # Get auth (default service account running composer worker node)
    creds, _ = google.auth.default()
    auth_req = google.auth.transport.requests.Request()  # required to obtain the access token
    creds.refresh(auth_req)

    # The instance URL does not change between polls, so build it once.
    state_url = "https://datafusion.googleapis.com/v1beta1/projects/{}/locations/{}/instances/{}".format(
        project_id, region, datafusion_name)

    instance_state = "CREATING"
    attempt = 1
    while instance_state == "CREATING":
        time.sleep(poll_interval_seconds)

        # The wait can outlive the access token; refresh it when it is no
        # longer valid so a long provisioning run does not end in a 401.
        if not creds.valid:
            creds.refresh(auth_req)
        auth_header = {'Authorization': "Bearer " + creds.token}

        # Check the state
        try:
            response = requests.get(state_url, headers=auth_header,
                                    timeout=request_timeout_seconds)
            response.raise_for_status()
            print("Checking State of Data Fusion Deployment")
        except requests.exceptions.RequestException as err:
            print(err)
            raise

        # Parse the body once and reuse it for both the state and the log line.
        payload = response.json()
        instance_state = payload['state']
        print("Checking Data Fusion State ({}) value: {}".format(attempt, instance_state))
        print("JSON: ", payload)
        attempt += 1

    # NOTE(review): any state other than "CREATING" ends the wait, including
    # a failed one; callers that need a healthy instance should verify the
    # final state themselves.