in o2a/o2a_libs/src/o2a_lib/el_wf_functions.py [0:0]
def last_error_node(context=None, session=None) -> str:
    """
    Return the name of the last workflow action node that exited with an ERROR
    exit state, or an empty string if no such node is found.

    Args:
        context: Template context; must provide ``dag_run`` and, when an
            error task is found, ``task_map``.
        session: Database session used to query task instances. It is assumed
            to be injected by the caller or a decorator that is not visible
            in this block -- TODO confirm.

    Returns:
        The value ``_reverse_task_map(task_map)`` holds for the most recent
        task instance whose ``task_id`` ends with ``"_error"``, or ``""``
        when there is no such task instance or it has no entry in the
        reversed map.

    Raises:
        AirflowException: If the context is missing or carries no ``dag_run``,
            or if an error task was found but the context has no ``task_map``.
    """
    # A missing context is reported the same way as a missing dag_run instead
    # of failing with an opaque AttributeError on ``None.get``.
    drun: Optional[DagRun] = context.get("dag_run", None) if context else None
    if drun is None:
        raise AirflowException("No dag_run reference in context.")
    dag_id = drun.dag_id
    ti = TaskInstance  # pylint:disable=invalid-name
    # Newest first: the function must report the *last* error node, so the
    # ordering has to be descending before taking the first row.
    # NOTE(review): the query is not restricted to the current DAG run and
    # does not filter on task state; it relies only on the "_error" task id
    # suffix -- confirm this matches the intended "current workflow job" scope.
    last_failed_task = (
        session.query(TaskInstance)
        .filter(ti.dag_id == dag_id)
        .filter(ti.task_id.endswith("_error"))
        .order_by(ti.execution_date.desc())
        .first()
    )
    if not last_failed_task:
        return ""
    task_name: str = last_failed_task.task_id
    task_map: Optional[dict] = context.get("task_map", None)
    if task_map is None:
        raise AirflowException("No task map!")
    # Presumably maps Airflow task ids back to the original workflow node
    # names; verify against _reverse_task_map.
    reversed_map: dict = _reverse_task_map(task_map)
    return reversed_map.get(task_name, "")