`_initialize(query_file)` — nested helper defined in `bigquery_etl/cli/query.py`


    def _initialize(query_file):
        """Create and populate the destination table for one query file.

        For ``is_init()`` queries: ensure the table exists (deploying schema and
        metadata first when needed), then run the init query — in parallel over
        sample_id shards when the SQL references ``@sample_id``. Otherwise,
        deploy any ``materialized_view.sql`` files found under the query's
        directory, skipping views that already exist.

        Returns the query file path on failure (so callers can collect failed
        files), ``None`` otherwise.
        """
        project, dataset, destination_table = extract_from_query_path(query_file)
        client = bigquery.Client(project=project)
        table_id = f"{project}.{dataset}.{destination_table}"
        existing_table = None

        sql_text = query_file.read_text()
        needs_init = "is_init()" in sql_text
        mv_files = [
            Path(p)
            for p in glob(f"{query_file.parent}/**/materialized_view.sql", recursive=True)
        ]

        # Decide whether this file is initializable and honour skip/force flags.
        if needs_init:
            try:
                existing_table = client.get_table(table_id)
                if skip_existing:
                    # Table already exists; nothing to initialize.
                    return
                if not force and existing_table.num_rows > 0:
                    # Refuse to clobber populated tables without --force;
                    # ClickException propagates out of this function.
                    raise click.ClickException(
                        f"Table {table_id} already exists and contains data. The initialization process is terminated."
                        " Use --force to overwrite the existing destination table."
                    )
            except NotFound:
                # Table is missing; fall through and create it below.
                pass
        elif not mv_files:
            # Neither an init query nor any materialized views: nothing to do.
            return

        try:
            # Enable initialization from query.sql files
            # Create the table by deploying the schema and metadata, then run the init.
            # This does not currently verify the accuracy of the schema or that it
            # matches the query.
            if needs_init:
                if existing_table is None:
                    # Derive/update the schema, then deploy table + metadata.
                    ctx.invoke(
                        update,
                        name=table_id,
                        sql_dir=sql_dir,
                        project_id=project,
                        update_downstream=False,
                        is_init=True,
                    )
                    ctx.invoke(
                        deploy,
                        name=table_id,
                        sql_dir=sql_dir,
                        project_id=project,
                        force=True,
                        respect_dryrun_skip=False,
                    )

                # bq CLI arguments shared by both execution paths below.
                bq_args = [
                    "query",
                    "--use_legacy_sql=false",
                    "--format=none",
                    "--append_table",
                    "--noreplace",
                ]
                if dry_run:
                    bq_args.append("--dry_run")

                if "@sample_id" in sql_text:
                    # Shard the init over sample_id values 0..99 and run in parallel.
                    _initialize_in_parallel(
                        project=project,
                        table=table_id,
                        dataset=dataset,
                        query_file=query_file,
                        arguments=bq_args,
                        parallelism=parallelism,
                        sample_ids=list(range(100)),
                        addl_templates={
                            "is_init": lambda: True,
                        },
                        billing_project=billing_project,
                    )
                else:
                    # Single-shot init run.
                    _run_query(
                        query_files=[query_file],
                        project_id=project,
                        public_project_id=None,
                        destination_table=table_id,
                        dataset_id=dataset,
                        query_arguments=bq_args,
                        addl_templates={
                            "is_init": lambda: True,
                        },
                        billing_project=billing_project,
                    )
            else:
                for mv_file in mv_files:
                    init_sql = mv_file.read_text()
                    job_config = bigquery.QueryJobConfig(
                        dry_run=dry_run,
                        default_dataset=f"{project}.{dataset}",
                    )

                    # only deploy materialized view if it doesn't exist
                    # TODO: https://github.com/mozilla/bigquery-etl/issues/5804
                    try:
                        mv_table = client.get_table(table_id)

                        # Best-effort check, don't fail if there's an error
                        try:
                            has_changes = materialized_view_has_changes(
                                mv_table.mview_query, init_sql
                            )
                        except Exception as e:
                            change_str = f"failed to compare changes: {e}"
                        else:
                            change_str = (
                                "sql changed" if has_changes else "sql not changed"
                            )
                        click.echo(
                            f"Skipping materialized view {table_id}, already exists, {change_str}"
                        )
                    except NotFound:
                        job = client.query(init_sql, job_config=job_config)

                        if not dry_run:
                            job.result()
        except Exception:
            # Best effort: report the traceback and hand the failed file back
            # to the caller rather than aborting the whole run.
            print_exc()
            return query_file