in bigquery_etl/cli/stage.py
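# NOTE: VIEW_FILE, QUERY_FILE, QUERY_SCRIPT, MATERIALIZED_VIEW, and SCHEMA_FILE are
# filename constants, and View, Schema, Path (pathlib), get_id_token,
# read_routine_dir, and _udf_dependencies are imported or defined elsewhere in
# this module.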
def _view_dependencies(artifact_files, sql_dir):
    """Determine view dependencies."""
    view_dependencies = set()
    view_dependency_files = [file for file in artifact_files if file.name == VIEW_FILE]
    id_token = get_id_token()
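
    # view_dependency_files acts as a worklist: view files discovered below are
    # appended while the list is being iterated, so transitively referenced views
    # get processed on later passes of this loop.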
    for dep_file in view_dependency_files:
        # all referenced views and tables need to be deployed in the same stage project
        if dep_file not in artifact_files:
            view_dependencies.add(dep_file)

        if dep_file.name == VIEW_FILE:
            view = View.from_file(dep_file, id_token=id_token)

            for dependency in view.table_references:
                dependency_components = dependency.split(".")
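
                # Normalize INFORMATION_SCHEMA references, which can omit the project
                # and carry extra components after the dataset, e.g.
                #   "dataset.INFORMATION_SCHEMA.TABLES"
                #     -> ["<view project>", "dataset", "INFORMATION_SCHEMA.TABLES"]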
                if dependency_components[1:2] == ["INFORMATION_SCHEMA"]:
                    dependency_components.insert(0, view.project)
                if dependency_components[2:3] == ["INFORMATION_SCHEMA"]:
                    # INFORMATION_SCHEMA references have extra components; fold them
                    # into the table name. No deploys happen for INFORMATION_SCHEMA
                    # later on.
                    dependency_components = dependency_components[:2] + [
                        ".".join(dependency_components[2:])
                    ]

                if len(dependency_components) != 3:
                    raise ValueError(
                        f"Invalid table reference {dependency} in view {view.name}. "
                        "Tables should be fully qualified, expected format: project.dataset.table."
                    )
                project, dataset, name = dependency_components
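
                # view.path is .../<project>/<dataset>/<view>/<VIEW_FILE>, so three
                # .parent hops reach the project directory; from there, resolve the
                # referenced table's directory within the same sql tree.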
                file_path = Path(view.path).parent.parent.parent / dataset / name
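
                # if a definition file already exists for the dependency, no stub is
                # needed; definitions not already staged are queued on the worklist
                # for transitive dependency resolution.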
                file_exists_for_dependency = False
                for file in [VIEW_FILE, QUERY_FILE, QUERY_SCRIPT, MATERIALIZED_VIEW]:
                    if (file_path / file).is_file():
                        if (file_path / file) not in artifact_files:
                            view_dependency_files.append(file_path / file)
                        file_exists_for_dependency = True
                        break
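
                # mirror the dependency's directory into the stage sql_dir so a
                # deployable artifact exists for it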
                path = Path(sql_dir) / project / dataset / name
                if not path.exists():
                    path.mkdir(parents=True, exist_ok=True)
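
                # derive a schema for the referenced table so the staged copy can be
                # created with matching columns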
                # don't create schema for wildcard and metadata tables
                if "*" not in name and name != "INFORMATION_SCHEMA":
                    partitioned_by = "submission_timestamp"
                    schema = Schema.for_table(
                        project=project,
                        dataset=dataset,
                        table=name,
                        id_token=id_token,
                        partitioned_by=partitioned_by,
                    )
                    schema.to_yaml_file(path / SCHEMA_FILE)
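
                # dependencies with no definition file in the repo get a stub script
                # so there is still an artifact to deploy for the table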
                if not file_exists_for_dependency:
                    (path / QUERY_SCRIPT).write_text(
                        "# Table stub generated by stage deploy"
                    )
                    view_dependencies.add(path / QUERY_SCRIPT)

            # extract UDF references from view definition
            raw_routines = read_routine_dir()
            udf_dependencies = set()
            for udf_dependency in view.udf_references:
                routine = raw_routines[udf_dependency]
                udf_dependencies.add(Path(routine.filepath))

            # determine UDF dependencies recursively
            view_dependencies.update(_udf_dependencies(udf_dependencies))
            view_dependencies.update(udf_dependencies)

    return view_dependencies