bigquery_etl/view/__init__.py (335 lines of code) (raw):

"""Represents a SQL view.""" import glob import re import string import sys import time from functools import cached_property from pathlib import Path from textwrap import dedent from typing import Any, Optional import attr import sqlparse from google.api_core.exceptions import BadRequest, NotFound from google.cloud import bigquery from bigquery_etl.config import ConfigLoader from bigquery_etl.format_sql.formatter import reformat from bigquery_etl.metadata.parse_metadata import ( DATASET_METADATA_FILE, METADATA_FILE, DatasetMetadata, Metadata, ) from bigquery_etl.schema import SCHEMA_FILE, Schema from bigquery_etl.util import extract_from_query_path from bigquery_etl.util.common import render # Regex matching CREATE VIEW statement so it can be removed to get the view query CREATE_VIEW_PATTERN = re.compile( r"CREATE\s+OR\s+REPLACE\s+VIEW\s+[^\s]+\s+AS", re.IGNORECASE ) @attr.s(auto_attribs=True) class View: """Representation of a SQL view stored in a view.sql file.""" path: str = attr.ib() name: str = attr.ib() dataset: str = attr.ib() project: str = attr.ib() partition_column: Optional[str] = attr.ib(None) id_token: Optional[Any] = attr.ib(None) @path.validator def validate_path(self, attribute, value): """Check that the view path is valid.""" if not Path(self.path).exists(): raise ValueError(f"View file does not exist: {self.path}") @property def content(self): """Return the view SQL.""" path = Path(self.path) return render(path.name, template_folder=path.parent) @classmethod def from_file(cls, path, **kwargs): """View from SQL file.""" project, dataset, name = extract_from_query_path(path) return cls( path=str(path), name=name, dataset=dataset, project=project, **kwargs ) @property def view_identifier(self): """Return full view identifier: `<project>.<dataset>.<name>`.""" return f"{self.project}.{self.dataset}.{self.name}" @property def is_user_facing(self): """Return whether the view is user-facing.""" return not self.dataset.endswith( tuple( ConfigLoader.get( "default", "non_user_facing_dataset_suffixes", fallback=[] ) ) ) @cached_property def metadata(self): """Return the view metadata.""" path = Path(self.path).parent / METADATA_FILE if not path.exists(): return None return Metadata.from_file(path) @property def labels(self): """Return the view labels.""" if not hasattr(self, "_labels"): if self.metadata: self._labels = self.metadata.labels.copy() else: self._labels = {} return self._labels @classmethod def create(cls, project, dataset, name, sql_dir, base_table=None): """ Create a new empty view from a template. Use `base_table` in view definition, if provided. """ path = Path(sql_dir) / project / dataset / name / "view.sql" dataset_path = path.parent.parent if not dataset_path.exists(): # create new dataset with dataset metadata path.parent.mkdir(parents=True) dataset_metadata = DatasetMetadata( friendly_name=string.capwords(dataset), description="Please provide a dataset description.", dataset_base_acl="view", user_facing=True, ) dataset_metadata.write(dataset_path / DATASET_METADATA_FILE) else: path.parent.mkdir(parents=True, exist_ok=True) if not base_table: base_table = f"{project}.{dataset}_derived.{name}_v1" path.write_text( reformat( f""" CREATE OR REPLACE VIEW `{project}.{dataset}.{name}` AS SELECT * FROM `{base_table}` """ ) + "\n" ) return cls(path, name, dataset, project) def skip_validation(self): """Get views that should be skipped during validation.""" return { file for skip in ConfigLoader.get("view", "validation", "skip", fallback=[]) for file in glob.glob( skip, recursive=True, ) } def skip_publish(self): """Get views that should be skipped during publishing.""" return { file for skip in ConfigLoader.get("view", "publish", "skip", fallback=[]) for file in glob.glob( skip, recursive=True, ) } def is_valid(self) -> bool: """Validate the SQL view definition.""" if any(str(self.path).endswith(p) for p in self.skip_validation()): print(f"Skipped validation for {self.path}") return True return self._valid_fully_qualified_references() and self._valid_view_naming() @cached_property def table_references(self): """List of table references in this view.""" from bigquery_etl.dependency import extract_table_references return extract_table_references(self.content) @cached_property def udf_references(self): """List of UDF references in this view.""" from bigquery_etl.routine.parse_routine import routine_usages_in_text # routine_usages_in_text automatically includes mozfun UDFs return routine_usages_in_text( self.content, Path(self.path).parent.parent.parent ) @cached_property def schema(self): """Derive view schema from a schema file or a dry run result.""" return self.configured_schema or self.dryrun_schema @cached_property def schema_path(self): """Return the schema file path.""" return Path(self.path).parent / SCHEMA_FILE @cached_property def configured_schema(self): """Derive view schema from a schema file.""" if self.schema_path.is_file(): return Schema.from_schema_file(self.schema_path) return None @cached_property def dryrun_schema(self): """Derive view schema from a dry run result.""" try: # We have to remove `CREATE OR REPLACE VIEW ... AS` from the query to avoid # view-creation-permission-denied errors, and we have to apply a `WHERE` # filter to avoid partition-column-filter-missing errors. schema_query_filter = ( f"DATE(`{self.partition_column}`) = DATE('2020-01-01')" if self.partition_column else "FALSE" ) schema_query = dedent( f""" WITH view_query AS ( {CREATE_VIEW_PATTERN.sub("", self.content)} ) SELECT * FROM view_query WHERE {schema_query_filter} """ ) return Schema.from_query_file( Path(self.path), content=schema_query, id_token=self.id_token ) except Exception as e: print(f"Error dry-running view {self.view_identifier} to get schema: {e}") return None def _valid_fully_qualified_references(self): """Check that referenced tables and views are fully qualified.""" for table in self.table_references: if len(table.split(".")) < 3: print(f"{self.path} ERROR\n{table} missing project_id qualifier") return False return True def _valid_view_naming(self): """Validate that the created view naming matches the directory structure.""" if not (parsed := sqlparse.parse(self.content)): raise ValueError(f"Unable to parse view SQL for {self.path}") tokens = [ t for t in parsed[0].tokens if not (t.is_whitespace or isinstance(t, sqlparse.sql.Comment)) ] is_view_statement = ( " ".join(tokens[0].normalized.split()) == "CREATE OR REPLACE" and tokens[1].normalized == "VIEW" ) if is_view_statement: target_view = str(tokens[2]).strip().split()[0] try: [project_id, dataset_id, view_id] = target_view.replace("`", "").split( "." ) if not ( self.name == view_id and self.dataset == dataset_id and self.project == project_id ): print( f"{self.path} ERROR\n" f"View name {target_view} not matching directory structure." ) return False except Exception: print(f"{self.path} ERROR\n{target_view} missing project ID qualifier.") return False else: print( f"ERROR: {self.path} does not appear to be " "a CREATE OR REPLACE VIEW statement! Quitting..." ) return False return True def target_view_identifier(self, target_project=None): """Return the view identifier after replacing project with target_project. Result must be a fully-qualified BigQuery Standard SQL table identifier, which is of the form f"{project_id}.{dataset_id}.{table_id}". dataset_id and table_id may not contain "." or "`". Each component may be a backtick (`) quoted identifier, or the whole thing may be a backtick quoted identifier, but not both. Project IDs must contain 6-63 lowercase letters, digits, or dashes. Some project IDs also include domain name separated by a colon. IDs must start with a letter and may not end with a dash. For more information see also https://github.com/mozilla/bigquery-etl/pull/1427#issuecomment-707376291 """ if target_project: return self.view_identifier.replace(self.project, target_project, 1) return self.view_identifier def has_changes(self, target_project=None, credentials=None): """Determine whether there are any changes that would be published.""" if any(str(self.path).endswith(p) for p in self.skip_publish()): return False if target_project and self.project != ConfigLoader.get( "default", "project", fallback="moz-fx-data-shared-prod" ): # view would be skipped because --target-project is set return False if credentials: client = bigquery.Client(credentials=credentials) else: client = bigquery.Client() target_view_id = self.target_view_identifier(target_project) try: table = client.get_table(target_view_id) except NotFound: print(f"view {target_view_id} will change: does not exist in BigQuery") return True try: expected_view_query = CREATE_VIEW_PATTERN.sub( "", sqlparse.format(self.content, strip_comments=True), count=1 ).strip(";" + string.whitespace) actual_view_query = sqlparse.format( table.view_query, strip_comments=True ).strip(";" + string.whitespace) except TypeError: print( f"ERROR: There has been an issue formatting: {target_view_id}", file=sys.stderr, ) raise if expected_view_query != actual_view_query: print(f"view {target_view_id} will change: query does not match") return True # check metadata if self.metadata is not None: if self.metadata.description != table.description: print(f"view {target_view_id} will change: description does not match") return True if self.metadata.friendly_name != table.friendly_name: print( f"view {target_view_id} will change: friendly_name does not match" ) return True if self.labels != table.labels: print(f"view {target_view_id} will change: labels do not match") return True table_schema = Schema.from_bigquery_schema(table.schema) if self.schema is not None and not self.schema.equal(table_schema): print(f"view {target_view_id} will change: schema does not match") return True return False def publish(self, target_project=None, dry_run=False, client=None): """ Publish this view to BigQuery. If `target_project` is set, it will replace the project ID in the view definition. """ if any(str(self.path).endswith(p) for p in self.skip_publish()): print(f"Skipping {self.path}") return True # avoid checking references since Jenkins might throw an exception: # https://github.com/mozilla/bigquery-etl/issues/2246 if ( any(str(self.path).endswith(p) for p in self.skip_validation()) or self._valid_view_naming() ): client = client or bigquery.Client() sql = self.content target_view = self.target_view_identifier(target_project) if target_project: if self.project != ConfigLoader.get( "default", "project", fallback="moz-fx-data-shared-prod" ): print(f"Skipping {self.path} because --target-project is set") return True # We only change the first occurrence, which is in the target view name. sql = re.sub( rf"^(?!--)(.*){self.project}", rf"\1{target_project}", sql, count=1, flags=re.MULTILINE, ) job_config = bigquery.QueryJobConfig(use_legacy_sql=False, dry_run=dry_run) query_job = client.query(sql, job_config) if dry_run: print(f"Validated definition of {target_view} in {self.path}") else: try: job_id = query_job.result().job_id except BadRequest as e: if "Invalid snapshot time" in e.message: # This occasionally happens due to dependent views being # published concurrently; we wait briefly and give it one # extra try in this situation. time.sleep(1) job_id = client.query(sql, job_config).result().job_id else: raise try: table = client.get_table(target_view) except NotFound: print( f"{target_view} failed to publish to the correct location, verify job id {job_id}", file=sys.stderr, ) return False try: if self.schema_path.is_file(): table = self.schema.deploy(target_view) except Exception as e: print(f"Could not update field descriptions for {target_view}: {e}") if not self.metadata: print(f"Missing metadata for {self.path}") table.description = self.metadata.description table.friendly_name = self.metadata.friendly_name if table.labels != self.labels: labels = self.labels.copy() for key in table.labels: if key not in labels: # To delete a label its value must be set to None labels[key] = None table.labels = { key: value for key, value in labels.items() if isinstance(value, str) } client.update_table( table, ["labels", "description", "friendly_name"] ) else: client.update_table(table, ["description", "friendly_name"]) print(f"Published view {target_view}") else: print( f"Error publishing {self.path}. Invalid view definition.", file=sys.stderr, ) return False return True