in airflow-core/src/airflow/serialization/serialized_objects.py [0:0]
def populate_operator(cls, op: Operator, encoded_op: dict[str, Any]) -> None:
    """
    Populate operator attributes with serialized values.

    This covers simple attributes that don't reference other things in the
    DAG. Setting references (such as ``op.dag`` and task dependencies) is
    done in ``set_task_dag_references`` instead, which is called after the
    DAG is hydrated.
    """
    # Extra Operator Links defined in Plugins
    op_extra_links_from_plugin = {}

    # We don't want to load Extra Operator links in Scheduler
    if cls._load_operator_extra_links:
        from airflow import plugins_manager

        plugins_manager.initialize_extra_operators_links_plugins()

        if plugins_manager.operator_extra_links is None:
            raise AirflowException("Can not load plugins")

        for ope in plugins_manager.operator_extra_links:
            for operator in ope.operators:
                if (
                    operator.__name__ == encoded_op["task_type"]
                    and operator.__module__ == encoded_op["_task_module"]
                ):
                    op_extra_links_from_plugin.update({ope.name: ope})
        # If OperatorLinks are defined in Plugins but not in the Operator that is being Serialized,
        # set the Operator links attribute.
        # The case for "If OperatorLinks are defined in the operator that is being Serialized"
        # is handled in the deserialization loop where it matches k == "_operator_extra_links"
        if op_extra_links_from_plugin and "_operator_extra_links" not in encoded_op:
            setattr(op, "operator_extra_links", list(op_extra_links_from_plugin.values()))
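        # Illustrative sketch (hypothetical plugin, not part of this module): a plugin
        # registering
        #
        #   class MyLink(BaseOperatorLink):
        #       name = "My Link"
        #       operators = [BashOperator]
        #
        # would be collected above as {"My Link": <MyLink instance>} whenever a
        # BashOperator is being deserialized.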
    for k, v in encoded_op.items():
        # python_callable_name only serves to detect function name changes
        if k == "python_callable_name":
            continue
        if k in ("_outlets", "_inlets"):
            # `_outlets` -> `outlets`
            k = k[1:]
        elif k == "task_type":
            k = "_task_type"
        if k == "_downstream_task_ids":
            # Upgrade from old format/name
            k = "downstream_task_ids"
        if k == "label":
            # Label shouldn't be set anymore -- it's computed from task_id now
            continue

        if k == "downstream_task_ids":
            v = set(v)
        elif k in {"retry_delay", "execution_timeout", "max_retry_delay"}:
            # If the operator's execution_timeout is None and core.default_task_execution_timeout
            # is not None, v will be None, so do not deserialize it into a timedelta
            if v is not None:
                v = cls._deserialize_timedelta(v)
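                # Hedged example of the assumed encoding: a serialized 300.0 here
                # becomes timedelta(seconds=300).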
        elif k in encoded_op["template_fields"]:
            pass
        elif k == "resources":
            v = Resources.from_dict(v)
        elif k.endswith("_date"):
            v = cls._deserialize_datetime(v)
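            # Hedged example: "*_date" values are assumed to be POSIX timestamps,
            # e.g. 1700000000.0 -> 2023-11-14T22:13:20+00:00.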
elif k == "_operator_extra_links":
if cls._load_operator_extra_links:
op_predefined_extra_links = cls._deserialize_operator_extra_links(v)
# If OperatorLinks with the same name exists, Links via Plugin have higher precedence
op_predefined_extra_links.update(op_extra_links_from_plugin)
else:
op_predefined_extra_links = {}
v = list(op_predefined_extra_links.values())
k = "operator_extra_links"
elif k == "params":
v = cls._deserialize_params_dict(v)
if op.params: # Merge existing params if needed.
v, new = op.params, v
v.update(new)
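                # Net effect: existing op.params survive, but keys from the serialized
                # payload win, e.g. op.params={"a": 1}, encoded {"a": 2} -> {"a": 2}.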
elif k == "partial_kwargs":
v = {arg: cls.deserialize(value) for arg, value in v.items()}
elif k in {"expand_input", "op_kwargs_expand_input"}:
v = _ExpandInputRef(v["type"], cls.deserialize(v["value"]))
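            # Assumed wire shape, inferred from the access pattern above:
            #   {"type": "dict-of-lists", "value": {...}}  # "type" string is illustrative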
elif k == "operator_class":
v = {k_: cls.deserialize(v_) for k_, v_ in v.items()}
elif k == "_is_sensor":
from airflow.ti_deps.deps.ready_to_reschedule import ReadyToRescheduleDep
if v is False:
raise RuntimeError("_is_sensor=False should never have been serialized!")
object.__setattr__(op, "deps", op.deps | {ReadyToRescheduleDep()}) # type: ignore[union-attr]
continue
        elif (
            k in cls._decorated_fields
            or k not in op.get_serialized_fields()
            or k in ("outlets", "inlets")
        ):
            v = cls.deserialize(v)
        elif k == "on_failure_fail_dagrun":
            k = "_on_failure_fail_dagrun"
        elif k == "weight_rule":
            v = decode_priority_weight_strategy(v)
        # else use v as it is

        setattr(op, k, v)
    for k in op.get_serialized_fields() - encoded_op.keys() - cls._CONSTRUCTOR_PARAMS.keys():
        # TODO: refactor deserialization of BaseOperator and MappedOperator (split it out);
        # then this check could go away.
        if not hasattr(op, k):
            setattr(op, k, None)

    # Set all the template_fields that were not present in the serialized JSON to None
    for field in op.template_fields:
        if not hasattr(op, field):
            setattr(op, field, None)
    # Used to determine if an Operator is inherited from EmptyOperator
    setattr(op, "_is_empty", bool(encoded_op.get("_is_empty", False)))

    # Used to determine if an Operator is inherited from SkipMixin
    setattr(op, "_can_skip_downstream", bool(encoded_op.get("_can_skip_downstream", False)))

    start_trigger_args = None
    encoded_start_trigger_args = encoded_op.get("start_trigger_args", None)
    if encoded_start_trigger_args:
        encoded_start_trigger_args = cast("dict", encoded_start_trigger_args)
        start_trigger_args = decode_start_trigger_args(encoded_start_trigger_args)
    setattr(op, "start_trigger_args", start_trigger_args)
    setattr(op, "start_from_trigger", bool(encoded_op.get("start_from_trigger", False)))