in airflow-core/src/airflow/models/taskinstance.py [0:0]
def _execute_task_with_callbacks(self, context: Context, test_mode: bool = False, *, session: Session):
    """
    Execute the task wrapped in its hooks, callbacks, listeners and metrics.

    The steps run in this order: install a SIGTERM handler, clear XCom data
    (skipped when ``next_method`` is set), render templates, export the context
    as environment variables, run the pre-execute hooks and the ``on_execute``
    callback, notify ``on_task_instance_running`` listeners, execute the task,
    render the named map index, run the post-execute hooks, and finally
    increment the success counters.

    :param context: Execution context; it is made the current context while
        templates are rendered and while the task executes.
    :param test_mode: When true, the rendered template fields are not stored
        through ``update_rtif``.
    :param session: Not referenced anywhere in this method's body.
    :raises AirflowTaskTerminated: Raised by the installed signal handler when
        SIGTERM arrives in the process that called this method.
    """
    from airflow.sdk.execution_time.callback_runner import create_executable_runner
    from airflow.sdk.execution_time.context import context_get_outlet_events

    if TYPE_CHECKING:
        assert self.task

    # Remember which process installed the handler so forked children can be told apart.
    main_pid = os.getpid()

    def signal_handler(signum, frame):
        # DAG code may fork while the task runs. Only the process that installed
        # this handler should go through the kill path; any other process that
        # inherited the handler just exits immediately.
        if os.getpid() != main_pid:
            os._exit(1)
            # NOTE(review): os._exit() does not return, so this is presumably a
            # guard for when it is patched out (e.g. in tests) -- confirm.
            return
        self.log.error("Received SIGTERM. Terminating subprocesses.")
        self.log.error("Stacktrace: \n%s", "".join(traceback.format_stack()))
        self.task.on_kill()
        raise AirflowTaskTerminated(
            f"Task received SIGTERM signal {self.task_id=} {self.dag_id=} {self.run_id=} {self.map_index=}"
        )

    signal.signal(signal.SIGTERM, signal_handler)

    def run_hook(hook, *hook_args):
        """Call *hook* via an executable runner fed with the context's outlet events."""
        create_executable_runner(
            hook,
            context_get_outlet_events(context),
            logger=self.log,
        ).run(*hook_args)

    def _render_map_index(ctx: Context, *, env: jinja2.Environment | None) -> str | None:
        """Return the rendered ``map_index_template`` from *ctx*, or None if there is nothing to render."""
        if env is None:
            return None
        template = ctx.get("map_index_template")
        if template is None:
            return None
        rendered = env.from_string(template).render(ctx)
        log.debug("Map index rendered as %s", rendered)
        return rendered

    # XCom data is only wiped once execution is certain; a set next_method
    # means we are resuming from deferral, so the data is left alone.
    if not self.next_method:
        self.clear_xcom_data()

    with (
        Stats.timer(f"dag.{self.task.dag_id}.{self.task.task_id}.duration"),
        Stats.timer("task.duration", tags=self.stats_tags),
    ):
        # Put the validated/merged params onto the task object.
        self.task.params = context["params"]

        with set_current_context(context):
            task_dag = self.task.get_dag()
            jinja_env = task_dag.get_template_env() if task_dag is not None else None
            original_task = self.render_templates(context=context, jinja_env=jinja_env)

        # The task is never MappedOperator at this point.
        if TYPE_CHECKING:
            assert isinstance(self.task, BaseOperator)

        if not test_mode:
            self.update_rtif(rendered_fields=get_serialized_template_fields(task=self.task))

        # Expose the context to operators through environment variables.
        exported_vars = context_to_airflow_vars(context, in_env_var_format=True)
        os.environ.update(exported_vars)

        # Only log the exported variables on the default execution path; with
        # next_method set we are assumed to be resuming a deferred task, where
        # logging them again would add nothing.
        if not self.next_method:
            self.log.info(
                "Exporting env vars: %s",
                " ".join(f"{name}={value!r}" for name, value in exported_vars.items()),
            )

        # pre_execute: the optional hook first, then the task's own method.
        if self.task._pre_execute_hook:
            run_hook(self.task._pre_execute_hook, context)
        run_hook(self.task.pre_execute, context)

        # on_execute callback
        self._run_execute_callback(context, self.task)

        # on_task_instance_running listeners; a failing listener is logged and
        # does not stop the task.
        try:
            get_listener_manager().hook.on_task_instance_running(
                previous_state=TaskInstanceState.QUEUED, task_instance=self
            )
        except Exception:
            log.exception("error calling listener")

        # Execute the task itself.
        with set_current_context(context):
            try:
                result = self._execute_task(context, original_task)
            except Exception:
                # The task failed: suppress these template errors so they do
                # not mask the task's own exception.
                with contextlib.suppress(jinja2.TemplateSyntaxError, jinja2.UndefinedError):
                    self._rendered_map_index = _render_map_index(context, env=jinja_env)
                raise
            else:
                # The task succeeded: render without suppression so a rendering
                # error propagates to the caller.
                self._rendered_map_index = _render_map_index(context, env=jinja_env)

        # post_execute: the optional hook first, then the task's own method.
        if self.task._post_execute_hook:
            run_hook(self.task._post_execute_hook, context, result)
        run_hook(self.task.post_execute, context, result)

    # Success counters: the per-operator name, then the same metric with tags.
    Stats.incr(f"operator_successes_{self.task.task_type}", tags=self.stats_tags)
    Stats.incr("operator_successes", tags={**self.stats_tags, "task_type": self.task.task_type})
    Stats.incr("ti_successes", tags=self.stats_tags)