# Excerpt: TaskInstance._execute_task_with_callbacks
# Source file: airflow-core/src/airflow/models/taskinstance.py

    def _execute_task_with_callbacks(self, context: Context, test_mode: bool = False, *, session: Session):
        """Prepare Task for Execution.

        Orchestrates a single task execution end to end: installs a SIGTERM
        handler, clears stale XCom data (unless resuming from deferral),
        renders templates, runs pre-execute hooks and callbacks, executes the
        task, then runs post-execute hooks and emits success metrics.

        :param context: execution context passed to hooks, callbacks, and
            template rendering
        :param test_mode: if True, skip persisting rendered template fields
        :param session: ORM session (keyword-only); not referenced directly in
            this body — presumably consumed by callees or kept for interface
            parity. TODO confirm against callers.
        """
        from airflow.sdk.execution_time.callback_runner import create_executable_runner
        from airflow.sdk.execution_time.context import context_get_outlet_events

        if TYPE_CHECKING:
            assert self.task

        # Remember which process installed the handler so forked children can
        # be told apart inside signal_handler below.
        parent_pid = os.getpid()

        def signal_handler(signum, frame):
            # Handle SIGTERM: kill the task via on_kill() and surface the
            # termination as AirflowTaskTerminated in the owning process.
            pid = os.getpid()

            # If a task forks during execution (from DAG code) for whatever
            # reason, we want to make sure that we react to the signal only in
            # the process that we've spawned ourselves (referred to here as the
            # parent process).
            if pid != parent_pid:
                os._exit(1)
                return  # NOTE: unreachable — os._exit() does not return
            self.log.error("Received SIGTERM. Terminating subprocesses.")
            self.log.error("Stacktrace: \n%s", "".join(traceback.format_stack()))
            # Give the operator a chance to clean up its own subprocesses.
            self.task.on_kill()
            raise AirflowTaskTerminated(
                f"Task received SIGTERM signal {self.task_id=} {self.dag_id=} {self.run_id=} {self.map_index=}"
            )

        # Install the handler in this (task runner) process.
        signal.signal(signal.SIGTERM, signal_handler)

        # Don't clear Xcom until the task is certain to execute, and check if we are resuming from deferral.
        if not self.next_method:
            self.clear_xcom_data()

        # Time the whole execution, both as a per-task metric name and as a
        # tagged metric.
        with (
            Stats.timer(f"dag.{self.task.dag_id}.{self.task.task_id}.duration"),
            Stats.timer("task.duration", tags=self.stats_tags),
        ):
            # Set the validated/merged params on the task object.
            self.task.params = context["params"]

            # Template rendering must happen with the execution context active.
            with set_current_context(context):
                dag = self.task.get_dag()
                if dag is not None:
                    jinja_env = dag.get_template_env()
                else:
                    jinja_env = None
                # task_orig: the task object returned by render_templates —
                # presumably the pre-render original; it is what gets executed
                # below. TODO confirm render_templates() contract.
                task_orig = self.render_templates(context=context, jinja_env=jinja_env)

            # The task is never MappedOperator at this point.
            if TYPE_CHECKING:
                assert isinstance(self.task, BaseOperator)

            # Persist the rendered template fields unless running in test mode.
            if not test_mode:
                rendered_fields = get_serialized_template_fields(task=self.task)
                self.update_rtif(rendered_fields=rendered_fields)
            # Export context to make it available for operators to use.
            airflow_context_vars = context_to_airflow_vars(context, in_env_var_format=True)
            os.environ.update(airflow_context_vars)

            # Log context only for the default execution method, the assumption
            # being that otherwise we're resuming a deferred task (in which
            # case there's no need to log these again).
            if not self.next_method:
                self.log.info(
                    "Exporting env vars: %s",
                    " ".join(f"{k}={v!r}" for k, v in airflow_context_vars.items()),
                )

            # Run pre_execute callback: first the user-supplied hook (if any),
            # then the operator's own pre_execute method.
            if self.task._pre_execute_hook:
                create_executable_runner(
                    self.task._pre_execute_hook,
                    context_get_outlet_events(context),
                    logger=self.log,
                ).run(context)
            create_executable_runner(
                self.task.pre_execute,
                context_get_outlet_events(context),
                logger=self.log,
            ).run(context)

            # Run on_execute callback
            self._run_execute_callback(context, self.task)

            # Run on_task_instance_running event. Listener failures are logged
            # but deliberately do not fail the task.
            try:
                get_listener_manager().hook.on_task_instance_running(
                    previous_state=TaskInstanceState.QUEUED, task_instance=self
                )
            except Exception:
                log.exception("error calling listener")

            def _render_map_index(context: Context, *, jinja_env: jinja2.Environment | None) -> str | None:
                """Render named map index if the DAG author defined map_index_template at the task level."""
                if jinja_env is None or (template := context.get("map_index_template")) is None:
                    return None
                rendered_map_index = jinja_env.from_string(template).render(context)
                log.debug("Map index rendered as %s", rendered_map_index)
                return rendered_map_index

            # Execute the task.
            with set_current_context(context):
                try:
                    result = self._execute_task(context, task_orig)
                except Exception:
                    # If the task failed, swallow rendering error so it doesn't mask the main error.
                    with contextlib.suppress(jinja2.TemplateSyntaxError, jinja2.UndefinedError):
                        self._rendered_map_index = _render_map_index(context, jinja_env=jinja_env)
                    raise
                else:  # If the task succeeded, render normally to let rendering error bubble up.
                    self._rendered_map_index = _render_map_index(context, jinja_env=jinja_env)

            # Run post_execute callback: user-supplied hook first (if any),
            # then the operator's post_execute method; both receive the result.
            if self.task._post_execute_hook:
                create_executable_runner(
                    self.task._post_execute_hook,
                    context_get_outlet_events(context),
                    logger=self.log,
                ).run(context, result)
            create_executable_runner(
                self.task.post_execute,
                context_get_outlet_events(context),
                logger=self.log,
            ).run(context, result)

        # Success metrics, emitted only after post_execute completes.
        Stats.incr(f"operator_successes_{self.task.task_type}", tags=self.stats_tags)
        # Same metric with tagging
        Stats.incr("operator_successes", tags={**self.stats_tags, "task_type": self.task.task_type})
        Stats.incr("ti_successes", tags=self.stats_tags)