def get_workflow_state()

in functions/data-processing-engines/dataform-tag-executor/main.py [0:0]


def get_workflow_state(job_id: str):
    """Look up and report the state of a Dataform workflow invocation.

    The text following the first ``"aef-"`` in ``job_id`` is used as the
    ``name`` of a ``GetWorkflowInvocationRequest``. The state name read from
    the response is logged at INFO level and then returned.

    Args:
        job_id (str): Job identifier containing ``"aef-"`` followed by the
            workflow invocation name.

    Returns:
        The value of ``response.state.name`` for the fetched invocation.

    Raises:
        IndexError: If ``job_id`` does not contain ``"aef-"``.
    """
    # Keep only what comes after the first "aef-" marker; a job_id without
    # the marker fails here on the [1] index.
    lookup_request = dataform_v1beta1.GetWorkflowInvocationRequest(
        name=job_id.split("aef-", 1)[1]
    )
    invocation = df_client.get_workflow_invocation(lookup_request)

    current_state = invocation.state.name
    logging.info(f'workflow state: {current_state}')
    return current_state