def _generate_view_schema()

in sql_generators/derived_view_schemas/__init__.py


def _generate_view_schema(sql_dir, view_directory, id_token=None):
    import logging

    from bigquery_etl.dependency import extract_table_references
    from bigquery_etl.metadata.parse_metadata import Metadata
    from bigquery_etl.schema import Schema
    from bigquery_etl.util.common import render
    from bigquery_etl.view import View

    logging.basicConfig(format="%(levelname)s (%(filename)s:%(lineno)d) - %(message)s")
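
    # Note: VIEW_FILE, METADATA_FILE, and SCHEMA_FILE (used below) are
    # module-level constants in this file naming the per-directory view,
    # metadata, and schema files.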

    # If the view references only one table, we can:
    # 1. Get the referenced table's partition column, if it has one
    #    (so we can dry run views on partitioned tables).
    # 2. Get the referenced table's schema and use it to enrich the
    #    view schema we get from dry-running.
    def _get_reference_dir_path(view_file):
        view_references = extract_table_references(
            render(view_file.name, view_file.parent)
        )
        if len(view_references) != 1:
            return

        target_project = view_file.parent.parent.parent.name
        target_dataset = view_file.parent.parent.name

        target_reference = view_references[0]
        parts = target_reference.split(".")
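        # e.g. a two-part reference "some_dataset.some_table" inherits the
        # view's project and resolves to
        # sql_dir/<view project>/some_dataset/some_table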
        if len(parts) == 3:
            reference_project_id, reference_dataset_id, reference_table_id = parts
        # Fully qualify partially qualified references using the view's
        # project and dataset:
        elif len(parts) == 2:
            reference_project_id = target_project
            reference_dataset_id, reference_table_id = parts
        elif len(parts) == 1:
            reference_project_id = target_project
            reference_dataset_id = target_dataset
            reference_table_id = parts[0]
        else:
            return

        return (
            sql_dir / reference_project_id / reference_dataset_id / reference_table_id
        )

    def _get_reference_partition_column(ref_path):
        if ref_path is None:
            logging.debug("No table reference, skipping partition column.")
            return

        try:
            reference_metadata = Metadata.from_file(ref_path / METADATA_FILE)
        except Exception as metadata_exception:
            logging.warning(f"Unable to get reference metadata: {metadata_exception}")
            return

        bigquery_metadata = reference_metadata.bigquery
        if bigquery_metadata is None:
            logging.warning(
                f"No bigquery metadata at {ref_path}, unable to get partition column."
            )
            return

        partition_metadata = bigquery_metadata.time_partitioning
        if partition_metadata is None:
            logging.warning(
                f"No partition metadata at {ref_path}, unable to get partition column."
            )
            return

        return partition_metadata.field

    view_file = view_directory / VIEW_FILE
    if not view_file.exists():
        return

    reference_path = _get_reference_dir_path(view_file)

    # If this view references a stable table, don't try to write the schema:
    if reference_path is not None:
        reference_dataset = reference_path.parent.name
        if reference_dataset.endswith("_stable"):
            return

    # Optionally get the upstream partition column
    reference_partition_column = _get_reference_partition_column(reference_path)
    if reference_partition_column is None:
        logging.debug("No reference partition column, dry running without one.")

    view = View.from_file(
        view_file, partition_column=reference_partition_column, id_token=id_token
    )
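    # Passing the partition column lets the dry run filter on the upstream
    # table's partition, which is what makes dry-running views on partitioned
    # tables work (see the note at the top of this function).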

    # `View.schema` prioritizes the configured schema over the dryrun schema, but here
    # we prioritize the dryrun schema because the `schema.yaml` file might be out of date.
    schema = view.dryrun_schema or view.configured_schema
    if view.dryrun_schema and view.configured_schema:
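        # Copy field descriptions from the configured schema.yaml onto the
        # matching dry-run fields; fields only present in one of the two
        # schemas are ignored rather than added or treated as errors.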
        try:
            schema.merge(
                view.configured_schema,
                attributes=["description"],
                add_missing_fields=False,
                ignore_missing_fields=True,
            )
        except Exception as e:
            logging.warning(
                f"Error enriching {view.view_identifier} view schema from {view.schema_path}: {e}"
            )
    if not schema:
        logging.warning(
            f"Couldn't get schema for {view.view_identifier}, possibly "
            "due to a dry-run error. Won't write schema.yaml."
        )
        return

    # Optionally enrich the view schema if we have a valid table reference
    if reference_path:
        reference_schema_file = reference_path / SCHEMA_FILE
        if reference_schema_file.exists():
            try:
                reference_schema = Schema.from_schema_file(reference_schema_file)
                schema.merge(
                    reference_schema,
                    attributes=["description"],
                    add_missing_fields=False,
                    ignore_missing_fields=True,
                )
            except Exception as e:
                logging.warning(
                    f"Error enriching {view.view_identifier} view schema from {reference_schema_file}: {e}"
                )

    schema.to_yaml_file(view_directory / SCHEMA_FILE)
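
Example usage (a minimal sketch, not this module's actual entry point; the
`sql` directory name, the `*/*/*/view.sql` glob, and calling without an
id_token are assumptions for illustration only):

    from pathlib import Path

    sql_dir = Path("sql")
    # Generate a schema.yaml next to each view definition found under
    # sql/<project>/<dataset>/<view>/view.sql.
    for view_file in sql_dir.glob("*/*/*/view.sql"):
        _generate_view_schema(sql_dir, view_file.parent)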