def last_error_node()

in o2a/o2a_libs/src/o2a_lib/el_wf_functions.py [0:0]


def last_error_node(context=None, session=None) -> str:
    """
    It returns the name of the last workflow action node that exit with an ERROR
    exit state, or an empty string if no action has exited with ERROR state in the
    current workflow job.
    """
    drun: Optional[DagRun] = context.get("dag_run", None)
    if drun is None:
        raise AirflowException("No dag_run reference in context.")

    dag_id = drun.dag_id
    ti = TaskInstance  # pylint:disable=invalid-name
    last_failed_task = (
        session.query(TaskInstance)
        .filter(ti.dag_id == dag_id)
        .filter(ti.task_id.endswith("_error"))
        .order_by(ti.execution_date.asc())
        .first()
    )

    if not last_failed_task:
        return ""

    task_name: str = last_failed_task.task_id

    task_map: Optional[dict] = context.get("task_map", None)
    if task_map is None:
        raise AirflowException("No task map!")

    reversed_map: dict = _reverse_task_map(task_map)
    return reversed_map.get(task_name, "")