orbit/forecaster/svi.py (192 lines of code) (raw):

import numpy as np import pandas as pd from functools import partial from ..constants.constants import PredictMethod, PredictionKeys from ..exceptions import ForecasterException from ..utils.predictions import prepend_date_column, compute_percentiles from .forecaster import Forecaster class SVIForecaster(Forecaster): def __init__(self, **kwargs): super().__init__(**kwargs) # method we used to define posteriors and prediction parameters self._point_method = None # init aggregate posteriors self._point_posteriors = { PredictMethod.MEAN.value: dict(), PredictMethod.MEDIAN.value: dict(), } def set_forecaster_training_meta(self, data_input): # MCMC flag to be true data_input.update({"WITH_MCMC": 0}) return data_input def fit( self, df, point_method=None, keep_samples=True, sampling_temperature=1.0, **kwargs, ): super().fit(df, sampling_temperature=sampling_temperature, **kwargs) self._point_method = point_method if point_method is not None: if point_method not in [ PredictMethod.MEAN.value, PredictMethod.MEDIAN.value, ]: raise ForecasterException("Invalid point estimate method.") mean_posteriors = {} median_posteriors = {} # for each model param, aggregate using `method` for param_name in self._model.get_model_param_names(): param_ndarray = self._posterior_samples[param_name] mean_posteriors.update( {param_name: np.mean(param_ndarray, axis=0, keepdims=True)}, ) median_posteriors.update( {param_name: np.median(param_ndarray, axis=0, keepdims=True)}, ) self._point_posteriors[PredictMethod.MEAN.value] = mean_posteriors self._point_posteriors[PredictMethod.MEDIAN.value] = median_posteriors if point_method is not None and not keep_samples: self._posterior_samples = {} self.load_extra_methods() return self @staticmethod def _bootstrap(num_samples, posterior_samples, n): """Draw `n` number of bootstrap samples from the posterior_samples. Args ---- n : int The number of bootstrap samples to draw """ if n < 2: raise ForecasterException( "Error: Number of bootstrap draws must be at least 2" ) sample_idx = np.random.choice(range(num_samples), size=n, replace=True) bootstrap_samples_dict = {} for k, v in posterior_samples.items(): bootstrap_samples_dict[k] = v[sample_idx] return bootstrap_samples_dict def predict( self, df, decompose=False, store_prediction_array=False, seed=None, **kwargs ) -> pd.DataFrame: # raise if model is not fitted if not self.is_fitted(): raise ForecasterException("Model is not fitted yet.") # obtain basic meta data from input df self._set_prediction_meta(df) prediction_meta = self.get_prediction_meta() training_meta = self.get_training_meta() if seed is not None: np.random.seed(seed) if self._point_method is None: # full posteriors prediction # if bootstrap draws, replace posterior samples with bootstrap posterior_samples = ( self._bootstrap( num_samples=self.estimator.num_sample, posterior_samples=self._posterior_samples, n=self._n_bootstrap_draws, ) if self._n_bootstrap_draws > 1 else self._posterior_samples ) predicted_dict = self._model.predict( posterior_estimates=posterior_samples, df=df, training_meta=training_meta, prediction_meta=prediction_meta, include_error=True, **kwargs, ) if PredictionKeys.PREDICTION.value not in predicted_dict.keys(): raise ForecasterException( "cannot find the key:'{}' from return of _predict()".format( PredictionKeys.PREDICTION.value ) ) # reduce to prediction only if decompose is not requested if not decompose: predicted_dict = { k: v for k, v in predicted_dict.items() if k == PredictionKeys.PREDICTION.value } if store_prediction_array: self.prediction_array = predicted_dict[PredictionKeys.PREDICTION.value] percentiles_dict = compute_percentiles( predicted_dict, self._prediction_percentiles ) predicted_df = pd.DataFrame(percentiles_dict) predicted_df = prepend_date_column(predicted_df, df, self.date_col) return predicted_df else: # perform point prediction point_posteriors = self._point_posteriors.get(self._point_method) point_predicted_dict = self._model.predict( posterior_estimates=point_posteriors, df=df, training_meta=training_meta, prediction_meta=prediction_meta, include_error=False, **kwargs, ) for k, v in point_predicted_dict.items(): point_predicted_dict[k] = np.squeeze(v, 0) # to derive confidence interval; the condition should be sufficient since we add [50] by default if self._n_bootstrap_draws > 0 and len(self._prediction_percentiles) > 1: # perform bootstrap; we don't have posterior samples. hence, we just repeat the draw here. posterior_samples = {} for k, v in point_posteriors.items(): posterior_samples[k] = np.repeat(v, self.n_bootstrap_draws, axis=0) predicted_dict = self._model.predict( posterior_estimates=posterior_samples, df=df, training_meta=training_meta, prediction_meta=prediction_meta, include_error=True, **kwargs, ) percentiles_dict = compute_percentiles( predicted_dict, self._prediction_percentiles ) # replace mid point prediction by point estimate percentiles_dict.update(point_predicted_dict) if PredictionKeys.PREDICTION.value not in percentiles_dict.keys(): raise ForecasterException( "cannot find the key:'{}' from return of _predict()".format( PredictionKeys.PREDICTION.value ) ) # reduce to prediction only if decompose is not requested if not decompose: k = PredictionKeys.PREDICTION.value reduced_keys = [ k + "_" + str(p) if p != 50 else k for p in self._prediction_percentiles ] percentiles_dict = { k: v for k, v in percentiles_dict.items() if k in reduced_keys } predicted_df = pd.DataFrame(percentiles_dict) else: if not decompose: # reduce to prediction only if decompose is not requested point_predicted_dict = { k: v for k, v in point_predicted_dict.items() if k == PredictionKeys.PREDICTION.value } predicted_df = pd.DataFrame(point_predicted_dict) predicted_df = prepend_date_column(predicted_df, df, self.date_col) return predicted_df def load_extra_methods(self): for method in self.extra_methods: setattr( self, method, partial( getattr(self._model, method), self.get_training_meta(), self._point_method, self.get_point_posteriors(), self.get_posterior_samples(), ), ) def get_wbic(self): # This function calculates the WBIC given that MCMC sampling happened with sampling_temperature = log(n) training_metrics = self.get_training_metrics() # get the training metrics training_meta = self.get_training_meta() # get the meta data sampling_temp = training_metrics[ "sampling_temperature" ] # get the sampling temperature nobs = training_meta["num_of_obs"] # the number of observations if sampling_temp != np.log(nobs): raise ForecasterException( "Sampling temperature is not log(n); WBIC calculation is not valid!" ) return -2 * np.nanmean(training_metrics["loglk"]) * nobs def fit_wbic(self, df): """This function calculates the WBIC for a Orbit model Note that if sampling has not been done ith sampling_temperature = log(n) then the MCMC sampling is redone to get the WBIC """ nobs = df.shape[0] self.fit(df, sampling_temperature=np.log(nobs)) return self.get_wbic()