jobs/kpi-forecasting/kpi_forecasting/results_processing.py
from dataclasses import dataclass
from functools import partial

import numpy as np
import pandas as pd
from google.cloud import bigquery
from google.cloud.bigquery.enums import SqlTypeNames as bq_types

from kpi_forecasting.inputs import load_yaml

@dataclass
class PerformanceAnalysis:
input_config_list: list[str]
output_project: str
output_dataset: str
output_table: str
input_config_path: str = "kpi_forecasting/configs"
intra_forecast_agg_names: tuple = (
"max",
"min",
"median",
"mean",
"percentile_25",
"percentile_75",
)
identifier_columns: tuple = (
"submission_date",
"metric_alias",
"aggregation_period",
)
intra_forecast_lookback_months: int = 12 * 100 # will revisit in the year 2123
def __post_init__(self) -> None:
"""After initalization, set the following outputs after initialization
output_table_id: id for output table
job_config: used to set the output table when writing with bigquery
config_data: a dict of all the data from the list of configs provided
set inside of _load_config_data
input_table_full: id of the input data table extracted from the configs
dimension_list: indicates which columns of the input table represent
dimensions, where different combinations of values specify separate forecasts
If a forecast has no such columns, set to an empty list
"""
# table is small, so every time this runs it processes all the data
# and overwrites the old table
self._load_config_data()
self._extract_config_data()
self._set_intra_forecast_agg_functions()
self.output_table_id = (
f"{self.output_project}.{self.output_dataset}.{self.output_table}"
)
        if self.output_project:
            # skipping client creation when no project is set makes it
            # possible to create an object without any bigquery setup,
            # e.g. for testing
            self.client = bigquery.Client(project=self.output_project)
def _set_intra_forecast_agg_functions(self):
"""parses function names from the config into functions where
applicable and sets the result to intra_forecast_agg_names.
Currently only applies to percentile, where the value following
the underscore is the percentile to apply"""
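        # e.g. "percentile_25" becomes partial(np.percentile, q=25); plain
        # names like "mean" pass through as strings for pandas .agg to resolve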
self.intra_forecast_agg_functions = [
partial(np.percentile, q=int(el.split("_")[1]))
if isinstance(el, str) and "percentile" in el
else el
for el in self.intra_forecast_agg_names
]
def _load_config_data(self):
"""Extracts data from the list of config files passed to the class and stores it in the
config_data attribute. The filename is the key, and the contents
are the values"""
self.config_data = {}
for config_file in self.input_config_list:
full_path = f"{self.input_config_path}/{config_file}"
config_data = load_yaml(full_path)
self.config_data[config_file] = config_data
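
    # Each config is expected to provide at least the keys read by
    # _extract_config_data (values here are hypothetical):
    #
    #   metric_hub:
    #     segments:        # optional; keys become dimension columns
    #       channel: ...
    #   write_results:
    #     project: some-project
    #     dataset: some_dataset
    #     table: some_table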
def _extract_config_data(self):
"""Extracts data from the dictionary created by _load_config_data and uses it to set
the attributes below:
input_table_full: id of the input data table extracted from the configs
dimension_list: indicates which columns of the input table represent
dimensions, where different combinations of values specify separate forecasts
If a forecast has no such columns, set to an empty list
Raises:
Exception: Raised if list of config files have different values for the dimension list
Exception: Raised if list of config files have different values for the input table
"""
segment_data_list = []
input_table_list = []
config_file_list = list(self.config_data.keys())
for config_data in self.config_data.values():
# get segment data
metric_hub_data = config_data["metric_hub"]
if "segments" in metric_hub_data:
segment_data = metric_hub_data["segments"]
segment_data_list.append(segment_data)
else:
segment_data_list.append(None)
# get input table info
input_table_list.append(config_data["write_results"])
input_table_data = input_table_list.pop(0)
input_table_matches_first = [input_table_data == el for el in input_table_list]
if not all(input_table_matches_first):
config_file_list_string = " ".join(config_file_list)
            raise Exception(
                f"Input table data does not match across configs: {config_file_list_string}"
            )
input_project = input_table_data["project"]
input_dataset = input_table_data["dataset"]
input_table = input_table_data["table"]
input_table_full = f"{input_project}.{input_dataset}.{input_table}"
segment_data = segment_data_list.pop(0)
segment_data_matches_first = [segment_data == el for el in segment_data_list]
if not all(segment_data_matches_first):
config_file_list_string = " ".join(config_file_list)
            raise Exception(
                f"Dimension data does not match across configs: {config_file_list_string}"
            )
if segment_data:
# this is the case where dimensions are present
# we only need the column names for the query
dimension_list = list(segment_data.keys())
else:
dimension_list = []
self.input_table_full = input_table_full
self.dimension_list = dimension_list
if len(self.dimension_list) > 0:
self.identifier_columns = (*self.identifier_columns, *self.dimension_list)
# need identifier columns to be a list to make it easy to do pandas operations later
self.identifier_columns = list(self.identifier_columns)
def _get_most_recent_forecasts(self, month_level_df: pd.DataFrame) -> pd.DataFrame:
"""Adds the following columns to month_level_df:
- previous_forecast_month (timestamp):
Timestamp of the first day of the month corresponding to the current forecast
- forecast_value_previous_month (float): forecast value for the previous month
Args:
month_level_df (pd.DataFrame): Dataframe to process. Must have the following columns
in addition to those listed in self.identifier_columns:
- forecast_trained_at_month
- forecast_value
Returns:
pd.DataFrame: DataFrame with new columns added. Has the same number of rows as input
"""
current_forecast_month_df = (
month_level_df[self.identifier_columns + ["forecast_trained_at_month"]]
.groupby(self.identifier_columns)
.agg(current_forecast_month=("forecast_trained_at_month", "max"))
.reset_index()
)
month_level_df = month_level_df.merge(
current_forecast_month_df, on=self.identifier_columns
)
exclude_current_forecast_month = month_level_df[
month_level_df["forecast_trained_at_month"]
!= month_level_df["current_forecast_month"]
]
previous_forecast_month_df = (
exclude_current_forecast_month[
self.identifier_columns + ["forecast_trained_at_month"]
]
.groupby(self.identifier_columns)
.agg(previous_forecast_month=("forecast_trained_at_month", "max"))
.reset_index()
)
month_level_df = month_level_df.merge(
previous_forecast_month_df, on=self.identifier_columns
)
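        # self-merge: for each row, look up the row whose
        # forecast_trained_at_month equals this group's previous_forecast_month
        # and pull its forecast_value in as forecast_value_previous_month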
month_level_df = month_level_df.merge(
month_level_df[
self.identifier_columns
+ ["forecast_trained_at_month", "forecast_value"]
],
left_on=self.identifier_columns + ["previous_forecast_month"],
right_on=self.identifier_columns + ["forecast_trained_at_month"],
suffixes=(None, "_previous_month"),
).drop(columns="forecast_trained_at_month_previous_month")
return month_level_df
def query_ctes(self) -> str:
"""Creates the following ctes:
forecast_with_train_month: Adds forecast_trained_at_month column and filters to forecast rows
forecast_month_level: Creates a table with one forecast prediction per month for
each combination of values in self.identifier_columns. It is possible for the forecast to have run multiple times
in a month so to dedupe the most recent forecast is chosen
forecast_deduped: Creates a table including only forecast rows and grouping by self.identifier_columns.
The most recent forecast is chosen for the deduping
actual_deduped: Creates a table including only historical (actual) rows and grouping by self.identifier_columns.
compare_data: joins forecast_deduped and actual_deduped and calculates metrics quantifiying the difference between them
Returns:
(str): Query to generate forecast performance table"""
identifiers_comma_separated = ",".join(self.identifier_columns)
# in actual_deduped, the value for historical data won't change so we can use any_value without checking forecast_trained_at
query_ctes = f"""WITH forecast_with_train_month as (SELECT {identifiers_comma_separated}, forecast_trained_at, value,
DATE_TRUNC(forecast_trained_at, MONTH) as forecast_trained_at_month
FROM {self.input_table_full}
WHERE source='forecast'),
forecast_month_level as (SELECT {identifiers_comma_separated}, forecast_trained_at_month,
MAX(forecast_trained_at) as forecast_trained_at,
ANY_VALUE(value HAVING MAX forecast_trained_at) as forecast_value,
FROM forecast_with_train_month
GROUP BY {identifiers_comma_separated}, forecast_trained_at_month),
forecast_deduped as (SELECT {identifiers_comma_separated},
MAX(forecast_trained_at) as forecast_trained_at,
ANY_VALUE(value HAVING MAX forecast_trained_at) as forecast_value,
FROM {self.input_table_full}
WHERE source='forecast'
GROUP BY {identifiers_comma_separated}),
actual_deduped as (SELECT {identifiers_comma_separated},
ANY_VALUE(value) as actual_value
FROM {self.input_table_full}
WHERE source='historical'
GROUP BY {identifiers_comma_separated}),
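    -- sign convention below: positive difference / percent_difference
    -- means the actual value exceeded the forecast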
compare_data as (SELECT forecast_deduped.*, actual_deduped.actual_value,
(actual_deduped.actual_value-forecast_deduped.forecast_value) as difference,
(actual_deduped.actual_value-forecast_deduped.forecast_value)/actual_deduped.actual_value*100 as percent_difference,
ABS(actual_deduped.actual_value-forecast_deduped.forecast_value) as absolute_difference,
ABS(actual_deduped.actual_value-forecast_deduped.forecast_value)/actual_deduped.actual_value*100 as absolute_percent_difference
FROM forecast_deduped
INNER JOIN actual_deduped USING ({identifiers_comma_separated}))"""
return query_ctes
def _apply_lookback(self, data_df: pd.DataFrame) -> pd.DataFrame:
"""Filters out data that occurs self.intra_forecast_lookback_months months
before current_forecast_month
Args:
data_df (pd.DataFrame): input data frame. Must have the following columns:
- current_forecast_month (timestamp)
- forecast_trained_at_month (timestamp)
Returns:
pd.DataFrame: Filtered dataframe. Will have less than or equal to
the number of rows of the input dataframe
"""
lookback_mindate = data_df["current_forecast_month"] - pd.DateOffset(
months=self.intra_forecast_lookback_months
)
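        # e.g. with a 12-month lookback and current_forecast_month 2024-06-01,
        # lookback_mindate is 2023-06-01 and anything trained earlier is dropped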
lookback_indicator = data_df["forecast_trained_at_month"] >= lookback_mindate
month_level_lookback_applied = data_df[lookback_indicator]
return month_level_lookback_applied
def fetch(self) -> pd.DataFrame:
"""Uses the query produced by the query method to retrieve data
from bigquery and return as a pandas dataframe
Returns:
pd.DataFrame: forecast performance table as a pandas dataframe
"""
cte = self.query_ctes()
month_level_df = self.client.query(
f"{cte} SELECT * FROM forecast_month_level"
).to_dataframe()
compare_df = self.client.query(
f"{cte} SELECT * FROM compare_data"
).to_dataframe()
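        # month_level_df feeds the intra-forecast spread metrics below;
        # compare_df already holds the forecast-vs-actual error metrics
        # computed in the compare_data CTE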
month_level_df_with_most_recent = self._get_most_recent_forecasts(
month_level_df
)
# create dictionary used for aggregation
zip_intra_forecast_info = zip(
self.intra_forecast_agg_names, self.intra_forecast_agg_functions
)
agg_dict = {
f"intra_forecast_{name}": ("forecast_value", function)
for name, function in zip_intra_forecast_info
}
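        # with the default names this yields keys like "intra_forecast_max"
        # and "intra_forecast_percentile_25", each a named aggregation
        # over forecast_value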
# these have the same value for all months so just use max
agg_dict["forecast_value_previous_month"] = (
"forecast_value_previous_month",
"max",
)
agg_dict["previous_forecast_month"] = ("previous_forecast_month", "max")
month_level_lookback_applied = self._apply_lookback(
month_level_df_with_most_recent
)
intra_forecast_metrics = (
month_level_lookback_applied.groupby(self.identifier_columns)
.agg(**agg_dict)
.reset_index()
)
return compare_df.merge(intra_forecast_metrics, on=self.identifier_columns)
def _generate_output_bq_schema(
self, output_df: pd.DataFrame
) -> list[bigquery.SchemaField]:
"""Generates a schema from output dataframe used to write it to bigquery
Args:
output_df (pd.DataFrame): Output Dataframe
Raises:
Exception: If there are columns that don't match the exact
types in the function, an exception will be raised
Returns:
list[bigquery.SchemaField]: schema useable by BigQuery
"""
schema = []
for colname, coltype in output_df.dtypes.to_dict().items():
if coltype == "datetime64[ns, UTC]":
schema.append(bigquery.SchemaField(colname, bq_types.TIMESTAMP))
elif coltype == "dbdate":
schema.append(bigquery.SchemaField(colname, bq_types.DATE))
elif coltype == "object":
schema.append(bigquery.SchemaField(colname, bq_types.STRING))
elif coltype == "float":
schema.append(bigquery.SchemaField(colname, bq_types.FLOAT))
columns_in_schema = {el.name for el in schema}
        if columns_in_schema != set(output_df.columns):
            missing_columns = ",".join(list(set(output_df.columns) - columns_in_schema))
            raise Exception(
                f"Schema is missing the following columns due to unexpected type: {missing_columns}"
            )
        return schema
def write(self):
"""Write output of query output by query method to the location specified
by the job_config attribute"""
output_df = self.fetch()
schema = self._generate_output_bq_schema(output_df)
job = self.client.load_table_from_dataframe(
dataframe=output_df,
destination=self.output_table_id,
job_config=bigquery.LoadJobConfig(
schema=schema,
autodetect=False,
write_disposition="WRITE_TRUNCATE",
),
)
# Wait for the job to complete.
job.result()
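
# A minimal usage sketch (config, project, and table names are hypothetical):
#
#   analysis = PerformanceAnalysis(
#       input_config_list=["dau_desktop.yaml", "dau_mobile.yaml"],
#       output_project="some-project",
#       output_dataset="some_dataset",
#       output_table="forecast_performance",
#   )
#   analysis.write()  # fetches, computes metrics, and overwrites the table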