def populate_operator()

in airflow-core/src/airflow/serialization/serialized_objects.py [0:0]


    def populate_operator(cls, op: Operator, encoded_op: dict[str, Any]) -> None:
        """
        Populate operator attributes with serialized values.

        This covers simple attributes that don't reference other things in the
        DAG. Setting references (such as ``op.dag`` and task dependencies) is
        done in ``set_task_dag_references`` instead, which is called after the
        DAG is hydrated.
        """
        # Extra Operator Links defined in Plugins
        op_extra_links_from_plugin = {}

        # We don't want to load Extra Operator links in Scheduler
        if cls._load_operator_extra_links:
            from airflow import plugins_manager

            plugins_manager.initialize_extra_operators_links_plugins()

            if plugins_manager.operator_extra_links is None:
                raise AirflowException("Can not load plugins")

            for ope in plugins_manager.operator_extra_links:
                for operator in ope.operators:
                    if (
                        operator.__name__ == encoded_op["task_type"]
                        and operator.__module__ == encoded_op["_task_module"]
                    ):
                        op_extra_links_from_plugin.update({ope.name: ope})

            # If OperatorLinks are defined in Plugins but not in the Operator that is being Serialized
            # set the Operator links attribute
            # The case for "If OperatorLinks are defined in the operator that is being Serialized"
            # is handled in the deserialization loop where it matches k == "_operator_extra_links"
            if op_extra_links_from_plugin and "_operator_extra_links" not in encoded_op:
                setattr(
                    op,
                    "operator_extra_links",
                    list(op_extra_links_from_plugin.values()),
                )

        for k, v in encoded_op.items():
            # python_callable_name only serves to detect function name changes
            if k == "python_callable_name":
                continue
            if k in ("_outlets", "_inlets"):
                # `_outlets` -> `outlets`
                k = k[1:]
            elif k == "task_type":
                k = "_task_type"
            if k == "_downstream_task_ids":
                # Upgrade from old format/name
                k = "downstream_task_ids"

            if k == "label":
                # Label shouldn't be set anymore --  it's computed from task_id now
                continue
            if k == "downstream_task_ids":
                v = set(v)
            elif k in {"retry_delay", "execution_timeout", "max_retry_delay"}:
                # If operator's execution_timeout is None and core.default_task_execution_timeout is not None,
                # v will be None so do not deserialize into timedelta
                if v is not None:
                    v = cls._deserialize_timedelta(v)
            elif k in encoded_op["template_fields"]:
                pass
            elif k == "resources":
                v = Resources.from_dict(v)
            elif k.endswith("_date"):
                v = cls._deserialize_datetime(v)
            elif k == "_operator_extra_links":
                if cls._load_operator_extra_links:
                    op_predefined_extra_links = cls._deserialize_operator_extra_links(v)

                    # If OperatorLinks with the same name exists, Links via Plugin have higher precedence
                    op_predefined_extra_links.update(op_extra_links_from_plugin)
                else:
                    op_predefined_extra_links = {}

                v = list(op_predefined_extra_links.values())
                k = "operator_extra_links"

            elif k == "params":
                v = cls._deserialize_params_dict(v)
                if op.params:  # Merge existing params if needed.
                    v, new = op.params, v
                    v.update(new)
            elif k == "partial_kwargs":
                v = {arg: cls.deserialize(value) for arg, value in v.items()}
            elif k in {"expand_input", "op_kwargs_expand_input"}:
                v = _ExpandInputRef(v["type"], cls.deserialize(v["value"]))
            elif k == "operator_class":
                v = {k_: cls.deserialize(v_) for k_, v_ in v.items()}
            elif k == "_is_sensor":
                from airflow.ti_deps.deps.ready_to_reschedule import ReadyToRescheduleDep

                if v is False:
                    raise RuntimeError("_is_sensor=False should never have been serialized!")
                object.__setattr__(op, "deps", op.deps | {ReadyToRescheduleDep()})  # type: ignore[union-attr]
                continue
            elif (
                k in cls._decorated_fields
                or k not in op.get_serialized_fields()
                or k in ("outlets", "inlets")
            ):
                v = cls.deserialize(v)
            elif k == "on_failure_fail_dagrun":
                k = "_on_failure_fail_dagrun"
            elif k == "weight_rule":
                v = decode_priority_weight_strategy(v)

            # else use v as it is

            setattr(op, k, v)

        for k in op.get_serialized_fields() - encoded_op.keys() - cls._CONSTRUCTOR_PARAMS.keys():
            # TODO: refactor deserialization of BaseOperator and MappedOperator (split it out), then check
            # could go away.
            if not hasattr(op, k):
                setattr(op, k, None)

        # Set all the template_field to None that were not present in Serialized JSON
        for field in op.template_fields:
            if not hasattr(op, field):
                setattr(op, field, None)

        # Used to determine if an Operator is inherited from EmptyOperator
        setattr(op, "_is_empty", bool(encoded_op.get("_is_empty", False)))

        # Used to determine if an Operator is inherited from SkipMixin
        setattr(op, "_can_skip_downstream", bool(encoded_op.get("_can_skip_downstream", False)))

        start_trigger_args = None
        encoded_start_trigger_args = encoded_op.get("start_trigger_args", None)
        if encoded_start_trigger_args:
            encoded_start_trigger_args = cast("dict", encoded_start_trigger_args)
            start_trigger_args = decode_start_trigger_args(encoded_start_trigger_args)
        setattr(op, "start_trigger_args", start_trigger_args)
        setattr(op, "start_from_trigger", bool(encoded_op.get("start_from_trigger", False)))