bigquery_etl/glam/cli.py (138 lines of code) (raw):

"""Tools for GLAM ETL.""" import os from pathlib import Path import rich_click as click import yaml from google.cloud import bigquery from .utils import get_schema, run ROOT = Path(__file__).parent.parent.parent def _check_root(): assert ( ROOT / "sql" / "moz-fx-data-shared-prod" ).exists(), f"{ROOT} is not the project root" @click.group() def glam(): """Tools for GLAM ETL.""" pass @glam.group() def glean(): """Tools for Glean in GLAM.""" pass @glean.command() @click.option("--project", default="glam-fenix-dev") @click.option("--dataset", default="glam_etl_dev") def list_daily(project, dataset): """List the start and end dates for clients daily tables.""" _check_root() client = bigquery.Client() app_df = client.query( rf""" WITH extracted AS ( SELECT DISTINCT REGEXP_EXTRACT(table_name, "(.*)__") AS app_id, FROM `{project}`.{dataset}.INFORMATION_SCHEMA.TABLES WHERE table_name LIKE r"%clients\_daily%" ) SELECT app_id, (app_id LIKE r"%\_glam\_%") AS is_logical FROM extracted ORDER BY is_logical, app_id """ ).to_dataframe() query = [] for row in app_df.itertuples(): query += [ f""" SELECT "{row.app_id}" as app_id, {row.is_logical} as is_logical, date(min(submission_date)) as earliest, date(max(submission_date)) as latest FROM `{project}`.{dataset}.{row.app_id}__view_clients_daily_scalar_aggregates_v1 """ ] range_df = ( client.query("\nUNION ALL\n".join(query)) .to_dataframe() .sort_values(["is_logical", "app_id"]) ) click.echo(range_df) @glean.command() @click.argument("app-id", type=str) @click.argument("start-date", type=str) @click.argument("end-date", type=str) @click.option("--dataset", type=str, default="glam_etl_dev") def backfill_daily(app_id, start_date, end_date, dataset): """Backfill the daily tables.""" _check_root() run( "script/glam/generate_glean_sql", cwd=ROOT, env={**os.environ, **dict(PRODUCT=app_id, STAGE="daily")}, ) run( "script/glam/backfill_glean", cwd=ROOT, env={ **os.environ, **dict( DATASET=dataset, PRODUCT=app_id, STAGE="daily", START_DATE=start_date, END_DATE=end_date, RUN_EXPORT="false", ), }, ) @glean.command() @click.argument("app-id", type=str) @click.argument("start-date", type=str) @click.argument("end-date", type=str) @click.option("--dataset", type=str, default="glam_etl_dev") def backfill_incremental(app_id, start_date, end_date, dataset): """Backfill the incremental tables using existing daily tables. To rebuild the table from scratch, drop the clients_scalar_aggregates and clients_histogram_aggregates tables. """ _check_root() run( "script/glam/generate_glean_sql", cwd=ROOT, env={**os.environ, **dict(PRODUCT=app_id, STAGE="incremental")}, ) run( "script/glam/backfill_glean", cwd=ROOT, env={ **os.environ, **dict( DATASET=dataset, PRODUCT=app_id, STAGE="incremental", START_DATE=start_date, END_DATE=end_date, RUN_EXPORT="false", ), }, ) @glean.command() @click.argument("app-id", type=str) @click.option("--project", default="glam-fenix-dev") @click.option("--dataset", type=str, default="glam_etl_dev") @click.option("--bucket", default="glam-fenix-dev-testing") def export(app_id, project, dataset, bucket): """Run the export ETL and write the final csv to a gcs bucket.""" _check_root() run( "script/glam/generate_glean_sql", cwd=ROOT, env={**os.environ, **dict(PRODUCT=app_id, STAGE="incremental")}, ) run( "script/glam/run_glam_sql", cwd=ROOT, env={ **os.environ, **dict( PROJECT=project, DATASET=dataset, PRODUCT=app_id, STAGE="incremental", EXPORT_ONLY="true", ), }, ) run( "script/glam/export_csv", cwd=ROOT, env={ **os.environ, **dict(SRC_PROJECT=project, DATASET=dataset, PRODUCT=app_id, BUCKET=bucket), }, ) @glean.command() @click.option("--project", default="glam-fenix-dev") @click.option("--dataset", default="glam_etl") @click.option("--src-dataset", default="glam_etl_dev") def update_schemas(project, dataset, src_dataset): """Update the schema.yaml files in each query.""" sql_root = ROOT / "sql" / project / dataset for path in sql_root.glob("*"): print(f"fetching schema for {path.name}") # we can update the schema with the development version of the schema schema = dict(fields=get_schema(f"{src_dataset}.{path.name}", project)) with (path / "schema.yaml").open("w") as fp: yaml.dump(schema, fp) if __name__ == "__main__": glam()