# airflow-core/src/airflow/serialization/serialized_objects.py
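# Note: this is a single-function excerpt; datetime, itertools, and the codec
# helpers used below (decode_timezone, encode_timetable, decode_relativedelta)
# are imported or defined at module level in serialized_objects.py.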
def conversion_v1_to_v2(ser_obj: dict):
    dag_dict = ser_obj["dag"]
    dag_renames = [
        ("_dag_id", "dag_id"),
        ("_task_group", "task_group"),
        ("_access_control", "access_control"),
    ]
    task_renames = [("_task_type", "task_type")]
    # Fields that are dropped from serialized tasks (and default_args) in v2
    tasks_remove = [
        "_log_config_logger_name",
        "deps",
        "sla",
        # Operator extra links from Airflow 2 won't work anymore, only new ones, so remove these
        "_operator_extra_links",
    ]

    ser_obj["__version"] = 2
    def replace_dataset_in_str(s):
        return s.replace("Dataset", "Asset").replace("dataset", "asset")

    def _replace_dataset_with_asset_in_timetables(obj, parent_key=None):
        if isinstance(obj, dict):
            new_obj = {}
            for k, v in obj.items():
                new_key = replace_dataset_in_str(k) if isinstance(k, str) else k
                # Don't replace uri values
                if new_key == "uri":
                    new_obj[new_key] = v
                else:
                    new_value = (
                        replace_dataset_in_str(v)
                        if isinstance(v, str)
                        else _replace_dataset_with_asset_in_timetables(v, parent_key=new_key)
                    )
                    new_obj[new_key] = new_value
            # Insert "name" and "group" if this is inside the 'objects' list
            if parent_key == "objects":
                new_obj["name"] = None
                new_obj["group"] = None
            return new_obj
        elif isinstance(obj, list):
            return [_replace_dataset_with_asset_in_timetables(i, parent_key=parent_key) for i in obj]
        return obj
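
    # Rebuild a Timetable from a v1 schedule_interval value (None, cron
    # string, timedelta, or relativedelta) and re-encode it for the v2 payload.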
    def _create_compat_timetable(value):
        from airflow import settings
        from airflow.sdk.definitions.dag import _create_timetable

        if tzs := dag_dict.get("timezone"):
            timezone = decode_timezone(tzs)
        else:
            timezone = settings.TIMEZONE
        timetable = _create_timetable(value, timezone)
        return encode_timetable(timetable)
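
    # Rename DAG-level fields that lost their leading underscore in v2.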
    for old, new in dag_renames:
        if old in dag_dict:
            dag_dict[new] = dag_dict.pop(old)
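
    # default_args is serialized as {"__var": {...}}; strip the removed task
    # fields from it as well.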
if default_args := dag_dict.get("default_args"):
for k in tasks_remove:
default_args["__var"].pop(k, None)
if timetable := dag_dict.get("timetable"):
if timetable["__type"] in {
"airflow.timetables.simple.DatasetTriggeredTimetable",
"airflow.timetables.datasets.DatasetOrTimeSchedule",
}:
dag_dict["timetable"] = _replace_dataset_with_asset_in_timetables(dag_dict["timetable"])
elif (sched := dag_dict.pop("schedule_interval", None)) is None:
dag_dict["timetable"] = _create_compat_timetable(None)
elif isinstance(sched, str):
dag_dict["timetable"] = _create_compat_timetable(sched)
elif sched.get("__type") == "timedelta":
dag_dict["timetable"] = _create_compat_timetable(datetime.timedelta(seconds=sched["__var"]))
elif sched.get("__type") == "relativedelta":
dag_dict["timetable"] = _create_compat_timetable(decode_relativedelta(sched["__var"]))
else:
# We should maybe convert this to None and warn instead
raise ValueError(f"Unknown schedule_interval field {sched!r}")
if "dag_dependencies" in dag_dict:
for dep in dag_dict["dag_dependencies"]:
dep_type = dep.get("dependency_type")
if dep_type in ("dataset", "dataset-alias"):
dep["dependency_type"] = dep_type.replace("dataset", "asset")
if not dep.get("label"):
dep["label"] = dep["dependency_id"]
for fld in ("target", "source"):
val = dep.get(fld)
if val == dep_type and val in ("dataset", "dataset-alias"):
dep[fld] = dep[fld].replace("dataset", "asset")
elif val.startswith("dataset:"):
dep[fld] = dep[fld].replace("dataset:", "asset:")
elif val.startswith("dataset-alias:"):
dep[fld] = dep[fld].replace("dataset-alias:", "asset-alias:")
for task in dag_dict["tasks"]:
task_var: dict = task["__var"]
if "airflow.ti_deps.deps.ready_to_reschedule.ReadyToRescheduleDep" in task_var.get("deps", []):
task_var["_is_sensor"] = True
for k in tasks_remove:
task_var.pop(k, None)
for old, new in task_renames:
task_var[new] = task_var.pop(old)
for item in itertools.chain(*(task_var.get(key, []) for key in ("inlets", "outlets"))):
original_item_type = item["__type"]
if isinstance(item, dict) and "__type" in item:
item["__type"] = replace_dataset_in_str(original_item_type)
var_ = item["__var"]
if original_item_type == "dataset":
var_["name"] = var_["uri"]
var_["group"] = "asset"

    # Set group_display_name on the root TaskGroup
    dag_dict["task_group"]["group_display_name"] = ""