def _expand()

in task-sdk/src/airflow/sdk/bases/decorator.py [0:0]


    def _expand(self, expand_input: ExpandInput, *, strict: bool) -> XComArg:
        """
        Create the mapped operator for this decorated task and return it wrapped in an ``XComArg``.

        The keyword arguments stored on ``self.kwargs`` are combined with the
        merged defaults returned by ``get_merged_defaults`` to form the
        operator's ``partial_kwargs``. Later sources win: the setup/teardown
        flags are written first, then the applicable entries of the merged
        ``default_args``, then the task's own kwargs.

        :param expand_input: The mapped arguments. Its ``value`` is validated
            with ``ensure_xcomarg_return_value`` and the object itself is
            handed to the operator as ``op_kwargs_expand_input``.
        :param strict: Forwarded to the operator as ``disallow_kwargs_override``.
        :return: An ``XComArg`` referencing the newly created mapped operator.
        :raises ValueError: If ``pool_slots`` ends up in the collected kwargs
            with a value lower than 1.
        """
        ensure_xcomarg_return_value(expand_input.value)

        # Work on a copy so that popping entries never mutates ``self.kwargs``.
        remaining_kwargs = self.kwargs.copy()
        dag = remaining_kwargs.pop("dag", None) or DagContext.get_current()
        task_group = remaining_kwargs.pop("task_group", None) or TaskGroupContext.get_current(dag)

        explicit_params = remaining_kwargs.pop("params", None)
        explicit_default_args = remaining_kwargs.pop("default_args", None)
        default_args, partial_params = get_merged_defaults(
            dag=dag,
            task_group=task_group,
            task_params=explicit_params,
            task_default_args=explicit_default_args,
        )

        partial_kwargs: dict[str, Any] = {
            "is_setup": self.is_setup,
            "is_teardown": self.is_teardown,
            "on_failure_fail_dagrun": self.on_failure_fail_dagrun,
        }

        # Only default_args entries that name a BaseOperator parameter are
        # carried over, minus the names listed below.
        operator_parameters = inspect.signature(BaseOperator).parameters
        excluded_names = {
            "default_args",  # The very mapping being unpacked here.
            "kwargs",  # A common name for a keyword argument.
            "do_xcom_push",  # Treated the same way as `multiple_outputs`.
            "multiple_outputs",  # `self.multiple_outputs` is used instead.
            "params",  # Already covered by `partial_params` above.
            "task_concurrency",  # Deprecated (replaced by `max_active_tis_per_dag`).
        }
        for arg_name, arg_value in default_args.items():
            if arg_name in operator_parameters and arg_name not in excluded_names:
                partial_kwargs[arg_name] = arg_value
        partial_kwargs.update(remaining_kwargs)

        task_id = get_unique_task_id(partial_kwargs.pop("task_id"), dag, task_group)
        if task_group:
            task_id = task_group.child_id(task_id)

        # Logic here should be kept in sync with BaseOperatorMeta.partial().
        if partial_kwargs.get("wait_for_downstream"):
            partial_kwargs["depends_on_past"] = True
        start_date = timezone.convert_to_utc(partial_kwargs.pop("start_date", None))
        end_date = timezone.convert_to_utc(partial_kwargs.pop("end_date", None))
        if "pool_slots" in partial_kwargs and partial_kwargs["pool_slots"] < 1:
            dag_str = f" in dag {dag.dag_id}" if dag else ""
            raise ValueError(f"pool slots for {task_id}{dag_str} cannot be less than 1")

        # Coerce the fields that were supplied; absent ones are left absent.
        for field_name, coerce in (
            ("retries", parse_retries),
            ("retry_delay", coerce_timedelta),
            ("max_retry_delay", coerce_timedelta),
            ("resources", coerce_resources),
        ):
            raw_value = partial_kwargs.get(field_name, NOTSET)
            if raw_value is NOTSET:
                continue
            partial_kwargs[field_name] = coerce(raw_value)  # type: ignore[operator]

        # Fill in empty containers for anything the caller did not provide.
        for missing_key, empty_value in (
            ("executor_config", {}),
            ("op_args", []),
            ("op_kwargs", {}),
        ):
            partial_kwargs.setdefault(missing_key, empty_value)

        # Mypy does not work well with a subclassed attrs class :(
        mapped_operator_factory = cast("Any", DecoratedMappedOperator)

        operator_class = self.operator_class
        # Prefer the class's custom display name, falling back to the class name.
        operator_name = getattr(operator_class, "custom_operator_name", operator_class.__name__)

        mapped_operator = mapped_operator_factory(
            operator_class=operator_class,
            expand_input=EXPAND_INPUT_EMPTY,  # Don't use this; mapped values go to op_kwargs_expand_input.
            partial_kwargs=partial_kwargs,
            task_id=task_id,
            params=partial_params,
            operator_extra_links=operator_class.operator_extra_links,
            template_ext=operator_class.template_ext,
            template_fields=operator_class.template_fields,
            template_fields_renderers=operator_class.template_fields_renderers,
            ui_color=operator_class.ui_color,
            ui_fgcolor=operator_class.ui_fgcolor,
            is_empty=False,
            is_sensor=operator_class._is_sensor,
            can_skip_downstream=operator_class._can_skip_downstream,
            task_module=operator_class.__module__,
            task_type=operator_class.__name__,
            operator_name=operator_name,
            dag=dag,
            task_group=task_group,
            start_date=start_date,
            end_date=end_date,
            multiple_outputs=self.multiple_outputs,
            python_callable=self.function,
            op_kwargs_expand_input=expand_input,
            disallow_kwargs_override=strict,
            # Different from classic operators, kwargs passed to a taskflow
            # task's expand() contribute to the op_kwargs operator argument, not
            # the operator arguments themselves, and should expand against it.
            expand_input_attr="op_kwargs_expand_input",
            start_trigger_args=operator_class.start_trigger_args,
            start_from_trigger=operator_class.start_from_trigger,
        )
        return XComArg(operator=mapped_operator)