in cicd-deployers/dataform_runner.py [0:0]
def get_workflow_status(workflow_invocation_name, *, poll_interval_seconds=4):
    """Polls a Dataform workflow invocation until it stops running.

    The invocation is fetched through the module-level ``df_client`` Dataform
    client (it is not a parameter of this function).

    Args:
        workflow_invocation_name (str): Name of the workflow invocation, sent
            as ``name`` in the ``GetWorkflowInvocationRequest``.
        poll_interval_seconds (float): Seconds to wait between polls while the
            invocation is still ``RUNNING``. Defaults to 4.

    Returns:
        None. Returns once the invocation reports ``SUCCEEDED``. It also
        returns if the reported state is not one this function recognizes;
        a warning is logged in that case so it is not mistaken for success.

    Raises:
        Exception: If the invocation state is ``FAILED``, ``CANCELING`` or
            ``CANCELLED``.
    """
    # The request does not change between polls, so build it once.
    request = dataform_v1beta1.GetWorkflowInvocationRequest(
        name=workflow_invocation_name
    )
    while True:
        response = df_client.get_workflow_invocation(request)
        state = response.state.name
        logging.info(f'workflow state: {state} for {workflow_invocation_name}')

        if state == 'RUNNING':
            time.sleep(poll_interval_seconds)
            continue
        if state in ('FAILED', 'CANCELING', 'CANCELLED'):
            raise Exception(
                f'Error while running workflow {workflow_invocation_name}'
            )
        if state != 'SUCCEEDED':
            # Keep the historical behavior (stop polling and return) for
            # states outside the handled set, but make it visible in the logs.
            logging.warning(
                'unexpected workflow state %s for %s; stopping monitoring',
                state,
                workflow_invocation_name,
            )
        return