def _execute_task()

in airflow-core/src/airflow/models/taskinstance.py [0:0]


    def _execute_task(self, context: Context, task_orig: Operator):
        """
        Run the task's entry point, honouring its execution timeout, and push the result to XCom.

        When ``self.next_method`` is set, the task is re-entered through its
        ``resume_execution`` method; otherwise its ``execute`` method is called.
        The returned value is pushed to XCom only if the task has
        ``do_xcom_push`` enabled and the value is not ``None``. The task map for
        downstream tasks is recorded in either case.

        :param context: Jinja2 context
        :param task_orig: origin task, forwarded when recording the task map for downstreams
        :return: the value returned by the task's entry point
        :raises AirflowException: if the task is a ``MappedOperator``, or if
            ``multiple_outputs`` is enabled and the result is not a mapping
            whose keys are all strings
        :raises AirflowTaskTimeout: if the remaining timeout budget is already
            exhausted, or the timeout fires while the task runs; the task's
            ``on_kill`` is called before the exception is re-raised
        """
        from airflow.sdk.bases.operator import ExecutorSafeguard
        from airflow.sdk.definitions.mappedoperator import MappedOperator

        op = self.task

        if TYPE_CHECKING:
            # TODO: TaskSDK this function will need 100% re-writing
            # This only works with a "rich" BaseOperator, not the SDK version
            assert isinstance(op, BaseOperator)

        if isinstance(op, MappedOperator):
            raise AirflowException("MappedOperator cannot be executed.")

        resume_method = self.next_method
        sentinel_key = f"{op.__class__.__name__}__sentinel"

        # A task coming back from a deferral (via a trigger) must re-enter through
        # the method it asked for; every other task goes through plain ``execute``.
        entrypoint: Callable
        call_kwargs: dict[str, Any]
        if resume_method:
            entrypoint = op.resume_execution
            # Work on a shallow copy so that modifications made here are not tracked by SQLA.
            resume_kwargs = {**(self.next_kwargs or {})}
            if resume_method == "execute":
                resume_kwargs[sentinel_key] = ExecutorSafeguard.sentinel_value
            call_kwargs = {"next_method": resume_method, "next_kwargs": resume_kwargs}
        else:
            entrypoint = op.execute
            call_kwargs = {}
            if entrypoint.__name__ == "execute":
                call_kwargs[sentinel_key] = ExecutorSafeguard.sentinel_value

        def _invoke():
            """Call the selected entry point, treating a successful ``SystemExit`` as a ``None`` result."""
            from airflow.sdk.execution_time.callback_runner import create_executable_runner
            from airflow.sdk.execution_time.context import context_get_outlet_events

            try:
                # Print a marker for log grouping of details before task execution
                log.info("::endgroup::")

                runner = create_executable_runner(
                    entrypoint,
                    context_get_outlet_events(context),
                    logger=log,
                )
                return runner.run(context=context, **call_kwargs)
            except SystemExit as exit_exc:
                # Only a successful exit (no code, or code 0) is absorbed here.
                # Failure cases propagate and are handled further up the chain.
                if exit_exc.code is None or exit_exc.code == 0:
                    return None
                raise

        allowed = op.execution_timeout
        if not allowed:
            # No timeout configured: run unbounded.
            result = _invoke()
        else:
            # Resuming from a deferral: the budget is counted from our start_date,
            # so subtract the time that has already gone by.
            if resume_method and self.start_date:
                already_spent = timezone.utcnow() - self.start_date
                allowed = allowed - already_spent
            timeout_seconds = allowed.total_seconds()
            try:
                # The budget may already be used up, in which case fail fast.
                if timeout_seconds <= 0:
                    raise AirflowTaskTimeout()
                # Run task in timeout wrapper
                with timeout(timeout_seconds):
                    result = _invoke()
            except AirflowTaskTimeout:
                op.on_kill()
                raise

        with create_session() as session:
            xcom_value = result if op.do_xcom_push else None
            if xcom_value is not None:  # Only an actual result is pushed as an XCom.
                if op.multiple_outputs:
                    if not isinstance(xcom_value, Mapping):
                        raise AirflowException(
                            f"Returned output was type {type(xcom_value)} "
                            "expected dictionary for multiple_outputs"
                        )
                    # Validate every key before pushing anything; a private sentinel
                    # marks "no offending key" since any value could itself be a key.
                    nothing_wrong = object()
                    bad_key = next(
                        (k for k in xcom_value.keys() if not isinstance(k, str)),
                        nothing_wrong,
                    )
                    if bad_key is not nothing_wrong:
                        raise AirflowException(
                            "Returned dictionary keys must be strings when using "
                            f"multiple_outputs, found {bad_key} ({type(bad_key)}) instead"
                        )
                    for name, item in xcom_value.items():
                        self.xcom_push(key=name, value=item, session=session)
                # The full result is always pushed under the return key as well.
                self.xcom_push(key=XCOM_RETURN_KEY, value=xcom_value, session=session)
            if TYPE_CHECKING:
                assert task_orig.dag
            _record_task_map_for_downstreams(
                task_instance=self,
                task=task_orig,
                value=xcom_value,
                session=session,
            )
        return result