# jobs/kpi-forecasting/kpi_forecasting/metric_hub.py

import pandas as pd
from dataclasses import dataclass
from google.cloud import bigquery
from mozanalysis.config import ConfigLoader
from textwrap import dedent
from typing import Optional

from kpi_forecasting.utils import parse_end_date


@dataclass
class MetricHub:
    """
    Programmatically get Metric Hub metrics from Big Query.

    See https://mozilla.github.io/metric-hub/metrics/ for a list of metrics.

    Args:
        app_name (str): The Metric Hub app name for the metric.
        slug (str): The Metric Hub slug for the metric.
        start_date (str): A 'YYYY-MM-DD' formatted-string that specifies the
            first date the metric should be queried.
        segments (Dict): A dictionary of segments to use to group metric
            values. The keys of the dictionary are aliases for the segment,
            and the value is a SQL snippet that defines the segment.
        where (str): A string specifying a condition to inject into a SQL
            WHERE clause, to filter the data source.
        end_date (str): A 'YYYY-MM-DD' formatted-string that specifies the
            last date the metric should be queried.
        alias (str): An alias for the metric. For example, 'DAU' instead of
            'daily_active_users'.
        project (str): The Big Query project to use when establishing a
            connection to the Big Query client.
    """

    app_name: str
    slug: str
    start_date: str
    # Optional[...] annotations: these fields default to None, so the bare
    # `dict`/`str` annotations were inaccurate for type checkers.
    segments: Optional[dict] = None
    where: Optional[str] = None
    end_date: Optional[str] = None
    alias: Optional[str] = None
    project: str = "mozdata"

    def __post_init__(self) -> None:
        """Resolve dates, load the Metric Hub definition, and pre-build SQL snippets."""
        # Normalize the date bounds to datetime.date objects. parse_end_date
        # resolves the (possibly missing) end date before conversion.
        self.start_date = pd.to_datetime(self.start_date).date()
        self.end_date = pd.to_datetime(parse_end_date(self.end_date)).date()

        # Set useful attributes based on the Metric Hub definition
        metric = ConfigLoader.get_metric(
            metric_slug=self.slug,
            app_name=self.app_name,
        )
        self.metric = metric
        self.alias = self.alias or metric.name
        self.submission_date_column = metric.data_source.submission_date_column

        # Modify the metric source table string so that it formats nicely in
        # the query: continuation lines are indented to line up under the
        # FROM clause emitted by query().
        self.from_expression = self.metric.data_source._from_expr.replace(
            "\n", "\n" + " " * 19
        )

        # Add query snippets for segments. (Previously this copied
        # self.segments into a throwaway dict before iterating; iterate the
        # original directly instead.)
        self.segment_select_query = ""
        self.segment_groupby_query = ""
        if self.segments:
            self.segment_select_query = "," + "\n ".join(
                f" {sql} AS {alias}," for alias, sql in self.segments.items()
            )
            self.segment_groupby_query = "," + "\n ,".join(self.segments.keys())

        # Pre-format the optional filter so it can be dropped into the WHERE
        # clause as-is (empty string when no filter was given).
        self.where = f"AND {self.where}" if self.where else ""

    def query(self) -> str:
        """Build a string to query the relevant metric values from Big Query."""
        return dedent(
            f"""
            SELECT {self.submission_date_column} AS submission_date,
                   {self.metric.select_expr} AS value
                   {self.segment_select_query}
              FROM {self.from_expression}
             WHERE {self.submission_date_column} BETWEEN '{self.start_date}' AND '{self.end_date}'
                   {self.where}
             GROUP BY {self.submission_date_column}
                      {self.segment_groupby_query}
            """
        )

    def fetch(self) -> pd.DataFrame:
        """Fetch the relevant metric values from Big Query.

        Returns:
            pd.DataFrame: One row per submission date (and segment, if any),
            with the metric in the 'value' column.
        """
        # Build the SQL once and reuse it (it was previously rendered twice:
        # once for logging and once for the BigQuery call).
        sql = self.query()
        print(
            f"\nQuerying for '{self.app_name}.{self.slug}' aliased as '{self.alias}':"
            f"\n{sql}"
        )
        df = bigquery.Client(project=self.project).query(sql).to_dataframe()

        # ensure submission_date has type 'date'
        df[self.submission_date_column] = pd.to_datetime(
            df[self.submission_date_column]
        ).dt.date

        # Track the min and max dates in the data, which may differ from the
        # start/end dates requested in the query.
        self.min_date = str(df[self.submission_date_column].min())
        self.max_date = str(df[self.submission_date_column].max())

        return df