in afa/core.py [0:0]
def run_cv(cfg, df, horiz, freq, cv_start, cv_stride=1, dc_dict=None,
           metric="smape"):
    """Run a sliding-window temporal cross-validation (aka backtest).

    The forecasting function is taken from `cfg` and is called once per
    backtest window on the history `y[:i]`, then once more on the full
    series to produce the final forecast.

    Parameters
    ----------
    cfg : tuple
        A `(params, func)` pair. `params` is a string; the text before its
        first "|" is stored as the model type. `func` is called as
        `func(y, horizon, freq, dc=...)`.
    df : pandas.DataFrame
        Must have a "demand" column; its index supplies the timestamps.
    horiz : int
        Forecast horizon. The backtest horizon is shrunk to `len(y) - 1`
        when `horiz` is greater than or equal to the series length.
    freq : str
        Frequency passed to `func` and used to back-fill timestamps when
        the series has to be padded.
    cv_start : int
        Index of the first backtest window (length of the first history).
    cv_stride : int, optional
        Step between consecutive backtest windows.
    dc_dict : mapping or None, optional
        Looked up by history length `i` for each window, and by
        `len(y) - 1` for the final forecast; the value is forwarded to
        `func` as `dc`. When None, `dc=None` is forwarded instead.
        NOTE(review): assumes `func` tolerates `dc=None` -- confirm.
    metric : str, optional
        Forwarded to `calc_metrics`.

    Returns
    -------
    The object returned by `calc_metrics` (presumably a one-row DataFrame
    -- verify), extended with "model_type", "params", "y_cv", "yp_cv",
    "ts_cv" and "yhat" columns.

    Raises
    ------
    ValueError
        If the "demand" series is empty.
    AssertionError
        If the series is not 1-D, or the backtest actuals, forecasts and
        timestamps do not share a shape, or a forecast contains NaN.
    """
    y = df["demand"].values

    # allow only 1D time-series arrays
    assert y.ndim == 1

    params, func = cfg

    # fail early with a clear message; an empty series would otherwise
    # surface as a negative-window ValueError from the sliding window
    if len(y) == 0:
        raise ValueError("cannot run cross-validation on an empty series")

    # a single observation cannot be windowed; prepend one filler value
    if len(y) == 1:
        y = np.pad(y, [1, 0], constant_values=1)

    # the cross-val horizon length may shrink depending on the length of
    # historical data; shrink the horizon if it is >= the timeseries
    cv_horiz = len(y) - 1 if horiz >= len(y) else horiz

    n_pad = len(y) - len(df)
    if n_pad == 0:
        ts = df.index
    else:
        assert n_pad > 0
        # back-fill timestamps for the padded values; the generated range
        # ends on df.index[0] itself, so drop that last stamp to avoid
        # duplicating it and to keep `ts` the same length as `y`
        lead_ts = pd.date_range(
            end=df.index[0], freq=freq, periods=n_pad + 1)[:-1]
        ts = np.append(lead_ts, df.index)

    def dc_for(key):
        # `dc_dict` defaults to None, in which case there is nothing to
        # look up and None is forwarded to the forecasting function
        return None if dc_dict is None else dc_dict[key]

    # sliding window horizon actuals
    Y = sliding_window_view(y[cv_start:], cv_horiz)[::cv_stride, :]

    # | y | horiz |..............|
    #  | y | horiz |.............|
    #   | y | horiz |............|
    #     ::
    #     ::
    #                | y | horiz |
    #
    # keep the backtest forecasts at each cv_stride
    Ycv = np.vstack([
        func(y[:i], cv_horiz, freq, dc=dc_for(i))
        for i in range(cv_start, len(y) - cv_horiz + 1, cv_stride)
    ])

    # keep the backtest forecast time indices
    Yts = sliding_window_view(ts[cv_start:], cv_horiz)[::cv_stride, :]

    assert Yts.shape == Y.shape
    assert Yts.shape == Ycv.shape
    assert not np.any(np.isnan(Ycv))
    assert Ycv.shape == Y.shape

    # calc. error metrics
    df_results = calc_metrics(Y, Ycv, metric)
    df_results.insert(0, "model_type", params.split("|")[0])
    df_results.insert(1, "params", params)

    # store the final backtest window actuals and predictions
    df_results["y_cv"] = [Y]
    df_results["yp_cv"] = [Ycv]
    df_results["ts_cv"] = [Yts]

    # generate the final forecast (1-dim)
    df_results["yhat"] = [func(y, horiz, freq, dc=dc_for(len(y) - 1))]

    return df_results