orbit/diagnostics/plot.py (595 lines of code) (raw):
# Historical note: without a non-interactive matplotlib backend, the pyplot
# import below raised the following error in headless unit-test runs:
# TclError: no display name and no $DISPLAY environment variable
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import numpy as np
import math
import os
from importlib import metadata
import statsmodels.api as sm
from scipy import stats
from matplotlib.markers import MarkerStyle
from ..constants.constants import PredictionKeys
from orbit.utils.general import is_empty_dataframe, is_ordered_datetime
from ..constants.constants import BacktestFitKeys
from ..constants.palette import PredictionPaletteClassic as PredPal
from orbit.constants import palette
from orbit.diagnostics.metrics import smape
from orbit.utils.plot import orbit_style_decorator
from ..exceptions import PlotException
import logging
logger = logging.getLogger("orbit")
@orbit_style_decorator
def plot_predicted_data(
    training_actual_df,
    predicted_df,
    date_col,
    actual_col,
    pred_col=PredictionKeys.PREDICTION.value,
    prediction_percentiles=None,
    title="",
    test_actual_df=None,
    is_visible=True,
    figsize=None,
    path=None,
    fontsize=None,
    line_plot=False,
    markersize=50,
    lw=2,
    linestyle="-",
):
    """Plot the training actual response together with predicted data; if the actual response
    of the prediction period is supplied, plot it too.

    Parameters
    ----------
    training_actual_df : pd.DataFrame
        training actual response data frame. two columns required: actual_col and date_col
    predicted_df : pd.DataFrame
        predicted data frame. two columns required: date_col and pred_col. The prediction
        interval is shaded only when both `prediction_{x}` columns are present, where x are the
        two values of `prediction_percentiles`
    date_col : str
        the date column name
    actual_col : str
        the actual response column name
    pred_col : str
        the prediction column name
    prediction_percentiles : list
        list of two elements indicating the lower and upper percentiles; [5, 95] is used when None
    title : str
        title of the plot
    test_actual_df : pd.DataFrame
        test actual response dataframe. two columns required: actual_col and date_col
    is_visible : boolean
        whether we want to show the plot. If called from unittest, is_visible might = False.
    figsize : tuple
        figsize pass through to `matplotlib.pyplot.subplots()`; (16, 8) when not given
    path : str
        path to save the figure
    fontsize : int; optional
        fontsize of the title; 16 when not given
    line_plot : bool; default False
        if True, make line plot for observations; otherwise, make scatter plot for observations
    markersize : int; optional
        point marker size
    lw : int; optional
        line width of the prediction line (and of the observation lines when `line_plot` is True)
    linestyle : str
        linestyle of the prediction line (and of the observation lines when `line_plot` is True)

    Returns
    -------
    matplotlib axes object

    Raises
    ------
    ValueError
        if either input data frame is empty, if the prediction dates are not ordered, or if
        `prediction_percentiles` does not have exactly two elements
    """
    if is_empty_dataframe(training_actual_df) or is_empty_dataframe(predicted_df):
        raise ValueError("No prediction data or training response to plot.")

    if not is_ordered_datetime(predicted_df[date_col]):
        raise ValueError("Prediction df dates is not ordered.")

    if prediction_percentiles is None:
        _pred_percentiles = [5, 95]
    else:
        _pred_percentiles = prediction_percentiles

    if len(_pred_percentiles) != 2:
        raise ValueError(
            "prediction_percentiles has to be None or a list with length=2."
        )

    confid_cols = [
        "prediction_{}".format(_pred_percentiles[0]),
        "prediction_{}".format(_pred_percentiles[1]),
    ]
    # the interval is optional: only shade it when both bounds are available
    plot_confid = set(confid_cols).issubset(predicted_df.columns)

    if not figsize:
        figsize = (16, 8)

    if not fontsize:
        fontsize = 16

    # work on copies so the callers' data frames are never modified
    _training_actual_df = training_actual_df.copy()
    _predicted_df = predicted_df.copy()
    _training_actual_df[date_col] = pd.to_datetime(_training_actual_df[date_col])
    _predicted_df[date_col] = pd.to_datetime(_predicted_df[date_col])

    fig, ax = plt.subplots(facecolor="w", figsize=figsize)

    if line_plot:
        ax.plot(
            _training_actual_df[date_col].values,
            _training_actual_df[actual_col].values,
            marker=None,
            color=PredPal.ACTUAL_OBS.value,
            lw=lw,
            label="train response",
            linestyle=linestyle,
        )
    else:
        ax.scatter(
            _training_actual_df[date_col].values,
            _training_actual_df[actual_col].values,
            marker=MarkerStyle("."),
            color=PredPal.ACTUAL_OBS.value,
            alpha=0.8,
            s=markersize,
            label="train response",
        )

    ax.plot(
        _predicted_df[date_col].values,
        _predicted_df[pred_col].values,
        marker=None,
        color=PredPal.PREDICTION_LINE.value,
        lw=lw,
        label=PredictionKeys.PREDICTION.value,
        linestyle=linestyle,
    )

    # vertical line separating training and prediction; only drawn when the prediction
    # extends beyond the last training date
    last_train_date = _training_actual_df[date_col].values[-1]
    if last_train_date < _predicted_df[date_col].values[-1]:
        ax.axvline(
            x=last_train_date,
            color=PredPal.HOLDOUT_VERTICAL_LINE.value,
            alpha=0.5,
            linestyle="--",
        )

    if test_actual_df is not None:
        test_actual_df = test_actual_df.copy()
        test_actual_df[date_col] = pd.to_datetime(test_actual_df[date_col])
        if line_plot:
            ax.plot(
                test_actual_df[date_col].values,
                test_actual_df[actual_col].values,
                marker=None,
                color=PredPal.TEST_OBS.value,
                lw=lw,
                # was mislabelled "train response", duplicating the training legend entry
                label="test response",
                linestyle=linestyle,
            )
        else:
            ax.scatter(
                test_actual_df[date_col].values,
                test_actual_df[actual_col].values,
                marker=MarkerStyle("."),
                color=PredPal.TEST_OBS.value,
                s=markersize,
                label="test response",
            )

    # prediction intervals
    if plot_confid:
        ax.fill_between(
            _predicted_df[date_col].values,
            _predicted_df[confid_cols[0]],
            _predicted_df[confid_cols[1]],
            facecolor=PredPal.PREDICTION_INTERVAL.value,
            alpha=0.3,
        )

    ax.set_title(title, fontsize=fontsize)
    ax.legend()

    if path:
        fig.savefig(path)

    if is_visible:
        plt.show()
    else:
        plt.close()

    return ax
@orbit_style_decorator
def plot_predicted_components(
    predicted_df,
    date_col,
    prediction_percentiles=None,
    plot_components=None,
    title="",
    figsize=None,
    path=None,
    fontsize=None,
    is_visible=True,
):
    """Plot predicted components, one panel per component, from the data frame of a decomposed
    prediction.

    Parameters
    ----------
    predicted_df : pd.DataFrame
        predicted data frame. It must contain date_col and at least one of the component columns.
        The interval of a component is shaded only when both `{component}_{x}` columns are
        present, where x are the two values of `prediction_percentiles`
    date_col : str
        the date column name
    prediction_percentiles : list
        a list of exactly two elements which will be used as lower and upper bound of the
        plotted interval; [5, 95] is used when None
    plot_components : list
        a list of column names of the components to be plotted; by default, it uses the trend,
        seasonality and regression values of `PredictionKeys`. Names which are not columns of
        `predicted_df` are ignored
    title : str; optional
        title of the plot
    figsize : tuple; optional
        figsize pass through to `matplotlib.pyplot.subplots()`; (16, 8) when not given
    path : str; optional
        path to save the figure
    fontsize : int; optional
        fontsize of the titles; 16 when not given
    is_visible : boolean
        whether we want to show the plot. If called from unittest, is_visible might = False.

    Returns
    -------
    numpy array of matplotlib axes objects, one per plotted component

    Raises
    ------
    ValueError
        if `prediction_percentiles` does not have exactly two elements, or if none of the
        requested components is a column of `predicted_df`
    """
    # work on a copy so the caller's data frame is never modified
    _predicted_df = predicted_df.copy()
    _predicted_df[date_col] = pd.to_datetime(_predicted_df[date_col])

    if plot_components is None:
        plot_components = [
            PredictionKeys.TREND.value,
            PredictionKeys.SEASONALITY.value,
            PredictionKeys.REGRESSION.value,
        ]

    # silently drop the components which are not available in the data frame
    available_cols = set(_predicted_df.columns.tolist())
    plot_components = [p for p in plot_components if p in available_cols]
    nrows = len(plot_components)

    if not figsize:
        figsize = (16, 8)

    if not fontsize:
        fontsize = 16

    if prediction_percentiles is None:
        _pred_percentiles = [5, 95]
    else:
        _pred_percentiles = prediction_percentiles

    if len(_pred_percentiles) != 2:
        raise ValueError(
            "prediction_percentiles has to be None or a list with length=2."
        )

    # fail early with a readable message; otherwise matplotlib raises a ValueError about a
    # zero-row grid after the figure has already been created
    if nrows == 0:
        raise ValueError("None of the components to plot is found in predicted_df.")

    # squeeze=False keeps a 2-D array even for a single panel, so flatten() always works
    fig, axes = plt.subplots(nrows=nrows, ncols=1, figsize=figsize, squeeze=False)
    axes = axes.flatten()

    for ax, comp in zip(axes, plot_components):
        ax.plot(
            _predicted_df[date_col],
            _predicted_df[comp].values,
            marker=None,
            color=PredPal.PREDICTION_INTERVAL.value,
        )
        confid_cols = [
            "{}_{}".format(comp, _pred_percentiles[0]),
            "{}_{}".format(comp, _pred_percentiles[1]),
        ]
        if set(confid_cols).issubset(_predicted_df.columns):
            ax.fill_between(
                _predicted_df[date_col].values,
                _predicted_df[confid_cols[0]],
                _predicted_df[confid_cols[1]],
                facecolor=PredPal.PREDICTION_INTERVAL.value,
                alpha=0.3,
            )
        ax.set_title(comp, fontsize=fontsize)

    fig.suptitle(title, fontsize=fontsize)
    fig.tight_layout()

    if path:
        fig.savefig(path)

    if is_visible:
        plt.show()
    else:
        plt.close()

    return axes
@orbit_style_decorator
def plot_bt_predictions(
    bt_pred_df,
    metrics=smape,
    split_key_list=None,
    ncol=2,
    figsize=None,
    include_vline=True,
    title="",
    fontsize=20,
    path=None,
    is_visible=True,
):
    """Plot and visualize the prediction results from back testing, one panel per split.

    Parameters
    ----------
    bt_pred_df : data frame
        the output of `orbit.diagnostics.backtest.BackTester.fit_predict()`, which includes the
        actuals/predictions for all the splits
    metrics : callable
        the metric function; it is called as `metrics(actual, predicted)` on the out-of-sample
        rows of each split and its `__name__` is shown in the panel title
    split_key_list : list; default None
        with given model, which split keys to plot. If None, all the splits will be plotted
    ncol : int
        number of columns of the panel; number of rows will be decided accordingly
    figsize : tuple
        figure size; (16, 8) when None
    include_vline : bool
        if plotting the vertical line to cut the in-sample and out-of-sample predictions for
        each split
    title : str
        title of the plot
    fontsize : int; optional
        fontsize of the title
    path : string
        path to save the figure
    is_visible : bool
        if displaying the figure

    Returns
    -------
    2-D numpy array of matplotlib axes objects with shape (number of rows, ncol)

    Raises
    ------
    ValueError
        if there is no split to plot
    """
    if figsize is None:
        figsize = (16, 8)

    split_col = BacktestFitKeys.SPLIT_KEY.value
    train_flag_col = BacktestFitKeys.TRAIN_FLAG.value
    date_col = BacktestFitKeys.DATE.value
    actual_col = BacktestFitKeys.ACTUAL.value
    predicted_col = BacktestFitKeys.PREDICTED.value

    def _holdout_metric(split_df):
        # the metric is evaluated on the out-of-sample rows only
        holdout = split_df[~split_df[train_flag_col]]
        return metrics(holdout[actual_col], holdout[predicted_col])

    metric_vals = bt_pred_df.groupby(split_col).apply(_holdout_metric)

    if split_key_list is None:
        split_key_list_ = bt_pred_df[split_col].unique()
    else:
        split_key_list_ = split_key_list

    num_splits = len(split_key_list_)
    # fail early with a readable message; otherwise matplotlib raises a ValueError about a
    # zero-row grid
    if num_splits == 0:
        raise ValueError("No splits to plot.")

    nrow = math.ceil(num_splits / ncol)
    fig, axes = plt.subplots(
        nrow,
        ncol,
        figsize=figsize,
        squeeze=False,
        facecolor="w",
        constrained_layout=False,
    )

    for idx, split_key in enumerate(split_key_list_):
        ax = axes[idx // ncol, idx % ncol]
        split_df = bt_pred_df[bt_pred_df[split_col] == split_key].copy()
        ax.plot(
            split_df[date_col],
            split_df[predicted_col],
            color=PredPal.PREDICTION_LINE.value,
        )
        ax.scatter(
            split_df[date_col],
            split_df[actual_col],
            label=actual_col,
            color=PredPal.ACTUAL_OBS.value,
            alpha=0.6,
            s=8,
        )
        ax.set_title(
            label="split {}; {} {:.3f}".format(
                split_key, metrics.__name__, metric_vals[split_key]
            )
        )
        if include_vline:
            # first out-of-sample date of the split
            cutoff_date = split_df[~split_df[train_flag_col]][date_col].min()
            ax.axvline(
                x=cutoff_date,
                linestyle="--",
                color=PredPal.HOLDOUT_VERTICAL_LINE.value,
                alpha=0.8,
            )

    # the grid has nrow * ncol cells; hide the trailing ones which do not hold a split
    for unused_ax in axes.flatten()[num_splits:]:
        unused_ax.set_visible(False)

    fig.suptitle(title, fontsize=fontsize)
    fig.tight_layout()

    if path:
        fig.savefig(path)

    if is_visible:
        plt.show()
    else:
        plt.close()

    return axes
@orbit_style_decorator
def plot_bt_predictions2(
    bt_pred_df,
    metrics=smape,
    split_key_list=None,
    figsize=None,
    include_vline=True,
    title="",
    fontsize=20,
    markersize=50,
    lw=2,
    fig_dir=None,
    is_visible=True,
    fix_xylim=True,
    export_gif=False,
):
    """A different style of backtest plot compared to `plot_bt_predictions`: it draws a separate
    figure for each split; this is also used to produce an animation to summarize every split.

    Parameters
    ----------
    bt_pred_df : data frame
        the output of `orbit.diagnostics.backtest.BackTester.fit_predict()`, which includes the
        actuals/predictions for all the splits
    metrics : callable
        the metric function; it is called as `metrics(actual, predicted)` on the out-of-sample
        rows of each split and its `__name__` is shown in the figure title
    split_key_list : list; default None
        which split keys to plot. If None, all the splits will be plotted
    figsize : tuple
        figure size; (16, 8) when None
    include_vline : bool
        if plotting the vertical line to cut the in-sample and out-of-sample predictions
    title : str
        super title of each figure
    fontsize : int; optional
        fontsize of the super title
    markersize : int; optional
        point marker size
    lw : int; optional
        prediction line width
    fig_dir : str; optional
        an existing directory; when given, each figure is saved there as `splits_{idx}.png`
    is_visible : bool
        if displaying the figures
    fix_xylim : bool
        if True, every figure shares the same x and y limits derived from the whole `bt_pred_df`
    export_gif : bool
        if True and `fig_dir` is given, the saved figures are combined into `orbit-backtest.gif`
        in `fig_dir` with `imageio`; an error is logged when `imageio` is not installed

    Raises
    ------
    PlotException
        if `fig_dir` is given but is not an existing directory
    """
    if figsize is None:
        figsize = (16, 8)

    fig_paths = list()

    # os.path.isdir is already False for a non-existing path
    if fig_dir and not os.path.isdir(fig_dir):
        raise PlotException(
            "Invalid or non-existing directory use specified: {}.".format(
                os.path.abspath(fig_dir)
            )
        )

    split_col = BacktestFitKeys.SPLIT_KEY.value
    train_flag_col = BacktestFitKeys.TRAIN_FLAG.value
    date_col = BacktestFitKeys.DATE.value
    actual_col = BacktestFitKeys.ACTUAL.value
    predicted_col = BacktestFitKeys.PREDICTED.value

    def _holdout_metric(split_df):
        # the metric is evaluated on the out-of-sample rows only
        holdout = split_df[~split_df[train_flag_col]]
        return metrics(holdout[actual_col], holdout[predicted_col])

    metric_vals = bt_pred_df.groupby(split_col).apply(_holdout_metric)

    if split_key_list is None:
        split_key_list_ = bt_pred_df[split_col].unique()
    else:
        split_key_list_ = split_key_list

    xlim = None
    ylim = None
    if fix_xylim:
        all_values = np.concatenate(
            (bt_pred_df[actual_col].values, bt_pred_df[predicted_col].values)
        )
        # ignore missing values; a NaN limit is rejected by matplotlib
        lowest = np.nanmin(all_values)
        highest = np.nanmax(all_values)
        # pad by 1% of the magnitude away from the data; multiplying by 0.99 / 1.01 moved the
        # limits *into* the data whenever the extremes were negative
        ylim = (lowest - 0.01 * abs(lowest), highest + 0.01 * abs(highest))
        # use the true extremes instead of relying on the row order of bt_pred_df
        all_dates = bt_pred_df[date_col].values
        xlim = (all_dates.min(), all_dates.max())

    for idx, split_key in enumerate(split_key_list_):
        fig, ax = plt.subplots(1, 1, figsize=figsize)
        split_df = bt_pred_df[bt_pred_df[split_col] == split_key].copy()
        is_train = split_df[train_flag_col]

        ax.plot(
            split_df[date_col],
            split_df[predicted_col],
            color=PredPal.PREDICTION_LINE.value,
            lw=lw,
        )

        train_df = split_df.loc[is_train, :]
        ax.scatter(
            train_df[date_col],
            train_df[actual_col],
            marker=MarkerStyle("."),
            color=PredPal.ACTUAL_OBS.value,
            alpha=0.8,
            s=markersize,
            label="train response",
        )

        test_df = split_df.loc[~is_train, :]
        ax.scatter(
            test_df[date_col],
            test_df[actual_col],
            marker=MarkerStyle("."),
            color=PredPal.TEST_OBS.value,
            alpha=0.8,
            s=markersize,
            label="test response",
        )

        ax.set_title(
            label="split {}; {} {:.3f}".format(
                split_key, metrics.__name__, metric_vals[split_key]
            )
        )

        if include_vline:
            # first out-of-sample date of the split
            cutoff_date = test_df[date_col].min()
            ax.axvline(
                x=cutoff_date,
                linestyle="--",
                color=PredPal.HOLDOUT_VERTICAL_LINE.value,
                alpha=0.8,
            )

        if fix_xylim:
            ax.set_xlim(xlim)
            ax.set_ylim(*ylim)

        ax.legend()
        fig.suptitle(title, fontsize=fontsize)
        fig.tight_layout()

        if fig_dir:
            fig_path = os.path.join(fig_dir, "splits_{}.png".format(idx))
            fig_paths.append(fig_path)
            fig.savefig(fig_path)

        if is_visible:
            plt.show()
        else:
            plt.close()

    if fig_dir and export_gif:
        package_name = "imageio"
        try:
            # only the installation check is guarded; errors while writing the gif propagate
            metadata.version(package_name)
        except metadata.PackageNotFoundError:
            logger.error(
                "{} not installed, which is necessary for gif animation".format(
                    package_name
                )
            )
        else:
            import imageio

            gif_path = os.path.join(fig_dir, "orbit-backtest.gif")
            with imageio.get_writer(gif_path, mode="I") as writer:
                for fig_path in fig_paths:
                    writer.append_data(imageio.imread(fig_path))
# TODO: update palette
@orbit_style_decorator
def metric_horizon_barplot(
    df,
    model_col="model",
    pred_horizon_col="pred_horizon",
    metric_col="smape",
    bar_width=0.1,
    path=None,
    figsize=None,
    fontsize=None,
    is_visible=False,
):
    """Draw a grouped bar chart of a metric: one group per row position of a model's data and
    one bar per model within each group.

    Parameters
    ----------
    df : pd.DataFrame
        data frame with the columns `model_col`, `pred_horizon_col` and `metric_col`.
        NOTE(review): the bars of each model are taken in row order and the tick labels are the
        unique values of `pred_horizon_col`, so every model presumably needs one row per
        horizon in the same order -- confirm against callers
    model_col : str
        name of the column which identifies the model
    pred_horizon_col : str
        name of the column whose unique values label the groups on the x-axis
    metric_col : str
        name of the column with the bar heights; also shown in the title
    bar_width : float
        width of a single bar
    path : str; optional
        path to save the figure
    figsize : list or tuple; optional
        figure size; [20, 6] when not given
    fontsize : int; optional
        fontsize of the title; 10 when not given
    is_visible : bool
        if displaying the figure
    """
    if not figsize:
        figsize = [20, 6]

    if not fontsize:
        fontsize = 10

    models = df[model_col].unique()
    metric_horizons = df[pred_horizon_col].unique()
    n_models = len(models)
    colors = sns.color_palette("colorblind", n_models)

    # heights of the bars, one list per model
    bars = [list(df[df[model_col] == m][metric_col]) for m in models]
    # x position of the first bar of every group
    group_starts = np.arange(len(bars[0]))

    # draw on a dedicated figure rather than overwriting the global
    # rcParams["figure.figsize"] and reusing whatever figure happens to be current
    fig, ax = plt.subplots(figsize=figsize)

    for idx, (model, heights) in enumerate(zip(models, bars)):
        ax.bar(
            group_starts + idx * bar_width,
            heights,
            color=colors[idx],
            width=bar_width,
            edgecolor="white",
            label=model,
        )

    ax.set_xlabel("predict-horizon", fontweight="bold")
    # the bars of a group are centred on start, start + w, ..., start + (n - 1) * w, hence the
    # middle of the group is start + (n - 1) * w / 2 (an offset of w is only right for n = 3)
    ax.set_xticks(group_starts + bar_width * (n_models - 1) / 2)
    ax.set_xticklabels(metric_horizons)

    ax.legend()
    ax.set_title("Model Comparison with {}".format(metric_col), fontsize=fontsize)

    if path:
        fig.savefig(path)

    if is_visible:
        plt.show()
    else:
        plt.close()
@orbit_style_decorator
def params_comparison_boxplot(
    data,
    var_names,
    model_names,
    color_list=sns.color_palette(),
    title="Params Comparison",
    fig_size=(10, 6),
    box_width=0.1,
    box_distance=0.2,
    showfliers=False,
):
    """Compare the distribution of parameters from different models using a boxplot.

    Parameters
    ----------
    data : list of dict
        one dict per model, in the same order as `model_names`, with the parameters of interest
        as keys; each value has to provide `.flatten()` (e.g. a numpy array)
    var_names : list of str
        the labels of the parameters to compare
    model_names : list of str
        the names of models to compare
    color_list : list
        the colors to use for differentiating models, indexed by model position
    title : str
        the title of the chart
    fig_size : tuple
        figure size
    box_width : float
        width of the boxes in the boxplot
    box_distance : float
        the distance between each boxes in the boxplot
    showfliers : boolean
        show outliers in the chart if set as True

    Returns
    -------
    matplotlib axes object holding a boxplot which compares parameter distributions from
    different models side by side
    """
    fig, ax = plt.subplots(1, 1, figsize=fig_size)
    n_models = len(model_names)

    # horizontal offsets of the models around each parameter tick, symmetric around zero;
    # the one-decimal rounding is kept from the previous implementation
    steps = [round(box_distance * n, 1) for n in range(1, n_models // 2 + 1)]
    pos = [-step for step in steps] + steps
    if n_models % 2 == 1:
        # exactly one model sits on the tick itself; appending it once (instead of once per
        # step) yields one offset per model for every odd count, including a single model
        pos.append(0)
    pos = sorted(pos)

    handles = []
    for i in range(n_models):
        # one column per parameter, one row per sample
        samples = np.vstack([data[i][var].flatten() for var in var_names]).T
        # keep the artists in a local variable rather than writing them into globals()
        box_artists = ax.boxplot(
            samples,
            positions=np.arange(samples.shape[1]) + pos[i],
            widths=box_width,
            patch_artist=True,
            manage_ticks=False,
            boxprops=dict(facecolor=color_list[i]),
            medianprops=dict(color="black"),
            showfliers=showfliers,
        )
        # the first box of every model serves as its legend handle
        handles.append(box_artists["boxes"][0])

    ax.set_xticks(np.arange(len(var_names)))
    ax.set_xticklabels(var_names)
    ax.legend(handles, model_names)
    ax.set_xlabel("params")
    ax.set_ylabel("value")
    ax.set_title(title)

    return ax
@orbit_style_decorator
def residual_diagnostic_plot(
    df,
    dist="norm",
    date_col="week",
    residual_col="residual",
    fitted_col="prediction",
    sparams=None,
):
    """Draw a 3 x 2 grid of residual diagnostic plots.

    Parameters
    ----------
    df : pd.DataFrame
        data frame with the columns `date_col`, `residual_col` and `fitted_col`
    dist : str
        reference distribution of the qq plot; either "norm" or "t-dist". For any other value
        a warning is logged and the qq panel is left empty
    date_col : str
        column name of date
    residual_col : str
        column name of residual
    fitted_col: str
        column name of fitted value from model
    sparams : float or list
        extra parameters used in distribution such as t-dist; passed to
        `scipy.stats.probplot` as `sparams` and only used when `dist` is "t-dist"

    Notes
    -----
    1. residual by time
    2. residual vs fitted
    3. residual histogram with vertical line as mean
    4. residuals qq plot
    5. residual ACF
    6. residual PACF
    """
    fig, ax = plt.subplots(3, 2, figsize=(15, 12))
    blue = palette.OrbitPalette.BLUE.value
    residuals = df[residual_col].values

    # plot 1 residual by time
    time_ax = ax[0, 0]
    sns.lineplot(
        x=date_col,
        y=residual_col,
        data=df,
        ax=time_ax,
        color=blue,
        alpha=0.8,
        label="residual",
    )
    time_ax.set_title("Residual by Time")
    time_ax.legend()

    # plot 2 residual vs fitted
    fitted_ax = ax[0, 1]
    sns.scatterplot(
        x=fitted_col,
        y=residual_col,
        data=df,
        ax=fitted_ax,
        color=blue,
        alpha=0.8,
        label="residual",
    )
    fitted_ax.axhline(
        y=0,
        linestyle="--",
        color=palette.OrbitPalette.BLACK.value,
        alpha=0.5,
        label="0",
    )
    fitted_ax.set_title("Residual vs Fitted")
    fitted_ax.set_xlabel("fitted")
    fitted_ax.legend()

    # plot 3 residual histogram with vertical line as mean
    hist_ax = ax[1, 0]
    sns.histplot(
        residuals,
        # histplot counts by default; request densities so the bars match the y-axis label
        stat="density",
        kde=True,
        ax=hist_ax,
        color=blue,
        label="residual",
        edgecolor="white",
        alpha=0.5,
        facecolor=blue,
    )
    hist_ax.set_title("Residual Distribution")
    hist_ax.axvline(
        df[residual_col].mean(),
        color=palette.OrbitPalette.ORANGE.value,
        linestyle="--",
        alpha=0.9,
        label="residual mean",
    )
    hist_ax.set_ylabel("density")
    hist_ax.legend()

    # plot 4 residual qq plot
    qq_ax = ax[1, 1]
    if dist == "norm":
        _ = stats.probplot(residuals, dist="norm", plot=qq_ax)
    elif dist == "t-dist":
        # t-dist qq-plot
        _ = stats.probplot(residuals, dist=stats.t, sparams=sparams, plot=qq_ax)
    else:
        # keep drawing the remaining panels, but do not leave the empty panel unexplained
        logger.warning(
            "Unsupported dist '%s' for the qq plot; expected 'norm' or 't-dist'.", dist
        )

    # plot 5 residual ACF
    acf_ax = ax[2, 0]
    sm.graphics.tsa.plot_acf(
        residuals,
        ax=acf_ax,
        title="Residual ACF",
        color=blue,
    )
    acf_ax.set_xlabel("lag")
    acf_ax.set_ylabel("acf")

    # plot 6 residual PACF
    pacf_ax = ax[2, 1]
    sm.graphics.tsa.plot_pacf(
        residuals,
        ax=pacf_ax,
        title="Residual PACF",
        color=blue,
    )
    pacf_ax.set_xlabel("lag")
    pacf_ax.set_ylabel("pacf")

    fig.tight_layout()