# def get_workflow_status()
#
# in cicd-deployers/dataform_runner.py [0:0]


def get_workflow_status(workflow_invocation_name: str, poll_interval: float = 4) -> None:
    """Wait for a Dataform workflow invocation to stop running.

    Polls the invocation through the module-level ``df_client`` (it is not a
    parameter of this function) and logs the state seen on every poll.

    Args:
        workflow_invocation_name (str): The ID of the workflow invocation;
            sent as the ``name`` field of the get request.
        poll_interval (float): Seconds to sleep between polls while the
            invocation is still RUNNING. Defaults to 4.

    Returns:
        None. Returns once the invocation reports SUCCEEDED. A state that is
        not handled explicitly also ends the wait, after logging a warning.

    Raises:
        RuntimeError: If the invocation reports FAILED, CANCELING or
            CANCELLED. RuntimeError subclasses Exception, so handlers that
            catch Exception keep working.
    """
    # The request is identical on every poll, so build it once.
    request = dataform_v1beta1.GetWorkflowInvocationRequest(
        name=workflow_invocation_name
    )
    while True:
        response = df_client.get_workflow_invocation(request)
        state = response.state.name
        logging.info('workflow state: %s for %s', state, workflow_invocation_name)

        if state == 'RUNNING':
            time.sleep(poll_interval)
            continue
        # CANCELING is treated as a failure right away instead of waiting for
        # the invocation to settle on CANCELLED.
        if state in ('FAILED', 'CANCELING', 'CANCELLED'):
            raise RuntimeError(f'Error while running workflow {workflow_invocation_name}')
        if state != 'SUCCEEDED':
            # Not a state this function knows about: stop waiting as before,
            # but leave a trace so the early return is not mistaken for success.
            logging.warning(
                'unexpected workflow state: %s for %s; no longer waiting',
                state,
                workflow_invocation_name,
            )
        return