# jobs/kpi-forecasting/kpi_forecasting/models/funnel_forecast.py
from dataclasses import dataclass, field
from datetime import datetime
import itertools
import json
from typing import Dict, List, Union
from google.cloud import bigquery
from google.cloud.bigquery.enums import SqlTypeNames as bq_types
import numpy as np
import pandas as pd
from pandas.api import types as pd_types
import prophet
from prophet.diagnostics import cross_validation
from kpi_forecasting.configs.model_inputs import (
ProphetHoliday,
ProphetRegressor,
holiday_collection,
regressor_collection,
)
from kpi_forecasting.models.prophet_forecast import ProphetForecast
@dataclass
class SegmentModelSettings:
"""
Holds the configuration and results for each segment
in a funnel forecasting model.
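Example (illustrative sketch; all values below are hypothetical and would normally
come from the forecast configuration file):
    SegmentModelSettings(
        segment={"device": "desktop"},
        start_date="2021-01-01",
        end_date="2024-01-01",
        grid_parameters={"changepoint_prior_scale": [0.01, 0.1]},
        cv_settings={"initial": "366 days", "period": "30 days", "horizon": "90 days"},
    )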
"""
segment: Dict[str, str]
start_date: str
end_date: str
grid_parameters: Dict[str, Union[List[float], float]]
cv_settings: Dict[str, str]
holidays: list = field(default_factory=list[ProphetHoliday])
regressors: list = field(default_factory=list[ProphetRegressor])
# Hold results as models are trained and forecasts made
segment_model: prophet.Prophet = None
trained_parameters: dict = field(default_factory=dict[str, str])
forecast_df: pd.DataFrame = None
components_df: pd.DataFrame = None
@dataclass
class FunnelForecast(ProphetForecast):
"""
FunnelForecast class for generating and managing forecast models. The class handles
cases where a separate forecast is required for each combination of dimension values
(segment) of a metric.
Inherits from ProphetForecast and provides methods for initializing forecast
parameters, building models, generating forecasts, summarizing results,
and writing results to BigQuery.
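Example (illustrative sketch; construction arguments are omitted because they are
supplied by the pipeline's configuration, and fit() is assumed to be the entry point
inherited from the parent forecast class):
    forecast = FunnelForecast(...)  # configured from the pipeline's config
    forecast.fit()                  # grid-searches and fits one Prophet model per segment
    forecast.predict()              # draws posterior-predictive samples per segment
    forecast.summarize(periods=["day", "month"])
    forecast.write_results(project="my-project", dataset="my_dataset", table="my_table")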
"""
def __post_init__(self) -> None:
"""
Post-initialization method to set up necessary attributes and configurations.
This method sets up the dates to predict, constructs segment combinations,
initializes models for each segment, and prepares attributes for storing results.
"""
super().__post_init__()
if self.metric_hub is None:
# skip the segment setup below when no metric hub is configured (e.g. in tests)
return
self._set_segment_models(self.observed_df, self.metric_hub.segments.keys())
# initialize unset attributes
self.components_df = None
def _set_segment_models(
self, observed_df: pd.DataFrame, segment_column_list: list
) -> None:
"""Creates a SegmentSettings object for each segment specified in the
metric_hub.segments section of the config. It is populated from the list of
parameters in the forecast_model.parameters section of the configuration file.
The segements section of each element of the list specifies which values within which
segments the parameters are associated with.
Args:
observed_df (pd.DataFrame): dataframe containing observed data used to model
must contain columns specified in the keys of the segments section of the config
segment_column_list (list): list of columns of observed_df to use to determine segments
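Example (illustrative; the dimension names and values are hypothetical):
    If observed_df has a "device" column taking the values "desktop" and "mobile",
    and self.parameters contains
        [{"segment": {"device": "desktop"}, "start_date": "2021-01-01", ...},
         {"segment": {"device": "mobile"}, "start_date": "2022-01-01", ...}]
    then one SegmentModelSettings object is created per observed combination,
    using the parameter set whose "segment" dict matches that combination.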
"""
# Construct a DataFrame containing all combinations of segment values
## in the observed_df
combination_df = observed_df[segment_column_list].drop_duplicates()
# Construct dictionaries from those combinations
# this will be used to check that the config actually partitions the data
segment_combinations = combination_df.to_dict("records")
# get subset of segment that is used in partitioning
split_dims = None
for partition in self.parameters:
partition_dim = set(partition["segment"].keys())
if split_dims and partition_dim != split_dims:
raise ValueError(
"Segment keys are not the same across different elements of parameters in the config file"
)
elif split_dims is None:
split_dims = partition_dim
else:
# this is the case where split_dims is set and matches partition_dim
continue
if not split_dims <= set(combination_df.keys()):
missing_dims = split_dims - set(combination_df.keys())
missing_dims_str = ",".join(missing_dims)
raise ValueError(
f"Segment keys missing from metric hub segments: {missing_dims_str}"
)
# For each segment combination, get the model parameters from the config
## file. Parse the holidays and regressors specified in the config file.
segment_models = []
for segment in segment_combinations:
# find the correct configuration
for partition in self.parameters:
partition_segment = partition["segment"]
# get subset of segment that is used to partition
subset_segment = {
key: val for key, val in segment.items() if key in split_dims
}
if partition_segment == subset_segment:
# partition is set to the desired value
# break out of loop
break
holiday_list = []
regressor_list = []
if "holidays" in partition:
holiday_list = [holiday_collection[h] for h in partition["holidays"]]
if "regressors" in partition:
regressor_list = [
regressor_collection[r] for r in partition["regressors"]
]
# Create a SegmentModelSettings object for each segment combination
segment_models.append(
SegmentModelSettings(
segment=segment,
start_date=partition["start_date"],
end_date=self.end_date,
holidays=[ProphetHoliday(**h) for h in holiday_list],
regressors=[ProphetRegressor(**r) for r in regressor_list],
grid_parameters=dict(partition["grid_parameters"]),
cv_settings=dict(partition["cv_settings"]),
)
)
self.segment_models = segment_models
@property
def column_names_map(self) -> Dict[str, str]:
"""
Map column names from the dataset to the names required by Prophet.
Returns:
Dict[str, str]: Mapping of column names.
"""
return {"submission_date": "ds", "value": "y"}
def _fill_regressor_dates(self, regressor: ProphetRegressor) -> ProphetRegressor:
"""
Fill missing start and end dates for a regressor. A ProphetRegressor can be created
without a 'start_date' or 'end_date' being supplied, so this checks for either date attr
being missing and fills in with the appropriate date: if 'start_date' is missing, it assumes
that the regressor starts at the beginning of the observed data; if 'end_date' is missing,
it assumes that the regressor should be filled until the end of the forecast period.
Args:
regressor (ProphetRegressor): The regressor to fill dates for.
Returns:
ProphetRegressor: The regressor with filled dates.
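Example (illustrative; the date is hypothetical): a regressor configured with
start_date="2023-06-01" and no end_date is returned with end_date set to
self.end_date, so it stays active through the end of the forecast period.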
"""
for date in ["start_date", "end_date"]:
if getattr(regressor, date) is None:
setattr(regressor, date, getattr(self, date))
elif isinstance(getattr(regressor, date), str):
setattr(regressor, date, pd.to_datetime(getattr(regressor, date)))
if regressor.end_date < regressor.start_date:
raise Exception(
f"Regressor {regressor.name} start date comes after end date"
)
return regressor
def _build_model(
self,
segment_settings: SegmentModelSettings,
parameters: Dict[str, Union[float, str, bool]],
) -> prophet.Prophet:
"""
Build a Prophet model from parameters.
Args:
segment_settings (SegmentModelSettings): The settings for the segment.
parameters (Dict[str, Union[float, str, bool]]): The parameters for the model.
Returns:
prophet.Prophet: The Prophet model.
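Example (illustrative sketch of the holidays frame assembled below and passed to
Prophet; the holiday name and dates are hypothetical):
    holiday    ds          lower_window  upper_window
    easter     2023-04-09  -2            1
    easter     2024-03-31  -2            1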
"""
if segment_settings.holidays:
parameters["holidays"] = pd.concat(
[
pd.DataFrame(
{
"holiday": h.name,
"ds": pd.to_datetime(h.ds),
"lower_window": h.lower_window,
"upper_window": h.upper_window,
}
)
for h in segment_settings.holidays
],
ignore_index=True,
)
m = prophet.Prophet(
**parameters,
uncertainty_samples=self.number_of_simulations,
mcmc_samples=0,
)
for regressor in segment_settings.regressors:
m.add_regressor(
regressor.name,
prior_scale=regressor.prior_scale,
mode=regressor.mode,
)
return m
def _build_train_dataframe(
self,
observed_df,
segment_settings: SegmentModelSettings,
add_logistic_growth_cols: bool = False,
) -> pd.DataFrame:
"""
Build the model dataframe for training
Args:
observed_df: dataframe of observed data
segment_settings (SegmentModelSettings): The settings for the segment.
add_logistic_growth_cols (bool, optional): Whether to add logistic growth columns. Defaults to False.
Returns:
pd.DataFrame: The dataframe for the model.
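Example (illustrative): if the observed y values for the segment range from
1,000 to 2,000 and add_logistic_growth_cols is True, every row gets
floor = 0.5 * 1,000 = 500 and cap = 1.5 * 2,000 = 3,000.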
"""
# find indices in observed_df for rows that exactly match segment dict
segment_historical_indices = (
observed_df[list(segment_settings.segment)]
== pd.Series(segment_settings.segment)
).all(axis=1)
df = (
observed_df.loc[
(segment_historical_indices)
& ( # filter observed_df if segment start date > metric_hub start date
observed_df["submission_date"]
>= datetime.strptime(segment_settings.start_date, "%Y-%m-%d").date()
)
]
.rename(columns=self.column_names_map)
.copy()
)
# define limits for logistic growth
if add_logistic_growth_cols:
df["floor"] = df["y"].min() * 0.5
df["cap"] = df["y"].max() * 1.5
if segment_settings.regressors:
df = self._add_regressors(df, segment_settings.regressors)
return df
def _build_predict_dataframe(
self,
dates_to_predict: pd.DataFrame,
segment_settings: SegmentModelSettings,
add_logistic_growth_cols: bool = False,
) -> pd.DataFrame:
"""creates dataframe used for prediction
Args:
dates_to_predict (pd.DataFrame): dataframe of dates to predict
segment_settings (SegmentModelSettings): settings related to the segment
add_logistic_growth_cols (bool): Whether to add logistic growth columns. Defaults to False.
Returns:
pd.DataFrame: dataframe to be used in prediction
"""
# predict dataframe only needs dates to predict, logistic growth limits, and regressors
df = dates_to_predict.rename(columns=self.column_names_map).copy()
if add_logistic_growth_cols:
df["floor"] = segment_settings.trained_parameters["floor"]
df["cap"] = segment_settings.trained_parameters["cap"]
if segment_settings.regressors:
df = self._add_regressors(df, segment_settings.regressors)
return df
def _fit(self, observed_df: pd.DataFrame) -> None:
"""
Fit and save a Prophet model for each segment combination.
Args:
observed_df (pd.DataFrame): dataframe of observations. Expected to have
the columns specified in the segments section of the config,
a submission_date column with unique dates corresponding to each observation,
and a value column containing the observed values
"""
for segment_settings in self.segment_models:
parameters = self._auto_tuning(observed_df, segment_settings)
# Initialize model; build model dataframe
add_log_growth_cols = (
"growth" in parameters.keys() and parameters["growth"] == "logistic"
)
test_dat = self._build_train_dataframe(
observed_df, segment_settings, add_log_growth_cols
)
model = self._build_model(segment_settings, parameters)
model.fit(test_dat)
if add_log_growth_cols:
# all values in these columns are the same
parameters["floor"] = test_dat["floor"].values[0]
parameters["cap"] = test_dat["cap"].values[0]
if "holidays" in parameters.keys():
parameters["holidays"] = (
parameters["holidays"]["holiday"].unique().tolist()
)
segment_settings.trained_parameters = parameters
segment_settings.segment_model = model
def _get_crossvalidation_metric(
self, m: prophet.Prophet, cv_settings: dict
) -> float:
"""function for calculated the metric used for crossvalidation
Args:
m (prophet.Prophet): Prophet model for crossvalidation
cv_settings (dict): settings set by segment in the config file
Returns:
float: Metric where closer to zero means a better model
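Example (illustrative numbers): if for the three most recent cutoffs the summed
forecasts are 110, 95 and 105 against observed sums of 100 each, the per-cutoff
pcnt_bias values are 0.10, -0.05 and 0.05, and the returned metric is their mean,
roughly 0.033.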
"""
df_cv = cross_validation(m, **cv_settings)
df_bias = df_cv.groupby("cutoff")[["yhat", "y"]].sum().reset_index()
df_bias["pcnt_bias"] = df_bias["yhat"] / df_bias["y"] - 1
# Prophet splits the historical data when doing cross validation using
# cutoffs. The `.tail(3)` limits the periods we consider for the best
# parameters to the 3 most recent cutoff periods.
return df_bias.tail(3)["pcnt_bias"].mean()
def _auto_tuning(
self, observed_df, segment_settings: SegmentModelSettings
) -> Dict[str, float]:
"""
Perform automatic tuning of model parameters.
Args:
observed_df (pd.DataFrame): dataframe of observed data.
Expected to have the columns specified in the segments section of the config,
a submission_date column with unique dates corresponding to each observation,
and a value column containing the observed values
segment_settings (SegmentModelSettings): The settings for the segment.
Returns:
Dict[str, float]: The tuned parameters.
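Example (illustrative; the parameter values are hypothetical):
    grid_parameters = {"changepoint_prior_scale": [0.01, 0.1], "seasonality_mode": "additive"}
    expands (scalars are first wrapped in lists) to the grid
    [{"changepoint_prior_scale": 0.01, "seasonality_mode": "additive"},
     {"changepoint_prior_scale": 0.1, "seasonality_mode": "additive"}]
    and the combination whose cross-validation bias is closest to zero is returned.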
"""
add_log_growth_cols = (
"growth" in segment_settings.grid_parameters.keys()
and segment_settings.grid_parameters["growth"] == "logistic"
)
for k, v in segment_settings.grid_parameters.items():
if not isinstance(v, list):
segment_settings.grid_parameters[k] = [v]
param_grid = [
dict(zip(segment_settings.grid_parameters.keys(), v))
for v in itertools.product(*segment_settings.grid_parameters.values())
]
test_dat = self._build_train_dataframe(
observed_df, segment_settings, add_log_growth_cols
)
bias = []
for params in param_grid:
m = self._build_model(segment_settings, params)
m.fit(test_dat)
crossval_metric = self._get_crossvalidation_metric(
m, segment_settings.cv_settings
)
bias.append(crossval_metric)
min_abs_bias_index = np.argmin(np.abs(bias))
return param_grid[min_abs_bias_index]
def _add_regressors(self, df: pd.DataFrame, regressors: List[ProphetRegressor]):
"""
Add regressor columns to the dataframe for training or prediction.
Args:
df (pd.DataFrame): The input dataframe.
regressors (List[ProphetRegressor]): The list of regressors to add.
Returns:
pd.DataFrame: The dataframe with regressors added.
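Example (illustrative; the regressor name and dates are hypothetical):
    for a regressor named "outage" spanning 2023-03-01 to 2023-03-07, the added
    "outage" column is 0 for rows whose ds falls inside that window and 1 otherwise.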
"""
for regressor in regressors:
regressor = self._fill_regressor_dates(regressor)
# set the regressor column to 0 for rows whose date falls within the regressor's
## date range, and to 1 otherwise
df[regressor.name] = (
~(
(df["ds"] >= pd.to_datetime(regressor.start_date).date())
& (df["ds"] <= pd.to_datetime(regressor.end_date).date())
)
).astype(int)
return df
def _predict(
self, dates_to_predict_raw: pd.DataFrame, segment_settings: SegmentModelSettings
) -> pd.DataFrame:
"""
Generate forecast samples for a segment.
Args:
dates_to_predict_raw (pd.DataFrame): dataframe of dates to predict
segment_settings (SegmentModelSettings): The settings for the segment.
Returns:
pd.DataFrame: The forecasted values.
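Example (illustrative): with number_of_simulations=1000 and 90 dates to predict,
the returned frame has 90 rows and 1001 columns: one column per posterior
sample (0 ... 999) plus the submission_date column.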
"""
add_log_growth_cols = (
"growth" in segment_settings.trained_parameters.keys()
and segment_settings.trained_parameters["growth"] == "logistic"
)
# add regressors, logistic growth limits (if applicable) to predict dataframe
dates_to_predict = self._build_predict_dataframe(
dates_to_predict_raw, segment_settings, add_log_growth_cols
)
# draws samples from Prophet posterior distribution, to provide percentile predictions
samples = segment_settings.segment_model.predictive_samples(dates_to_predict)
df = pd.DataFrame(samples["yhat"])
df["submission_date"] = dates_to_predict_raw
component_cols = [
"ds",
"yhat",
"trend",
"trend_upper",
"trend_lower",
"weekly",
"weekly_upper",
"weekly_lower",
"yearly",
"yearly_upper",
"yearly_lower",
]
# use 'predict' method to return components from the Prophet model
components_df = segment_settings.segment_model.predict(dates_to_predict)[
component_cols
]
# join observed data to components df, which allows for calc of intra-sample
# error rates and how components resulted in those predictions. The `fillna`
# call will fill the missing y values for forecasted dates, where only yhat
# is available.
components_df = components_df.merge(
segment_settings.segment_model.history[["ds", "y"]],
on="ds",
how="left",
).fillna(0)
components_df.rename(columns={"ds": "submission_date"}, inplace=True)
segment_settings.components_df = components_df.copy()
return df
def _validate_forecast_df(self, df: pd.DataFrame) -> None:
"""
Validate that the forecast dataframe has been generated correctly for each segment.
Args:
df (pd.DataFrame): The forecast dataframe.
Raises:
ValueError: If the dataframe does not meet the required conditions.
"""
columns = df.columns
if "submission_date" not in columns:
    raise ValueError("forecast_df must contain a 'submission_date' column.")
numeric_columns = df.drop(columns=["submission_date"]).columns
for i in numeric_columns:
if not pd_types.is_numeric_dtype(df[i]):
raise ValueError(
"All forecast_df columns except 'submission_date' and segment dims must be numeric,"
f" but column {i} has type {df[i].dtypes}."
)
def _percentile_name_map(self, percentiles: List[int]) -> Dict[str, str]:
"""
Map percentiles to their corresponding names for the BQ table.
Args:
percentiles (List[int]): The list of percentiles.
Returns:
Dict[str, str]: The mapping of percentile names.
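Example (illustrative): percentiles=[10, 50, 90] maps to
{"p10": "value_low", "p50": "value_mid", "p90": "value_high", "mean": "value"}.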
"""
percentiles.sort()
return {
f"p{percentiles[0]}": "value_low",
f"p{percentiles[1]}": "value_mid",
f"p{percentiles[2]}": "value_high",
"mean": "value",
}
def _combine_forecast_observed(
self,
forecast_df: pd.DataFrame,
observed_df: pd.DataFrame,
period: str,
numpy_aggregations: List,
percentiles,
segment: dict,
) -> pd.DataFrame:
"""Calculate aggregates over the forecast and observed data
and concatenate the two dataframes
Args:
forecast_df (pd.DataFrame): forecast dataframe
observed_df (pd.DataFrame): observed dataframe
period (str): period to aggregate up to, must be in (day, month, year)
numpy_aggregations (List): List of aggregation functions to apply across samples from the
posterior-predictive distribution. Must take
in a numpy array and return a single value
percentiles: 3-element list of percentiles to calculate across samples from the posterior-predictive distribution
segment (dict): dictionary that lists columns and values corresponding to the segment
keys are the column name used to segment and values are the values
of that column corresponding to the current segment
Returns:
pd.DataFrame: combined dataframe containing aggregated values from observed and forecast
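Example (illustrative): with percentiles=[10, 50, 90], forecast rows carry the
columns value (mean), value_low (p10), value_mid (p50) and value_high (p90) and
are labelled source="forecast"; aggregated observed rows are labelled
source="historical".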
"""
# filter the forecast data to just the data in the future
last_historic_date = observed_df["submission_date"].max()
forecast_df = forecast_df.loc[
forecast_df["submission_date"] > last_historic_date
]
forecast_summarized, observed_summarized = self._aggregate_forecast_observed(
forecast_df, observed_df, period, numpy_aggregations, percentiles
)
# add datasource-specific metadata columns
forecast_summarized["source"] = "forecast"
observed_summarized["source"] = "historical"
# add segment columns to forecast table
for dim, value in segment.items():
forecast_summarized[dim] = value
# rename forecast percentile to low, middle, high
# rename mean to value
forecast_summarized = forecast_summarized.rename(
columns=self._percentile_name_map(percentiles)
)
# create a single dataframe that contains observed and forecasted data
df = pd.concat([observed_summarized, forecast_summarized])
return df
def _summarize(
self,
segment_settings: SegmentModelSettings,
period: str,
numpy_aggregations: List[str],
percentiles: List[int] = [10, 50, 90],
) -> pd.DataFrame:
"""
Calculate summary metrics on a specific segment
for `forecast_df` over a given period, and add metadata.
Args:
segment_settings (SegmentModelSettings): The settings for the segment.
period (str): The period for aggregation.
numpy_aggregations (List[str]): List of numpy aggregation functions.
percentiles (List[int]): List of percentiles.
Returns:
pd.DataFrame: The summarized dataframe.
"""
if len(percentiles) != 3:
raise ValueError(
"""
Can only pass a list of length 3 as percentiles, for lower, mid, and upper values.
"""
)
# the start date for this segment's historical data, in cases where the full time series
## of historical data is not used for model training
segment_observed_start_date = datetime.strptime(
segment_settings.start_date, "%Y-%m-%d"
).date()
# find indices in observed_df for rows that exactly match segment dict
segment_historical_indices = (
self.observed_df[list(segment_settings.segment)]
== pd.Series(segment_settings.segment)
).all(axis=1)
segment_observed_df = self.observed_df.loc[
(segment_historical_indices)
& (self.observed_df["submission_date"] >= segment_observed_start_date)
].copy()
df = self._combine_forecast_observed(
segment_settings.forecast_df,
segment_observed_df,
period,
numpy_aggregations,
percentiles,
segment_settings.segment,
)
df["forecast_parameters"] = json.dumps(segment_settings.trained_parameters)
# add summary metadata columns
df["aggregation_period"] = period.lower()
return df
def predict(self) -> None:
"""Generate a forecast from `start_date` to `end_date`."""
print(f"Forecasting from {self.start_date} to {self.end_date}.", flush=True)
self._set_seed()
self.predicted_at = datetime.utcnow()
for segment_settings in self.segment_models:
forecast_df = self._predict(self.dates_to_predict, segment_settings)
self._validate_forecast_df(forecast_df)
segment_settings.forecast_df = forecast_df
def summarize(
self,
periods: List[str] = ["day", "month"],
numpy_aggregations: List[str] = ["mean"],
percentiles: List[int] = [10, 50, 90],
) -> None:
"""
Summarize the forecast results over specified periods.
Args:
periods (List[str], optional): The periods for summarization. Defaults to ["day", "month"].
numpy_aggregations (List[str], optional): The numpy aggregation functions. Defaults to ["mean"].
percentiles (List[int], optional): The percentiles for summarization. Defaults to [10, 50, 90].
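Example (illustrative): summarize(periods=["month"], numpy_aggregations=["mean"])
builds one summary row per segment, month and data source, then stores the
combined results in self.summary_df and the per-segment Prophet components
in self.components_df.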
"""
summary_df_list = []
components_df_list = []
for segment in self.segment_models:
summary_df = pd.concat(
[
self._summarize(
segment,
i,
numpy_aggregations,
percentiles,
)
for i in periods
]
)
for dim, dim_value in segment.segment.items():
segment.components_df[dim] = dim_value
summary_df_list.append(summary_df.copy(deep=True))
components_df_list.append(segment.components_df)
del summary_df
df = pd.concat(summary_df_list, ignore_index=True)
# add Metric Hub metadata columns
df["metric_alias"] = self.metric_hub.alias.lower()
df["metric_hub_app_name"] = self.metric_hub.app_name.lower()
df["metric_hub_slug"] = self.metric_hub.slug.lower()
df["metric_start_date"] = pd.to_datetime(self.metric_hub.min_date)
df["metric_end_date"] = pd.to_datetime(self.metric_hub.max_date)
df["metric_collected_at"] = self.collected_at
# add forecast model metadata columns
df["forecast_start_date"] = self.start_date
df["forecast_end_date"] = self.end_date
df["forecast_trained_at"] = self.trained_at
df["forecast_predicted_at"] = self.predicted_at
self.summary_df = df
self.components_df = pd.concat(components_df_list, ignore_index=True)
def write_results(
self,
project: str,
dataset: str,
table: str,
write_disposition: str = "WRITE_APPEND",
components_table: str = "",
components_dataset: str = "",
) -> None:
"""
Write `self.summary_df` to Big Query.
Args:
project (str): The Big Query project that the data should be written to.
dataset (str): The Big Query dataset that the data should be written to.
table (str): The Big Query table that the data should be written to.
write_disposition (str, optional): In the event that the destination table exists,
should the table be overwritten ("WRITE_TRUNCATE") or appended to ("WRITE_APPEND")? Defaults to "WRITE_APPEND".
components_table (str, optional): The Big Query table for model components. Defaults to "".
components_dataset (str, optional): The Big Query dataset for model components. Defaults to "".
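Example (illustrative; the project, dataset and table names are hypothetical):
    forecast.write_results(
        project="my-gcp-project",
        dataset="forecast_dataset",
        table="forecast_table",
        components_table="forecast_components_table",
    )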
"""
print(
f"Writing results to `{project}.{dataset}.{table}`.",
flush=True,
)
client = bigquery.Client(project=project)
schema = [
bigquery.SchemaField("submission_date", bq_types.DATE),
*[
bigquery.SchemaField(k, bq_types.STRING)
for k in self.metric_hub.segments.keys()
],
bigquery.SchemaField("aggregation_period", bq_types.STRING),
bigquery.SchemaField("source", bq_types.STRING),
bigquery.SchemaField("value", bq_types.FLOAT),
bigquery.SchemaField("value_low", bq_types.FLOAT),
bigquery.SchemaField("value_mid", bq_types.FLOAT),
bigquery.SchemaField("value_high", bq_types.FLOAT),
bigquery.SchemaField("metric_alias", bq_types.STRING),
bigquery.SchemaField("metric_hub_app_name", bq_types.STRING),
bigquery.SchemaField("metric_hub_slug", bq_types.STRING),
bigquery.SchemaField("metric_start_date", bq_types.DATE),
bigquery.SchemaField("metric_end_date", bq_types.DATE),
bigquery.SchemaField("metric_collected_at", bq_types.TIMESTAMP),
bigquery.SchemaField("forecast_start_date", bq_types.DATE),
bigquery.SchemaField("forecast_end_date", bq_types.DATE),
bigquery.SchemaField("forecast_trained_at", bq_types.TIMESTAMP),
bigquery.SchemaField("forecast_predicted_at", bq_types.TIMESTAMP),
bigquery.SchemaField("forecast_parameters", bq_types.STRING),
]
job = client.load_table_from_dataframe(
dataframe=self.summary_df,
destination=f"{project}.{dataset}.{table}",
job_config=bigquery.LoadJobConfig(
schema=schema,
autodetect=False,
write_disposition=write_disposition,
),
)
# Wait for the job to complete.
job.result()
if components_table:
numeric_cols = list(self.components_df.select_dtypes(include=float).columns)
string_cols = list(self.components_df.select_dtypes(include=object).columns)
self.components_df["metric_slug"] = self.metric_hub.slug
self.components_df["forecast_trained_at"] = self.trained_at
schema = [
bigquery.SchemaField("submission_date", bq_types.DATE),
bigquery.SchemaField("metric_slug", bq_types.STRING),
bigquery.SchemaField("forecast_trained_at", bq_types.TIMESTAMP),
]
schema += [
bigquery.SchemaField(col, bq_types.STRING) for col in string_cols
]
schema += [
bigquery.SchemaField(col, bq_types.FLOAT) for col in numeric_cols
]
if not components_dataset:
components_dataset = dataset
print(
f"Writing model components to `{project}.{components_dataset}.{components_table}`.",
flush=True,
)
job = client.load_table_from_dataframe(
dataframe=self.components_df,
destination=f"{project}.{components_dataset}.{components_table}",
job_config=bigquery.LoadJobConfig(
schema=schema,
autodetect=False,
write_disposition=write_disposition,
schema_update_options=[
bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION
],
),
)
job.result()