in src/common_utils/operators/reporting_operator.py [0:0]
def execute(self, context):
if self.config is None or self.config == dict():
logging.info("Configuration is not given in operator")
config = (
ast.literal_eval(context["dag_run"].conf["config"])
if (isinstance(context["dag_run"].conf["config"], str))
else context["dag_run"].conf["config"]
)
else:
config = (
ast.literal_eval(self.config)
if (isinstance(self.config, str))
else self.config
)
failedTaskArr = []
dynamicFailedTaskCnt = 0
upstreamFailedTaskArr = []
remote_base_log_folder = conf.get("logging", "remote_base_log_folder")
contextTaskId = context["task_instance"].task_id
logging.info(f"context task_id - {contextTaskId}")
for task_instance in context["dag_run"].get_task_instances():
logging.info(
f"ti task_id - {task_instance.task_id} and its status {task_instance.state}"
)
if (
task_instance.state != State.SUCCESS
and task_instance.task_id != context["task_instance"].task_id
):
if (
task_instance.state == State.FAILED
and task_instance.task_id != contextTaskId
):
# read failed task log
fth = GCSTaskHandler(
base_log_folder="log",
gcs_log_folder=f"{remote_base_log_folder}",
)
actual = fth._read(
ti=task_instance, try_number=task_instance.try_number - 1
)
# get error message from log
errorMessage = self.__getErrorMessage(actual)
logging.info(
f"Task {task_instance.task_id} is failed with error {errorMessage} and in {task_instance.try_number-1} total tries"
)
failedTaskArr.append(
{
"task_name": task_instance.task_id,
"error_message": errorMessage,
"log_link": task_instance.log_url,
}
)
# Task instance with map index not a -1 means task is Dynamically mapped task
if task_instance.map_index != -1:
logging.info(
f"Failed task {task_instance.task_id} is part of Dynamic Task Mapping"
)
dynamicFailedTaskCnt = dynamicFailedTaskCnt + 1
elif (
task_instance.state == State.UPSTREAM_FAILED
and task_instance.task_id != contextTaskId
):
upstreamFailedTaskArr.append(task_instance.task_id)
else:
logging.info(f"Task {task_instance.task_id} is successful")
dagStatus = FAILED_STATUS
if len(failedTaskArr) == 0 and len(upstreamFailedTaskArr) == 0:
dagStatus = SUCCESS_STATUS
elif (
len(failedTaskArr) > 0
and len(failedTaskArr) == dynamicFailedTaskCnt
and len(upstreamFailedTaskArr) == 0
):
dagStatus = PARTIAL_SUCCESS
reportRecord = [
{
"unique_id": config["unique_id"],
"dag_name": getattr(context["dag"], "dag_id"),
"execution_time": str(context["execution_date"]),
"dag_status": dagStatus,
"source_db": config["source"],
"Error": failedTaskArr,
}
]
self.__saveDAGReport(reportRecord)
if dagStatus is FAILED_STATUS:
logging.info(
"Task {} failed. Failing this DAG run".format(
[errorItm["task_name"] for errorItm in reportRecord[0]["Error"]]
)
)
raise AirflowFailException(
"Task {} failed. Failing this DAG run".format(
[errorItm["task_name"] for errorItm in reportRecord[0]["Error"]]
)
)