# orbit/forecaster/map.py
import numpy as np
import pandas as pd
from functools import partial
from ..constants.constants import PredictMethod, PredictionKeys
from ..exceptions import ForecasterException
from ..utils.predictions import prepend_date_column, compute_percentiles
from .forecaster import Forecaster
class MAPForecaster(Forecaster):
    """Forecaster backed by a maximum a posteriori (MAP) point estimate.

    Unlike an MCMC-based forecaster there is a single posterior draw; it is
    stored with a leading batch dimension of size one so that downstream
    prediction logic can treat MAP and sampled posteriors uniformly.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Reserve the MAP slot in the point-posterior registry; it is
        # populated by `.fit()`.
        self._point_posteriors[PredictMethod.MAP.value] = dict()
        self._point_method = PredictMethod.MAP.value

    def set_forecaster_training_meta(self, data_input):
        """Mark the sampler input as non-MCMC (point estimation).

        Parameters
        ----------
        data_input : dict
            Sampler input meta; mutated in place and also returned.
        """
        # WITH_MCMC = 0 disables MCMC sampling — MAP uses a point estimate.
        # (The original comment claimed the flag was set "to be true", which
        # contradicted the code; 0 is falsy/off.)
        data_input.update({"WITH_MCMC": 0})
        return data_input

    def fit(self, df, **kwargs):
        """Fit the underlying model and cache the MAP posterior.

        Parameters
        ----------
        df : pd.DataFrame
            Training data.

        Returns
        -------
        MAPForecaster
            ``self``, to allow chaining.
        """
        super().fit(df, **kwargs)
        posterior_samples = self._posterior_samples
        # Add one leading dimension as a batch axis to keep logic consistent
        # with `.predict()`, which expects batched posterior arrays.
        map_posterior = {
            param_name: np.expand_dims(posterior_samples[param_name], axis=0)
            for param_name in self._model.get_model_param_names()
        }
        self._point_posteriors[PredictMethod.MAP.value] = map_posterior
        # TODO: right now this is hacky:
        # need to do it one more time to over-write the extra methods with
        # the right (freshly fitted) posterior.
        self.load_extra_methods()
        return self

    def predict(
        self, df, decompose=False, store_prediction_array=False, seed=None, **kwargs
    ) -> pd.DataFrame:
        """Generate predictions from the MAP estimate.

        Parameters
        ----------
        df : pd.DataFrame
            Data to predict on; must contain the date column.
        decompose : bool
            If True, keep component-level columns in the output.
        store_prediction_array : bool
            If True, keep the raw bootstrap prediction array on
            ``self.prediction_array``.
        seed : int, optional
            Seed for the bootstrap error draws.

        Returns
        -------
        pd.DataFrame
            Predictions with the date column prepended.

        Raises
        ------
        ForecasterException
            If the model is not fitted, or the model's ``predict`` output
            lacks the prediction key.
        """
        if not self.is_fitted():
            raise ForecasterException("Model is not fitted yet.")
        # obtain basic meta data from input df
        self._set_prediction_meta(df)
        prediction_meta = self.get_prediction_meta()
        training_meta = self.get_training_meta()
        if seed is not None:
            np.random.seed(seed)
        # perform point prediction
        point_posteriors = self._point_posteriors.get(PredictMethod.MAP.value)
        point_predicted_dict = self._model.predict(
            posterior_estimates=point_posteriors,
            df=df,
            training_meta=training_meta,
            prediction_meta=prediction_meta,
            # false for point estimate
            include_error=False,
            **kwargs,
        )
        # drop the singleton batch axis introduced in `.fit()`
        point_predicted_dict = {
            k: np.squeeze(v, 0) for k, v in point_predicted_dict.items()
        }
        # to derive confidence interval; the condition should be sufficient
        # since we add [50] by default
        if self._n_bootstrap_draws > 0 and len(self._prediction_percentiles) > 1:
            # perform bootstrap; we don't have posterior samples. hence, we
            # just repeat the point draw here and rely on include_error=True.
            posterior_samples = {
                k: np.repeat(v, self._n_bootstrap_draws, axis=0)
                for k, v in point_posteriors.items()
            }
            predicted_dict = self._model.predict(
                posterior_estimates=posterior_samples,
                df=df,
                training_meta=training_meta,
                prediction_meta=prediction_meta,
                include_error=True,
                **kwargs,
            )
            if store_prediction_array:
                self.prediction_array = predicted_dict[PredictionKeys.PREDICTION.value]
            percentiles_dict = compute_percentiles(
                predicted_dict, self._prediction_percentiles
            )
            # replace mid point prediction by the point estimate
            percentiles_dict.update(point_predicted_dict)
            if PredictionKeys.PREDICTION.value not in percentiles_dict.keys():
                raise ForecasterException(
                    "cannot find the key:'{}' from return of _predict()".format(
                        PredictionKeys.PREDICTION.value
                    )
                )
            # since we always assume to have decompose from .predict() at first,
            # here it reduces to prediction when decompose is not requested
            if not decompose:
                k = PredictionKeys.PREDICTION.value
                reduced_keys = [
                    k + "_" + str(p) if p != 50 else k
                    for p in self._prediction_percentiles
                ]
                percentiles_dict = {
                    k: v for k, v in percentiles_dict.items() if k in reduced_keys
                }
            predicted_df = pd.DataFrame(percentiles_dict)
        else:
            # NOTE(review): in this branch `store_prediction_array=True` is
            # silently ignored — `self.prediction_array` is only set on the
            # bootstrap path above. Confirm whether that is intentional.
            if not decompose:
                # reduce to prediction only if decompose is not requested
                point_predicted_dict = {
                    k: v
                    for k, v in point_predicted_dict.items()
                    if k == PredictionKeys.PREDICTION.value
                }
            predicted_df = pd.DataFrame(point_predicted_dict)
        predicted_df = prepend_date_column(predicted_df, df, self.date_col)
        return predicted_df

    # TODO: should be private
    def load_extra_methods(self):
        """Re-bind model extra methods onto self with the MAP posterior."""
        for method in self.extra_methods:
            # partial pre-binds training meta, the predict method label and
            # the current posteriors so callers invoke `self.<method>(...)`
            # with only the remaining arguments.
            setattr(
                self,
                method,
                partial(
                    getattr(self._model, method),
                    self.get_training_meta(),
                    PredictMethod.MAP.value,
                    self.get_point_posteriors(),
                    self.get_posterior_samples(),
                ),
            )

    def get_bic(self):
        """Return the Bayesian information criterion: -2*sum(loglk) + k*ln(n)."""
        training_metrics = self.get_training_metrics()
        loglk = training_metrics["loglk"]
        # `.size` counts all log-likelihood terms regardless of array rank
        # (the previous `shape[0] * shape[1]` silently assumed a 2-D array).
        n = loglk.size
        k = training_metrics["num_of_params"]
        return -2.0 * np.sum(loglk) + k * np.log(n)