def _view_dependencies()

in bigquery_etl/cli/stage.py


def _view_dependencies(artifact_files, sql_dir):
    """Determine view dependencies."""
    view_dependencies = set()
    view_dependency_files = [file for file in artifact_files if file.name == VIEW_FILE]
    id_token = get_id_token()
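    # view_dependency_files is extended while iterating so that transitive view dependencies are processed too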
    for dep_file in view_dependency_files:
        # all referenced views and tables need to be deployed in the same stage project
        if dep_file not in artifact_files:
            view_dependencies.add(dep_file)

        if dep_file.name == VIEW_FILE:
            view = View.from_file(dep_file, id_token=id_token)

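            # resolve each table referenced by the view to a (project, dataset, table) triple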
            for dependency in view.table_references:
                dependency_components = dependency.split(".")
                if dependency_components[1:2] == ["INFORMATION_SCHEMA"]:
                    dependency_components.insert(0, view.project)
                if dependency_components[2:3] == ["INFORMATION_SCHEMA"]:
                    # INFORMATION_SCHEMA references have extra components; treat everything after the dataset as the table name
                    # INFORMATION_SCHEMA tables are never deployed later on
                    dependency_components = dependency_components[:2] + [
                        ".".join(dependency_components[2:])
                    ]
                if len(dependency_components) != 3:
                    raise ValueError(
                        f"Invalid table reference {dependency} in view {view.name}. "
                        "Tables should be fully qualified, expected format: project.dataset.table."
                    )
                project, dataset, name = dependency_components

                file_path = Path(view.path).parent.parent.parent / dataset / name

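                # check whether the dependency is defined locally as a view, query, or materialized view
                # and queue it for processing if it is not already an artifact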
                file_exists_for_dependency = False
                for file in [VIEW_FILE, QUERY_FILE, QUERY_SCRIPT, MATERIALIZED_VIEW]:
                    if (file_path / file).is_file():
                        if (file_path / file) not in artifact_files:
                            view_dependency_files.append(file_path / file)

                        file_exists_for_dependency = True
                        break

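                # ensure the dependency has a directory in the stage sql dir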
                path = Path(sql_dir) / project / dataset / name
                if not path.exists():
                    path.mkdir(parents=True, exist_ok=True)
                    # don't create schema for wildcard and metadata tables
                    if "*" not in name and name != "INFORMATION_SCHEMA":
                        partitioned_by = "submission_timestamp"
                        schema = Schema.for_table(
                            project=project,
                            dataset=dataset,
                            table=name,
                            id_token=id_token,
                            partitioned_by=partitioned_by,
                        )
                        schema.to_yaml_file(path / SCHEMA_FILE)

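                # dependencies without a local definition get a stub query file so the table can still be deployed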
                if not file_exists_for_dependency:
                    (path / QUERY_SCRIPT).write_text(
                        "# Table stub generated by stage deploy"
                    )
                    view_dependencies.add(path / QUERY_SCRIPT)

            # extract UDF references from view definition
            raw_routines = read_routine_dir()
            udf_dependencies = set()

            for udf_dependency in view.udf_references:
                routine = raw_routines[udf_dependency]
                udf_dependencies.add(Path(routine.filepath))

            # determine UDF dependencies recursively
            view_dependencies.update(_udf_dependencies(udf_dependencies))
            view_dependencies.update(udf_dependencies)

    return view_dependencies
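
For context, a minimal sketch of how this helper might be invoked during a stage deploy. The project, dataset, and view names below are illustrative assumptions rather than values from the source, and the call needs GCP credentials because view definitions and table schemas are fetched from BigQuery:

from pathlib import Path

# illustrative inputs: view artifacts already copied into a local sql/ directory,
# laid out as sql/<project>/<dataset>/<name>/view.sql
sql_dir = Path("sql")
artifact_files = [
    sql_dir / "moz-fx-data-shared-prod" / "telemetry" / "example_view" / "view.sql",
]

# returns the set of additional files (referenced views, stubbed tables, UDFs)
# that also have to be deployed to the stage project
dependencies = _view_dependencies(artifact_files, sql_dir)
for dependency in sorted(dependencies):
    print(dependency)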