def _deploy_artifacts()

in bigquery_etl/cli/stage.py [0:0]


def _create_artifact_datasets(files, project_id, dataset_suffix, access_entries):
    """Create the parent dataset for each artifact file, once per dataset.

    Artifact files live at ``<sql_dir>/<project>/<dataset>/<table>/<file>``,
    so the dataset name is the grandparent directory of each file.  Dataset
    names are deduplicated so a dataset shared by many artifacts only incurs
    a single create/get round-trip; ``sorted`` keeps the call order
    deterministic.
    """
    for dataset in sorted({file.parent.parent.name for file in files}):
        create_dataset_if_not_exists(
            project_id=project_id,
            dataset=dataset,
            suffix=dataset_suffix,
            access_entries=access_entries,
        )


def _deploy_artifacts(ctx, artifact_files, project_id, dataset_suffix, sql_dir):
    """Deploy routines, tables and views to the stage project.

    Args:
        ctx: Click context, used to invoke the publish/deploy subcommands.
        artifact_files: iterable of ``Path`` objects pointing at routine,
            query and view definition files.
        project_id: GCP project the artifacts are deployed to.
        dataset_suffix: suffix appended to created dataset names.
        sql_dir: root directory of the generated SQL tree.
    """
    # give read permissions to dry run accounts
    dataset_access_entries = [
        bigquery.AccessEntry(
            role="READER",
            entity_type=EntityTypes.USER_BY_EMAIL,
            entity_id=dry_run_account,
        )
        for dry_run_account in ConfigLoader.get(
            "dry_run", "function_accounts", fallback=[]
        )
    ]

    # deploy routines
    routine_files = [file for file in artifact_files if file.name in ROUTINE_FILES]
    # checking and creating datasets needs to happen sequentially
    _create_artifact_datasets(
        routine_files, project_id, dataset_suffix, dataset_access_entries
    )
    ctx.invoke(publish_routine, name=None, project_id=project_id, dry_run=False)

    # deploy table schemas; sorted for a deterministic deploy order (a plain
    # list() of the set would vary run-to-run)
    query_files = sorted(
        {
            file
            for file in artifact_files
            if file.name in [QUERY_FILE, QUERY_SCRIPT]
            # don't attempt to deploy wildcard or metadata tables
            and "*" not in file.parent.name and file.parent.name != "INFORMATION_SCHEMA"
        }
    )

    if query_files:
        _create_artifact_datasets(
            query_files, project_id, dataset_suffix, dataset_access_entries
        )

        ctx.invoke(
            update_query_schema,
            name=query_files,
            sql_dir=sql_dir,
            project_id=project_id,
            respect_dryrun_skip=True,
            is_init=True,
        )
        ctx.invoke(
            deploy_query_schema,
            name=query_files,
            sql_dir=sql_dir,
            project_id=project_id,
            force=True,
            respect_dryrun_skip=False,
            skip_external_data=True,
        )

    # deploy views, skipping any the dry-run config marks as skipped
    view_files = [
        file
        for file in artifact_files
        if file.name == VIEW_FILE and str(file) not in DryRun.skipped_files()
    ]
    _create_artifact_datasets(
        view_files, project_id, dataset_suffix, dataset_access_entries
    )

    ctx.invoke(
        publish_view,
        name=None,
        sql_dir=sql_dir,
        project_id=project_id,
        dry_run=False,
        skip_authorized=False,
        force=True,
        respect_dryrun_skip=True,
    )