jobs/kpi-forecasting/kpi_forecasting/models/prophet_forecast.py
import json
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Dict, List

import numpy as np
import pandas as pd
import prophet
from google.cloud import bigquery
from google.cloud.bigquery.enums import SqlTypeNames as bq_types
from pandas.api import types as pd_types

from kpi_forecasting import pandas_extras as pdx
from kpi_forecasting.models.base_forecast import BaseForecast

@dataclass
class ProphetForecast(BaseForecast):
"""Forecast object specifically for prophet forecast models
Additional attributes:
number_of_simulations (int): The number of simulated timeseries that the forecast
should generate. Since many forecast models are probablistic, this enables the
measurement of variation across a range of possible outcomes.
"""
number_of_simulations: int = 1000
@property
def column_names_map(self) -> Dict[str, str]:
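# Prophet expects the date column to be named "ds" and the metric column "y",
# so observed and prediction dataframes are renamed with this map before use.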
return {"submission_date": "ds", "value": "y"}
def _build_model(self, parameter_dict):
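# `uncertainty_samples` sets how many simulated sample paths Prophet draws when
# estimating uncertainty, and `mcmc_samples=0` keeps the model in MAP-estimation
# mode rather than running full MCMC.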
model = prophet.Prophet(
**parameter_dict,
uncertainty_samples=self.number_of_simulations,
mcmc_samples=0,
)
if self.use_all_us_holidays:
model.add_country_holidays(country_name="US")
return model
def _fit(self, observed_df) -> None:
self.model = self._build_model(self.parameters)
# Modify observed data to have column names that Prophet expects, and fit
# the model
self.model.fit(observed_df.rename(columns=self.column_names_map))
def _predict(self, dates_to_predict) -> pd.DataFrame:
# generate the forecast samples
samples = self.model.predictive_samples(
dates_to_predict.rename(columns=self.column_names_map)
)
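# `predictive_samples` returns a dict of sample arrays (including "yhat"), each
# with one row per date and one column per simulated draw, so the dataframe
# below has `number_of_simulations` numeric columns plus `submission_date`.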
df = pd.DataFrame(samples["yhat"])
df["submission_date"] = dates_to_predict
return df
def _validate_forecast_df(self, df) -> None:
"""Validate that `self.forecast_df` has been generated correctly."""
columns = df.columns
expected_shape = (len(self.dates_to_predict), 1 + self.number_of_simulations)
numeric_columns = df.drop(columns="submission_date").columns
if "submission_date" not in columns:
raise ValueError("forecast_df must contain a 'submission_date' column.")
if df.shape != expected_shape:
raise ValueError(
f"Expected forecast_df to have shape {expected_shape}, but it has shape {df.shape}."
)
if not df["submission_date"].equals(self.dates_to_predict["submission_date"]):
raise ValueError(
"forecast_df['submission_date'] does not match dates_to_predict['submission_date']."
)
for i in numeric_columns:
if not pd_types.is_numeric_dtype(df[i]):
raise ValueError(
"All forecast_df columns except 'submission_date' must be numeric,"
f" but column {i} has type {df[i].dtype}."
)
def _predict_legacy(self) -> pd.DataFrame:
"""
Recreate the legacy format used in
`moz-fx-data-shared-prod.telemetry_derived.kpi_automated_forecast_v1`.
"""
# TODO: This method should be removed once the forecasting data model is updated:
# https://mozilla-hub.atlassian.net/browse/DS-2676
df = self.model.predict(
self.dates_to_predict.rename(columns=self.column_names_map)
)
# set legacy column values
if "dau" in self.metric_hub.alias.lower():
df["metric"] = "DAU"
else:
df["metric"] = self.metric_hub.alias
df["forecast_date"] = str(
datetime.now(timezone.utc).replace(tzinfo=None).date()
)
df["forecast_parameters"] = str(
json.dumps({**self.parameters, "holidays": self.use_all_us_holidays})
)
alias = self.metric_hub.alias.lower()
if ("desktop" in alias) and ("mobile" in alias):
raise ValueError(
"Metric Hub alias must include either 'desktop' or 'mobile', not both."
)
elif "desktop" in alias:
df["target"] = "desktop"
elif "mobile" in alias:
df["target"] = "mobile"
else:
raise ValueError(
"Metric Hub alias must include either 'desktop' or 'mobile'."
)
columns = [
"ds",
"trend",
"yhat_lower",
"yhat_upper",
"trend_lower",
"trend_upper",
"additive_terms",
"additive_terms_lower",
"additive_terms_upper",
"extra_regressors_additive",
"extra_regressors_additive_lower",
"extra_regressors_additive_upper",
"holidays",
"holidays_lower",
"holidays_upper",
"regressor_00",
"regressor_00_lower",
"regressor_00_upper",
"weekly",
"weekly_lower",
"weekly_upper",
"yearly",
"yearly_lower",
"yearly_upper",
"multiplicative_terms",
"multiplicative_terms_lower",
"multiplicative_terms_upper",
"yhat",
"target",
"forecast_date",
"forecast_parameters",
"metric",
]
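# Any legacy column that Prophet did not produce for this model (for example,
# regressor or holiday components that were not configured) is filled with 0.0
# so the output matches the legacy schema.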
for column in columns:
if column not in df.columns:
df[column] = 0.0
return df[columns]
def _aggregate_forecast_observed(
self,
forecast_df,
observed_df,
period: str,
numpy_aggregations: List[str],
percentiles: List[int],
):
# build a list of all functions that we'll summarize the data by
aggregations = [getattr(np, i) for i in numpy_aggregations]
aggregations.extend([pdx.percentile(i) for i in percentiles])
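# For example, numpy_aggregations=["mean"] resolves to np.mean via getattr;
# pdx.percentile(i) is assumed to return a percentile aggregator (e.g.
# pdx.percentile(50) for the median) usable with DataFrame.agg.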
# aggregate metric to the correct date period (day, month, year)
observed_summarized = pdx.aggregate_to_period(observed_df, period)
forecast_agg = pdx.aggregate_to_period(forecast_df, period).sort_values(
"submission_date"
)
# find periods of overlap between observed and forecasted data
# merge preserves key order so overlap will be sorted by submission_date
overlap = forecast_agg.merge(
observed_summarized,
on="submission_date",
how="left",
).fillna(0)
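# Forecast periods with no observed overlap are filled with 0 here, so the
# addition below leaves those periods unchanged.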
forecast_summarized = (
forecast_agg.set_index("submission_date")
# Add observed data samples to any overlapping forecasted period. This
# ensures that any forecast made partway through a period accounts for
# previously observed data within the period. For example, when a monthly
# forecast is generated in the middle of the month.
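# Illustrative example with made-up numbers: if a forecast sample sums to 900
# for the month and 100 was already observed earlier in that month, that sample
# column becomes 1000 before the summary statistics are computed.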
.add(overlap[["value"]].values)
# calculate summary values, aggregating across the samples for each submission_date
.agg(aggregations, axis=1)
.reset_index()
)
return forecast_summarized, observed_summarized
def _combine_forecast_observed(
self,
forecast_df,
observed_df,
period: str,
numpy_aggregations: List[str],
percentiles: List[int],
):
forecast_summarized, observed_summarized = self._aggregate_forecast_observed(
forecast_df, observed_df, period, numpy_aggregations, percentiles
)
# melt to long format: the metric values get the column name 'value' and the
# aggregate names go into 'measure'
forecast_summarized = forecast_summarized.melt(
id_vars="submission_date", var_name="measure"
)
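# After melting, each row is (submission_date, measure, value), where `measure`
# holds an aggregate name such as "mean" or "p50".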
observed_summarized["measure"] = "observed"
# add datasource-specific metadata columns
forecast_summarized["source"] = "forecast"
observed_summarized["source"] = "historical"
df = pd.concat([forecast_summarized, observed_summarized])
return df
def _summarize(
self,
forecast_df,
observed_df,
period: str,
numpy_aggregations: List[str],
percentiles: List[int],
) -> pd.DataFrame:
"""
Calculate summary metrics for `self.forecast_df` over a given period, and
add metadata.
"""
df = self._combine_forecast_observed(
forecast_df, observed_df, period, numpy_aggregations, percentiles
)
# add summary metadata columns
df["aggregation_period"] = period.lower()
return df
def _summarize_legacy(self) -> pd.DataFrame:
"""
Convert `self.summary_df` to the legacy format used in
`moz-fx-data-shared-prod.telemetry_derived.kpi_automated_forecast_confidences_v1`.
"""
# TODO: This method should be removed once the forecasting data model is updated:
# https://mozilla-hub.atlassian.net/browse/DS-2676
df = self.summary_df.copy(deep=True)
# rename columns to legacy values
df.rename(
columns={
"forecast_end_date": "asofdate",
"submission_date": "date",
"metric_alias": "target",
"aggregation_period": "unit",
},
inplace=True,
)
df["forecast_date"] = df["forecast_predicted_at"].dt.date
df["type"] = df["source"].replace("historical", "actual")
df = df.replace(
{
"measure": {
"observed": "value",
"p05": "yhat_p5",
"p10": "yhat_p10",
"p20": "yhat_p20",
"p30": "yhat_p30",
"p40": "yhat_p40",
"p50": "yhat_p50",
"p60": "yhat_p60",
"p70": "yhat_p70",
"p80": "yhat_p80",
"p90": "yhat_p90",
"p95": "yhat_p95",
},
"target": {
"desktop_dau": "desktop",
"mobile_dau": "mobile",
},
}
)
# pivot the df from "long" to "wide" format
index_columns = [
"asofdate",
"date",
"target",
"unit",
"forecast_parameters",
"forecast_date",
]
df = (
df[index_columns + ["measure", "value"]]
.pivot(
index=index_columns,
columns="measure",
values="value",
)
.reset_index()
)
# pivot sets the "name" attribute of the columns for some reason. It's
# None by default, so we just reset that here.
df.columns.name = None
# When there's an overlap in the observed and forecasted period -- for
# example, when a monthly forecast is generated mid-month -- the legacy
# format only records the forecasted value, not the observed value. To
# account for this, we'll just find the max of the "mean" (forecasted) and
# "value" (observed) data. In all non-overlapping observed periods, the
# forecasted value will be NULL. In all non-overlapping forecasted periods,
# the observed value will be NULL. In overlapping periods, the forecasted
# value will always be larger because it is the sum of the observed and forecasted
# values. Below is a query that demonstrates the legacy behavior:
#
# SELECT *
# FROM `moz-fx-data-shared-prod.telemetry_derived.kpi_automated_forecast_confidences_v1`
# WHERE asofdate = "2023-12-31"
# AND target = "mobile"
# AND unit = "month"
# AND forecast_date = "2022-06-04"
# AND date BETWEEN "2022-05-01" AND "2022-06-01"
# ORDER BY date
df["value"] = df[["mean", "value"]].max(axis=1)
df.drop(columns=["mean"], inplace=True)
# non-numeric columns are represented in the legacy bq schema as strings
string_cols = [
"asofdate",
"date",
"target",
"unit",
"forecast_parameters",
"forecast_date",
]
df[string_cols] = df[string_cols].astype(str)
return df
def write_results(
self,
project: str,
dataset: str,
table: str,
project_legacy: str,
dataset_legacy: str,
write_disposition: str = "WRITE_APPEND",
forecast_table_legacy: str = "kpi_automated_forecast_v1",
confidences_table_legacy: str = "kpi_automated_forecast_confidences_v1",
) -> None:
"""
Write `self.summary_df` and the legacy dataframes to Big Query.
Args:
project (str): The Big Query project that the data should be written to.
dataset (str): The Big Query dataset that the data should be written to.
table (str): The Big Query table that the data should be written to.
project_legacy (str): The Big Query project for the legacy tables.
dataset_legacy (str): The Big Query dataset for the legacy tables.
write_disposition (str): In the event that the destination table exists,
should the table be overwritten ("WRITE_TRUNCATE") or appended to
("WRITE_APPEND")?
forecast_table_legacy (str): The legacy table that forecast rows are written to.
confidences_table_legacy (str): The legacy table that confidence/summary rows
are written to.
"""
# get legacy tables
# TODO: remove this once the forecasting data model is updated:
# https://mozilla-hub.atlassian.net/browse/DS-2676
self.forecast_df_legacy = self._predict_legacy()
self.summary_df_legacy = self._summarize_legacy()
print(f"Writing results to `{project}.{dataset}.{table}`.", flush=True)
client = bigquery.Client(project=project)
schema = [
bigquery.SchemaField("submission_date", bq_types.DATE),
bigquery.SchemaField("aggregation_period", bq_types.STRING),
bigquery.SchemaField("source", bq_types.STRING),
bigquery.SchemaField("measure", bq_types.STRING),
bigquery.SchemaField("value", bq_types.FLOAT),
bigquery.SchemaField("metric_alias", bq_types.STRING),
bigquery.SchemaField("metric_hub_app_name", bq_types.STRING),
bigquery.SchemaField("metric_hub_slug", bq_types.STRING),
bigquery.SchemaField("metric_start_date", bq_types.DATE),
bigquery.SchemaField("metric_end_date", bq_types.DATE),
bigquery.SchemaField("metric_collected_at", bq_types.TIMESTAMP),
bigquery.SchemaField("forecast_start_date", bq_types.DATE),
bigquery.SchemaField("forecast_end_date", bq_types.DATE),
bigquery.SchemaField("forecast_trained_at", bq_types.TIMESTAMP),
bigquery.SchemaField("forecast_predicted_at", bq_types.TIMESTAMP),
bigquery.SchemaField("forecast_parameters", bq_types.STRING),
]
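# An explicit schema with autodetect=False keeps the loaded column types stable
# across runs instead of letting BigQuery infer them from each dataframe.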
job = client.load_table_from_dataframe(
dataframe=self.summary_df,
destination=f"{project}.{dataset}.{table}",
job_config=bigquery.LoadJobConfig(
schema=schema,
autodetect=False,
write_disposition=write_disposition,
),
)
# Wait for the job to complete.
job.result()
# TODO: remove the below jobs once the forecasting data model is updated:
# https://mozilla-hub.atlassian.net/browse/DS-2676
job = client.load_table_from_dataframe(
dataframe=self.forecast_df_legacy,
destination=f"{project_legacy}.{dataset_legacy}.{forecast_table_legacy}",
job_config=bigquery.LoadJobConfig(
write_disposition=write_disposition,
schema=[
bigquery.SchemaField("ds", bq_types.TIMESTAMP),
bigquery.SchemaField("forecast_date", bq_types.STRING),
bigquery.SchemaField("forecast_parameters", bq_types.STRING),
],
),
)
job.result()
job = client.load_table_from_dataframe(
dataframe=self.summary_df_legacy,
destination=f"{project_legacy}.{dataset_legacy}.{confidences_table_legacy}",
job_config=bigquery.LoadJobConfig(
write_disposition=write_disposition,
schema=[
bigquery.SchemaField("asofdate", bq_types.STRING),
bigquery.SchemaField("date", bq_types.STRING),
],
),
)
job.result()