def initialize()

in backfill/2023-09-26-initialize-clients_first_seen_v2/bigquery_etl_cli_query.py [0:0]


def initialize(ctx, name, sql_dir, project_id, dry_run):
    """Create the destination table for the provided query.

    ``name`` may be a direct path to a query file, or a pattern that is
    resolved against ``sql_dir``/``project_id`` via
    ``paths_matching_name_pattern``. Exits with status 1 when the user is
    not authenticated or when no matching query files are found.

    Two initialization modes are supported per query file:
    - ``query.sql`` containing ``is_init()``: the table is created by
      deploying its schema/metadata, then the init query is run.
    - otherwise: every ``init.sql`` found under the query directory is
      executed directly.
    """
    if not is_authenticated():
        click.echo("Authentication required for creating tables.", err=True)
        sys.exit(1)

    if Path(name).exists():
        # Allow name to be a path
        query_files = [Path(name)]
    else:
        query_files = paths_matching_name_pattern(name, sql_dir, project_id)

    if not query_files:
        click.echo(
            f"Couldn't find directory matching `{name}`. Failed to initialize query.",
            err=True,
        )
        sys.exit(1)

    # One client serves all files; constructing a new client per file
    # (as before) only added connection/auth overhead.
    client = bigquery.Client()

    for query_file in query_files:
        sql_content = query_file.read_text()

        # Enable initialization from query.sql files.
        # This does not currently verify the accuracy of the schema or that
        # it matches the query.
        if "is_init()" in sql_content:
            _initialize_from_init_query(ctx, client, query_file, sql_content, dry_run)
        else:
            _initialize_from_init_files(client, query_file, dry_run)


def _initialize_from_init_query(ctx, client, query_file, sql_content, dry_run):
    """Create a table from a ``query.sql`` that calls ``is_init()``.

    Deploys the table's schema and metadata first, then runs the query
    (optionally sharded in parallel when the SQL references
    ``parallel_run``). Raises ``PreconditionFailed`` if the destination
    table already exists.
    """
    # Directory layout is assumed to be .../<project>/<dataset>/<table>/query.sql.
    project = query_file.parent.parent.parent.name
    dataset = query_file.parent.parent.name
    destination_table = query_file.parent.name
    full_table_id = f"{project}.{dataset}.{destination_table}"

    try:
        table = client.get_table(full_table_id)
    except NotFound:
        # A locally constructed Table has no creation timestamp, so the
        # existence guard below lets initialization proceed.
        table = bigquery.Table(full_table_id)

    if table.created:
        raise PreconditionFailed(
            f"Table {full_table_id} already exists. The initialization process is terminated."
        )

    # Create the table by deploying the schema and metadata before running
    # the init query.
    ctx.invoke(deploy, name=full_table_id, force=True)

    arguments = [
        "query",
        "--use_legacy_sql=false",
        "--replace",
        "--format=none",
    ]
    if dry_run:
        arguments += ["--dry_run"]

    if "parallel_run" in sql_content:
        _initialize_in_parallel(
            project=project,
            table=destination_table,
            dataset=dataset,
            query_file=query_file,
            arguments=arguments,
            parallelism=DEFAULT_PARALLELISM,
            addl_templates={
                "is_init": lambda: True,
                "parallel_run": lambda: True,
            },
        )
    else:
        _run_query(
            project_id=project,
            public_project_id=None,
            destination_table=destination_table,
            dataset_id=dataset,
            query_arguments=arguments,
            addl_templates={
                "is_init": lambda: True,
            },
        )


def _initialize_from_init_files(client, query_file, dry_run):
    """Execute every ``init.sql`` found under the query's directory."""
    for init_file in Path(query_file.parent).rglob("init.sql"):
        project = init_file.parent.parent.parent.name
        init_sql = init_file.read_text()
        dataset = init_file.parent.parent.name
        # NOTE(review): destination is derived from query_file, not
        # init_file — matches the original behavior; presumably intended
        # so nested init files still target the query's table.
        destination_table = query_file.parent.name
        job_config = bigquery.QueryJobConfig(
            dry_run=dry_run,
            default_dataset=f"{project}.{dataset}",
            destination=f"{project}.{dataset}.{destination_table}",
        )

        if "CREATE MATERIALIZED VIEW" in init_sql:
            click.echo(f"Create materialized view for {init_file}")
            # An existing materialized view has to be deleted before
            # re-creation.
            view_name = query_file.parent.name
            client.delete_table(
                f"{project}.{dataset}.{view_name}", not_found_ok=True
            )
        else:
            click.echo(f"Create destination table for {init_file}")

        job = client.query(init_sql, job_config=job_config)

        if not dry_run:
            # Block until the job completes so failures surface here.
            job.result()