in airflow-core/src/airflow/models/taskinstance.py [0:0]
def _execute_task(self, context: Context, task_orig: Operator):
    """
    Run the operator behind this task instance and publish its result as XCom.

    The operator's ``execute`` method is invoked, unless ``self.next_method`` is
    set (the task is coming back from a deferral), in which case
    ``resume_execution`` is invoked with the stored method name and kwargs.
    When the operator defines an ``execution_timeout`` the call runs inside the
    ``timeout`` wrapper.

    :param context: Jinja2 context
    :param task_orig: origin task, handed to ``_record_task_map_for_downstreams``
    :return: the value returned by the operator callable, or ``None`` when the
        callable ended with a ``SystemExit`` whose code is ``None`` or ``0``
    :raises AirflowException: if the task is a ``MappedOperator``, or if
        ``multiple_outputs`` is set and the pushed result is not a mapping
        whose keys are all strings
    :raises AirflowTaskTimeout: if the remaining timeout is not positive; also
        re-raised (after calling ``on_kill``) when it comes out of the timed call
    """
    from airflow.sdk.bases.operator import ExecutorSafeguard
    from airflow.sdk.definitions.mappedoperator import MappedOperator

    operator = self.task
    if TYPE_CHECKING:
        # TODO: TaskSDK this function will need 100% re-writing
        # This only works with a "rich" BaseOperator, not the SDK version
        assert isinstance(operator, BaseOperator)

    if isinstance(operator, MappedOperator):
        raise AirflowException("MappedOperator cannot be executed.")

    # Keyword under which the ExecutorSafeguard sentinel value is passed along.
    sentinel_key = f"{operator.__class__.__name__}__sentinel"

    entrypoint: Callable
    call_kwargs: dict[str, Any]
    if not self.next_method:
        # Regular run: go for the default execute.
        entrypoint = operator.execute
        call_kwargs = {}
        if entrypoint.__name__ == "execute":
            call_kwargs[sentinel_key] = ExecutorSafeguard.sentinel_value
    else:
        # The task has been deferred and is being executed due to a trigger,
        # so come back through resume_execution with the stored method name.
        entrypoint = operator.resume_execution
        # Work on a copy: modifications made here must not be tracked by SQLA.
        resume_kwargs = dict(self.next_kwargs or {})
        if self.next_method == "execute":
            resume_kwargs[sentinel_key] = ExecutorSafeguard.sentinel_value
        call_kwargs = {"next_method": self.next_method, "next_kwargs": resume_kwargs}

    def _run_entrypoint(context: Context, **entrypoint_kwargs):
        from airflow.sdk.execution_time.callback_runner import create_executable_runner
        from airflow.sdk.execution_time.context import context_get_outlet_events

        try:
            # Print a marker for log grouping of details before task execution
            log.info("::endgroup::")
            runner = create_executable_runner(
                entrypoint,
                context_get_outlet_events(context),
                logger=log,
            )
            return runner.run(context=context, **entrypoint_kwargs)
        except SystemExit as exit_request:
            # Only a successful exit (code None or 0) is absorbed here; failure
            # cases are handled further up the exception chain.
            exit_code = exit_request.code
            if exit_code is None or exit_code == 0:
                return None
            raise

    time_limit = operator.execution_timeout
    if not time_limit:
        result = _run_entrypoint(context=context, **call_kwargs)
    else:
        # A timeout is specified for the task: make it fail if it goes beyond.
        if self.next_method and self.start_date:
            # Coming in from a deferral: calculate the timeout from our start_date.
            elapsed = timezone.utcnow() - self.start_date
            seconds_left = (time_limit - elapsed).total_seconds()
        else:
            seconds_left = time_limit.total_seconds()
        try:
            # It's possible we're already timed out, so fast-fail if true
            if seconds_left <= 0:
                raise AirflowTaskTimeout()
            # Run task in timeout wrapper
            with timeout(seconds_left):
                result = _run_entrypoint(context=context, **call_kwargs)
        except AirflowTaskTimeout:
            operator.on_kill()
            raise

    with create_session() as session:
        # Nothing is pushed unless the operator opted in via do_xcom_push.
        pushed = result if operator.do_xcom_push else None
        if pushed is not None:
            if operator.multiple_outputs:
                if not isinstance(pushed, Mapping):
                    raise AirflowException(
                        f"Returned output was type {type(pushed)} "
                        "expected dictionary for multiple_outputs"
                    )
                # Check every key before pushing anything.
                for name in pushed.keys():
                    if not isinstance(name, str):
                        raise AirflowException(
                            "Returned dictionary keys must be strings when using "
                            f"multiple_outputs, found {name} ({type(name)}) instead"
                        )
                for name, item in pushed.items():
                    self.xcom_push(key=name, value=item, session=session)
            # The whole result is always pushed under the return key as well.
            self.xcom_push(key=XCOM_RETURN_KEY, value=pushed, session=session)
        if TYPE_CHECKING:
            assert task_orig.dag
        _record_task_map_for_downstreams(
            task_instance=self,
            task=task_orig,
            value=pushed,
            session=session,
        )
    return result