in jobs/kpi-forecasting/kpi_forecasting/results_processing.py [0:0]
def _get_most_recent_forecasts(self, month_level_df: pd.DataFrame) -> pd.DataFrame:
    """Attach the latest and second-latest forecast months to every row.

    Adds the following columns to month_level_df:
        - current_forecast_month (timestamp):
            Latest (max) forecast_trained_at_month within each group defined by
            self.identifier_columns
        - previous_forecast_month (timestamp):
            Latest (max) forecast_trained_at_month within the group once the rows
            trained in current_forecast_month have been excluded
        - forecast_value_previous_month (float):
            forecast_value of the row in the same group whose
            forecast_trained_at_month equals previous_forecast_month

    Groups that only contain a single forecast month have no previous forecast.
    Their rows are kept, with previous_forecast_month and
    forecast_value_previous_month left missing (NaT/NaN).

    Args:
        month_level_df (pd.DataFrame): Dataframe to process. Must have the following
            columns in addition to those listed in self.identifier_columns:
            - forecast_trained_at_month
            - forecast_value

    Returns:
        pd.DataFrame: DataFrame with new columns added. Has the same number of rows
            as the input, provided each combination of identifier columns and
            forecast_trained_at_month appears at most once in the input.
    """
    group_keys = self.identifier_columns
    trained_at_columns = group_keys + ["forecast_trained_at_month"]

    current_forecast_month_df = (
        month_level_df[trained_at_columns]
        .groupby(group_keys)
        .agg(current_forecast_month=("forecast_trained_at_month", "max"))
        .reset_index()
    )
    # Left joins throughout: the default inner join would silently drop rows
    # whose group has no match on the right-hand side.
    month_level_df = month_level_df.merge(
        current_forecast_month_df, on=group_keys, how="left"
    )

    # Removing the newest forecast month from each group makes the max of what
    # remains the second-newest forecast month.
    exclude_current_forecast_month = month_level_df[
        month_level_df["forecast_trained_at_month"]
        != month_level_df["current_forecast_month"]
    ]
    previous_forecast_month_df = (
        exclude_current_forecast_month[trained_at_columns]
        .groupby(group_keys)
        .agg(previous_forecast_month=("forecast_trained_at_month", "max"))
        .reset_index()
    )
    # Groups with a single forecast month are absent from
    # previous_forecast_month_df; the left join keeps them with a missing value.
    month_level_df = month_level_df.merge(
        previous_forecast_month_df, on=group_keys, how="left"
    )

    # Self-join: look up the forecast_value recorded for previous_forecast_month.
    # The right-hand copies of the overlapping columns get the "_previous_month"
    # suffix; the suffixed training month duplicates previous_forecast_month for
    # matched rows, so it is dropped.
    month_level_df = month_level_df.merge(
        month_level_df[trained_at_columns + ["forecast_value"]],
        how="left",
        left_on=group_keys + ["previous_forecast_month"],
        right_on=trained_at_columns,
        suffixes=(None, "_previous_month"),
    ).drop(columns="forecast_trained_at_month_previous_month")
    return month_level_df