sql/moz-fx-data-shared-prod/telemetry_dev_cycle_external/experiments_stats_v1/query.py

"""Experiment data downloaded from APIs, clean and upload to BigQuery.""" import datetime import logging from pathlib import Path import click import requests import yaml from google.cloud import bigquery API_BASE_URL_EXPERIMENTS = "https://experimenter.services.mozilla.com" API_BASE_URL_METRIC_HUB = ( "https://api.github.com/repos/mozilla/metric-hub/contents/jetstream" ) DEFAULT_PROJECT_ID = Path(__file__).parent.parent.parent.name DEFAULT_DATASET_ID = Path(__file__).parent.parent.name DEFAULT_TABLE_NAME = Path(__file__).parent.name SCHEMA_FILE = Path(__file__).parent / "schema.yaml" SCHEMA = bigquery.SchemaField.from_api_repr( {"name": "root", "type": "RECORD", **yaml.safe_load(SCHEMA_FILE.read_text())} ).fields def parse_unix_datetime_to_string(unix_string): """Parse unix_string with milliseconds to date string.""" if not unix_string: return None return datetime.datetime.fromtimestamp(int(unix_string) // 1000).strftime( "%Y-%m-%d" ) def get_api_response(url): """Get json of response if the requests to the API was successful.""" response = requests.get(url) response.raise_for_status() return response.json() def store_data_in_bigquery(data, schema, destination_project, destination_table_id): """Upload data to Bigquery in a single, non partitioned table.""" client = bigquery.Client(project=destination_project) job_config = bigquery.LoadJobConfig( create_disposition="CREATE_IF_NEEDED", schema=schema, source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON, write_disposition="WRITE_TRUNCATE", ) load_job = client.load_table_from_json( data, destination_table_id, location="US", job_config=job_config ) load_job.result() stored_table = client.get_table(destination_table_id) logging.info(f"Loaded {stored_table.num_rows} rows into {destination_table_id}.") def download_experiments_v1(url): """Download experiment data from API v1 and parse it.""" experiments_v1 = [] experiments = get_api_response(f"{url}/api/v1/experiments/") for experiment in experiments: if experiment["status"] == "Draft": continue experiments_v1.append( { "slug": experiment["slug"], "start_date": parse_unix_datetime_to_string(experiment["start_date"]), "enrollment_end_date": None, "end_date": parse_unix_datetime_to_string(experiment["end_date"]), } ) logging.info( f"Downloaded {len(experiments_v1)} records from the experiments v1 API" ) return experiments_v1 def download_experiments_v6(url): """Download experiment data from API v6 and parse it.""" experiments_v6 = [] experiments = get_api_response(f"{url}/api/v6/experiments/") for experiment in experiments: experiments_v6.append( { "slug": experiment["slug"], "start_date": experiment["startDate"], "enrollment_end_date": experiment["enrollmentEndDate"], "end_date": experiment["endDate"], } ) logging.info( f"Downloaded {len(experiments_v6)} records from the experiments v6 API" ) return experiments_v6 def download_metric_hub_files(url): """Download metric hub files from github.""" metric_files = {} files = get_api_response(url) for file in files: if file["type"] != "file": continue slug = file["name"].removesuffix(".toml") metric_files[slug] = True logging.info(f"Downloaded {len(metric_files)} records from the API {url}") return metric_files def compare_experiments_with_metric_hub_configs(): """Download experiments from v1 and v6 API and compare them with config files in metric_hub.""" experiments_v1 = download_experiments_v1(API_BASE_URL_EXPERIMENTS) experiments_v6 = download_experiments_v6(API_BASE_URL_EXPERIMENTS) metric_files = 


def compare_experiments_with_metric_hub_configs():
    """Download experiments from the v1 and v6 APIs and compare them with config files in metric-hub."""
    experiments_v1 = download_experiments_v1(API_BASE_URL_EXPERIMENTS)
    experiments_v6 = download_experiments_v6(API_BASE_URL_EXPERIMENTS)
    metric_files = download_metric_hub_files(API_BASE_URL_METRIC_HUB)
    # Flag each experiment by whether a jetstream config exists for its slug.
    experiments = [
        (
            {**experiment, "has_config": True}
            if experiment["slug"] in metric_files
            else {**experiment, "has_config": False}
        )
        for experiment in (experiments_v1 + experiments_v6)
    ]
    return experiments


@click.command
@click.option(
    "--bq_project_id",
    default=DEFAULT_PROJECT_ID,
    show_default=True,
    help="BigQuery project the data is written to.",
)
@click.option(
    "--bq_dataset_id",
    default=DEFAULT_DATASET_ID,
    show_default=True,
    help="BigQuery dataset the data is written to.",
)
@click.option(
    "--bq_table_name",
    default=DEFAULT_TABLE_NAME,
    show_default=True,
    help="BigQuery table the data is written to.",
)
def run_experiments_stats(bq_project_id, bq_dataset_id, bq_table_name):
    """Download experiments from the APIs and store them in BigQuery."""
    experiments = compare_experiments_with_metric_hub_configs()
    destination_table_id = f"{bq_project_id}.{bq_dataset_id}.{bq_table_name}"
    store_data_in_bigquery(
        data=experiments,
        schema=SCHEMA,
        destination_project=bq_project_id,
        destination_table_id=destination_table_id,
    )


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)
    run_experiments_stats()
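
# Example invocation (a sketch): the CLI defaults are derived from this file's
# path, so explicit flags are only needed to write to a non-default destination.
# "my-sandbox-project" below is a placeholder, not a real project:
#
#   python query.py --bq_project_id=my-sandbox-project \
#       --bq_dataset_id=telemetry_dev_cycle_external \
#       --bq_table_name=experiments_stats_v1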