def conversion_v1_to_v2(ser_obj: dict)

in airflow-core/src/airflow/serialization/serialized_objects.py
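
Converts a serialized DAG payload in place from format v1 (Airflow 2) to v2: private underscore-prefixed DAG fields are renamed, obsolete task fields are dropped, Dataset references are rewritten as Asset, the legacy schedule_interval is normalized into an encoded timetable, and dag_dependencies entries are updated to the asset terminology.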


    def conversion_v1_to_v2(ser_obj: dict):
        dag_dict = ser_obj["dag"]
        dag_renames = [
            ("_dag_id", "dag_id"),
            ("_task_group", "task_group"),
            ("_access_control", "access_control"),
        ]
        task_renames = [("_task_type", "task_type")]
        # Fields dropped from every task (and from default_args) during conversion
        tasks_remove = [
            "_log_config_logger_name",
            "deps",
            "sla",
            # Operator extra links from Airflow 2 won't work anymore, only new ones, so remove these
            "_operator_extra_links",
        ]

        ser_obj["__version"] = 2

        def replace_dataset_in_str(s):
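            # e.g. "DatasetTriggeredTimetable" -> "AssetTriggeredTimetable"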
            return s.replace("Dataset", "Asset").replace("dataset", "asset")

        def _replace_dataset_with_asset_in_timetables(obj, parent_key=None):
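            # Recursively rename Dataset -> Asset in dict keys, "__type" markers,
            # and string values, but leave "uri" values untouched. Dicts inside an
            # "objects" list also gain the v2-only "name" and "group" keys.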
            if isinstance(obj, dict):
                new_obj = {}
                for k, v in obj.items():
                    new_key = replace_dataset_in_str(k) if isinstance(k, str) else k
                    # Don't replace uri values
                    if new_key == "uri":
                        new_obj[new_key] = v
                    else:
                        new_value = (
                            replace_dataset_in_str(v)
                            if isinstance(v, str)
                            else _replace_dataset_with_asset_in_timetables(v, parent_key=new_key)
                        )
                        new_obj[new_key] = new_value
                # Insert "name" and "group" if this is inside the 'objects' list
                if parent_key == "objects":
                    new_obj["name"] = None
                    new_obj["group"] = None
                return new_obj

            elif isinstance(obj, list):
                return [_replace_dataset_with_asset_in_timetables(i, parent_key=parent_key) for i in obj]

            return obj

        def _create_compat_timetable(value):
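            # Build a timetable from a legacy schedule value, using the DAG's
            # serialized timezone (or the configured default), and re-encode it
            # in the v2 timetable format.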
            from airflow import settings
            from airflow.sdk.definitions.dag import _create_timetable

            if tzs := dag_dict.get("timezone"):
                timezone = decode_timezone(tzs)
            else:
                timezone = settings.TIMEZONE
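            # _create_timetable maps plain schedule values to concrete timetables,
            # e.g. None -> NullTimetable and a cron string -> CronDataIntervalTimetable
            # (mapping assumed from the SDK helper).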
            timetable = _create_timetable(value, timezone)
            return encode_timetable(timetable)

        for old, new in dag_renames:
            if old in dag_dict:
                dag_dict[new] = dag_dict.pop(old)

        if default_args := dag_dict.get("default_args"):
            for k in tasks_remove:
                default_args["__var"].pop(k, None)

        if timetable := dag_dict.get("timetable"):
            if timetable["__type"] in {
                "airflow.timetables.simple.DatasetTriggeredTimetable",
                "airflow.timetables.datasets.DatasetOrTimeSchedule",
            }:
                dag_dict["timetable"] = _replace_dataset_with_asset_in_timetables(dag_dict["timetable"])
        elif (sched := dag_dict.pop("schedule_interval", None)) is None:
            dag_dict["timetable"] = _create_compat_timetable(None)
        elif isinstance(sched, str):
            dag_dict["timetable"] = _create_compat_timetable(sched)
        elif sched.get("__type") == "timedelta":
            dag_dict["timetable"] = _create_compat_timetable(datetime.timedelta(seconds=sched["__var"]))
        elif sched.get("__type") == "relativedelta":
            dag_dict["timetable"] = _create_compat_timetable(decode_relativedelta(sched["__var"]))
        else:
            # We should maybe convert this to None and warn instead
            raise ValueError(f"Unknown schedule_interval field {sched!r}")
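        # e.g. a v1 "schedule_interval" of "0 0 * * *" is replaced by an encoded
        # cron-based timetable stored under "timetable".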

        if "dag_dependencies" in dag_dict:
            for dep in dag_dict["dag_dependencies"]:
                dep_type = dep.get("dependency_type")
                if dep_type in ("dataset", "dataset-alias"):
                    dep["dependency_type"] = dep_type.replace("dataset", "asset")

                if not dep.get("label"):
                    dep["label"] = dep["dependency_id"]

                for fld in ("target", "source"):
                    val = dep.get(fld)
                    # Guard against a missing source/target before calling str methods
                    if not val:
                        continue
                    if val == dep_type and val in ("dataset", "dataset-alias"):
                        dep[fld] = val.replace("dataset", "asset")
                    elif val.startswith("dataset:"):
                        dep[fld] = val.replace("dataset:", "asset:")
                    elif val.startswith("dataset-alias:"):
                        dep[fld] = val.replace("dataset-alias:", "asset-alias:")
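                # e.g. a v1 entry {"dependency_type": "dataset", "source": "dataset:x", ...}
                # becomes {"dependency_type": "asset", "source": "asset:x", ...}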

        for task in dag_dict["tasks"]:
            task_var: dict = task["__var"]
            if "airflow.ti_deps.deps.ready_to_reschedule.ReadyToRescheduleDep" in task_var.get("deps", []):
                task_var["_is_sensor"] = True
            for k in tasks_remove:
                task_var.pop(k, None)
            for old, new in task_renames:
                task_var[new] = task_var.pop(old)
            for item in itertools.chain(*(task_var.get(key, []) for key in ("inlets", "outlets"))):
                # Only serialized dicts carry "__type"; check before indexing
                if not isinstance(item, dict) or "__type" not in item:
                    continue
                original_item_type = item["__type"]
                item["__type"] = replace_dataset_in_str(original_item_type)

                var_ = item["__var"]
                if original_item_type == "dataset":
                    # Assets are named; reuse the dataset's uri as the name
                    var_["name"] = var_["uri"]
                var_["group"] = "asset"
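                # e.g. a v1 outlet {"__type": "dataset", "__var": {"uri": "s3://b/k"}} becomes
                # {"__type": "asset", "__var": {"uri": "s3://b/k", "name": "s3://b/k", "group": "asset"}}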

        # Set group_display_name on the root TaskGroup
        dag_dict["task_group"]["group_display_name"] = ""
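
A minimal usage sketch (the driver code below is hypothetical; the function itself depends on module-level imports in serialized_objects.py such as datetime, itertools, decode_timezone, encode_timetable, and decode_relativedelta):

    import json

    # Load a DAG serialized with format v1 (file name is illustrative)
    with open("dag_v1.json") as f:
        ser_obj = json.load(f)

    assert ser_obj["__version"] == 1
    conversion_v1_to_v2(ser_obj)  # mutates ser_obj in place
    assert ser_obj["__version"] == 2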