orbit/diagnostics/plot.py (592 lines of code) (raw):
# the following lines are added to fix unit test error
# or else the following line will give the following error
# TclError: no display name and no $DISPLAY environment variable
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import numpy as np
import math
import os
import pkg_resources
import statsmodels.api as sm
from scipy import stats
from ..constants.constants import PredictionKeys
from orbit.utils.general import is_empty_dataframe, is_ordered_datetime
from ..constants.constants import BacktestFitKeys
from ..constants.palette import PredictionPaletteClassic as PredPal
from orbit.constants import palette
from orbit.diagnostics.metrics import smape
from orbit.utils.plot import orbit_style_decorator
from ..exceptions import PlotException
import logging
logger = logging.getLogger("orbit")
@orbit_style_decorator
def plot_predicted_data(
training_actual_df,
predicted_df,
date_col,
actual_col,
pred_col=PredictionKeys.PREDICTION.value,
prediction_percentiles=None,
title="",
test_actual_df=None,
is_visible=True,
figsize=None,
path=None,
fontsize=None,
line_plot=False,
markersize=50,
lw=2,
linestyle="-",
):
"""plot training actual response together with predicted data; if actual response of predicted
data is there, plot it too.
Parameters
----------
training_actual_df : pd.DataFrame
training actual response data frame. two columns required: actual_col and date_col
predicted_df : pd.DataFrame
predicted data response data frame. two columns required: actual_col and pred_col. If
user provide prediction_percentiles, it needs to include them as well in such
`prediction_{x}` where x is the correspondent percentiles
prediction_percentiles : list
list of two elements indicates the lower and upper percentiles
date_col : str
the date column name
actual_col : str
pred_col : str
title : str
title of the plot
test_actual_df : pd.DataFrame
test actual response dataframe. two columns required: actual_col and date_col
is_visible : boolean
whether we want to show the plot. If called from unittest, is_visible might = False.
figsize : tuple
figsize pass through to `matplotlib.pyplot.figure()`
path : str
path to save the figure
fontsize : int; optional
fontsize of the title
line_plot : bool; default False
if True, make line plot for observations; otherwise, make scatter plot for observations
markersize : int; optional
point marker size
lw : int; optional
out-of-sample prediction line width
linestyle : str
linestyle of prediction plot
Returns
-------
matplotlib axes object
"""
if is_empty_dataframe(training_actual_df) or is_empty_dataframe(predicted_df):
raise ValueError("No prediction data or training response to plot.")
if not is_ordered_datetime(predicted_df[date_col]):
raise ValueError("Prediction df dates is not ordered.")
plot_confid = False
if prediction_percentiles is None:
_pred_percentiles = [5, 95]
else:
_pred_percentiles = prediction_percentiles
if len(_pred_percentiles) != 2:
raise ValueError(
"prediction_percentiles has to be None or a list with length=2."
)
confid_cols = [
"prediction_{}".format(_pred_percentiles[0]),
"prediction_{}".format(_pred_percentiles[1]),
]
if set(confid_cols).issubset(predicted_df.columns):
plot_confid = True
if not figsize:
figsize = (16, 8)
if not fontsize:
fontsize = 16
_training_actual_df = training_actual_df.copy()
_predicted_df = predicted_df.copy()
_training_actual_df[date_col] = pd.to_datetime(_training_actual_df[date_col])
_predicted_df[date_col] = pd.to_datetime(_predicted_df[date_col])
fig, ax = plt.subplots(facecolor="w", figsize=figsize)
if line_plot:
ax.plot(
_training_actual_df[date_col].values,
_training_actual_df[actual_col].values,
marker=None,
color=PredPal.ACTUAL_OBS.value,
lw=lw,
label="train response",
linestyle=linestyle,
)
else:
ax.scatter(
_training_actual_df[date_col].values,
_training_actual_df[actual_col].values,
marker=".",
color=PredPal.ACTUAL_OBS.value,
alpha=0.8,
s=markersize,
label="train response",
)
ax.plot(
_predicted_df[date_col].values,
_predicted_df[pred_col].values,
marker=None,
color=PredPal.PREDICTION_LINE.value,
lw=lw,
label=PredictionKeys.PREDICTION.value,
linestyle=linestyle,
)
# vertical line separate training and prediction
if _training_actual_df[date_col].values[-1] < _predicted_df[date_col].values[-1]:
ax.axvline(
x=_training_actual_df[date_col].values[-1],
color=PredPal.HOLDOUT_VERTICAL_LINE.value,
alpha=0.5,
linestyle="--",
)
if test_actual_df is not None:
test_actual_df = test_actual_df.copy()
test_actual_df[date_col] = pd.to_datetime(test_actual_df[date_col])
if line_plot:
ax.plot(
test_actual_df[date_col].values,
test_actual_df[actual_col].values,
marker=None,
color=PredPal.TEST_OBS.value,
lw=lw,
label="train response",
linestyle=linestyle,
)
else:
ax.scatter(
test_actual_df[date_col].values,
test_actual_df[actual_col].values,
marker=".",
color=PredPal.TEST_OBS.value,
s=markersize,
label="test response",
)
# prediction intervals
if plot_confid:
ax.fill_between(
_predicted_df[date_col].values,
_predicted_df[confid_cols[0]],
_predicted_df[confid_cols[1]],
facecolor=PredPal.PREDICTION_INTERVAL.value,
alpha=0.3,
)
ax.set_title(title, fontsize=fontsize)
# ax.grid(True, which='major', c='gray', ls='-', lw=1, alpha=0.5) --comment out since we have orbit style
ax.legend()
if path:
fig.savefig(path)
if is_visible:
plt.show()
else:
plt.close()
return ax
@orbit_style_decorator
def plot_predicted_components(
predicted_df,
date_col,
prediction_percentiles=None,
plot_components=None,
title="",
figsize=None,
path=None,
fontsize=None,
is_visible=True,
):
"""Plot predicted components with the data frame of decomposed prediction where components
has been pre-defined as `trend`, `seasonality` and `regression`.
Parameters
----------
predicted_df : pd.DataFrame
predicted data response data frame. two columns required: actual_col and pred_col. If
user provide pred_percentiles_col, it needs to include them as well.
date_col : str
the date column name
prediction_percentiles : list
a list should consist exact two elements which will be used to plot as lower and upper bound of
confidence interval
plot_components : list
a list of strings to show the label of components to be plotted; by default, it uses values in
`orbit.constants.constants.PredictedComponents`.
title : str; optional
title of the plot
figsize : tuple; optional
figsize pass through to `matplotlib.pyplot.figure()`
path : str; optional
path to save the figure
fontsize : int; optional
fontsize of the title
is_visible : boolean
whether we want to show the plot. If called from unittest, is_visible might = False.
Returns
-------
matplotlib axes object
"""
_predicted_df = predicted_df.copy()
_predicted_df[date_col] = pd.to_datetime(_predicted_df[date_col])
if plot_components is None:
plot_components = [
PredictionKeys.TREND.value,
PredictionKeys.SEASONALITY.value,
PredictionKeys.REGRESSION.value,
]
plot_components = [
p for p in plot_components if p in _predicted_df.columns.tolist()
]
nrows = len(plot_components)
if not figsize:
figsize = (16, 8)
if not fontsize:
fontsize = 16
if prediction_percentiles is None:
_pred_percentiles = [5, 95]
else:
_pred_percentiles = prediction_percentiles
if len(_pred_percentiles) != 2:
raise ValueError(
"prediction_percentiles has to be None or a list with length=2."
)
fig, axes = plt.subplots(nrows=nrows, ncols=1, figsize=figsize, squeeze=False)
axes = axes.flatten()
for ax, comp in zip(axes, plot_components):
y = predicted_df[comp].values
ax.plot(
_predicted_df[date_col],
y,
marker=None,
color=PredPal.PREDICTION_INTERVAL.value,
)
confid_cols = [
"{}_{}".format(comp, _pred_percentiles[0]),
"{}_{}".format(comp, _pred_percentiles[1]),
]
if set(confid_cols).issubset(predicted_df.columns):
ax.fill_between(
_predicted_df[date_col].values,
_predicted_df[confid_cols[0]],
_predicted_df[confid_cols[1]],
facecolor=PredPal.PREDICTION_INTERVAL.value,
alpha=0.3,
)
ax.set_title(comp, fontsize=fontsize)
plt.suptitle(title, fontsize=fontsize)
fig.tight_layout()
if path:
plt.savefig(path)
if is_visible:
plt.show()
else:
plt.close()
return axes
@orbit_style_decorator
def plot_bt_predictions(
bt_pred_df,
metrics=smape,
split_key_list=None,
ncol=2,
figsize=None,
include_vline=True,
title="",
fontsize=20,
path=None,
is_visible=True,
):
"""function to plot and visualize the prediction results from back testing.
bt_pred_df : data frame
the output of `orbit.diagnostics.backtest.BackTester.fit_predict()`, which includes the actuals/predictions
for all the splits
metrics : callable
the metric function
split_key_list: list; default None
with given model, which split keys to plot. If None, all the splits will be plotted
ncol : int
number of columns of the panel; number of rows will be decided accordingly
figsize : tuple
figure size
include_vline : bool
if plotting the vertical line to cut the in-sample and out-of-sample predictions for each split
title : str
title of the plot
fontsize: int; optional
fontsize of the title
path : string
path to save the figure
is_visible : bool
if displaying the figure
"""
if figsize is None:
figsize = (16, 8)
metric_vals = bt_pred_df.groupby(BacktestFitKeys.SPLIT_KEY.value).apply(
lambda x: metrics(
x[~x[BacktestFitKeys.TRAIN_FLAG.value]][BacktestFitKeys.ACTUAL.value],
x[~x[BacktestFitKeys.TRAIN_FLAG.value]][BacktestFitKeys.PREDICTED.value],
)
)
if split_key_list is None:
split_key_list_ = bt_pred_df[BacktestFitKeys.SPLIT_KEY.value].unique()
else:
split_key_list_ = split_key_list
num_splits = len(split_key_list_)
nrow = math.ceil(num_splits / ncol)
fig, axes = plt.subplots(
nrow,
ncol,
figsize=figsize,
squeeze=False,
facecolor="w",
constrained_layout=False,
)
for idx, split_key in enumerate(split_key_list_):
row_idx = idx // ncol
col_idx = idx % ncol
tmp = bt_pred_df[
bt_pred_df[BacktestFitKeys.SPLIT_KEY.value] == split_key
].copy()
axes[row_idx, col_idx].plot(
tmp[BacktestFitKeys.DATE.value],
tmp[BacktestFitKeys.PREDICTED.value],
# linewidth=2,
color=PredPal.PREDICTION_LINE.value,
)
axes[row_idx, col_idx].scatter(
tmp[BacktestFitKeys.DATE.value],
tmp[BacktestFitKeys.ACTUAL.value],
label=BacktestFitKeys.ACTUAL.value,
color=PredPal.ACTUAL_OBS.value,
alpha=0.6,
s=8,
)
# axes[row_idx, col_idx].grid(True, which='major', c='gray', ls='-', lw=1, alpha=0.4)
axes[row_idx, col_idx].set_title(
label="split {}; {} {:.3f}".format(
split_key, metrics.__name__, metric_vals[split_key]
)
)
if include_vline:
cutoff_date = tmp[~tmp[BacktestFitKeys.TRAIN_FLAG.value]][
BacktestFitKeys.DATE.value
].min()
axes[row_idx, col_idx].axvline(
x=cutoff_date,
linestyle="--",
color=PredPal.HOLDOUT_VERTICAL_LINE.value,
# linewidth=4,
alpha=0.8,
)
plt.suptitle(title, fontsize=fontsize)
fig.tight_layout()
if path:
fig.savefig(path)
if is_visible:
plt.show()
else:
plt.close()
return axes
@orbit_style_decorator
def plot_bt_predictions2(
bt_pred_df,
metrics=smape,
split_key_list=None,
figsize=None,
include_vline=True,
title="",
fontsize=20,
markersize=50,
lw=2,
fig_dir=None,
is_visible=True,
fix_xylim=True,
export_gif=False,
):
"""a different style backtest plot compare to `plot_bt_prediction` where it writes separate plot for each split;
this is also used to produce an animation to summarize every split
"""
if figsize is None:
figsize = (16, 8)
if fig_dir:
if not os.path.isdir(fig_dir) or not os.path.exists(fig_dir):
raise PlotException(
"Invalid or non-existing directory use specified: {}.".format(
os.path.abspath(fig_dir)
)
)
fig_paths = list()
metric_vals = bt_pred_df.groupby(BacktestFitKeys.SPLIT_KEY.value).apply(
lambda x: metrics(
x[~x[BacktestFitKeys.TRAIN_FLAG.value]][BacktestFitKeys.ACTUAL.value],
x[~x[BacktestFitKeys.TRAIN_FLAG.value]][BacktestFitKeys.PREDICTED.value],
)
)
if split_key_list is None:
split_key_list_ = bt_pred_df[BacktestFitKeys.SPLIT_KEY.value].unique()
else:
split_key_list_ = split_key_list
if fix_xylim:
all_values = np.concatenate(
(
bt_pred_df[BacktestFitKeys.ACTUAL.value].values,
bt_pred_df[BacktestFitKeys.PREDICTED.value].values,
)
)
ylim = (np.min(all_values) * 0.99, np.max(all_values) * 1.01)
xlim = (
bt_pred_df[BacktestFitKeys.DATE.value].values[0],
bt_pred_df[BacktestFitKeys.DATE.value].values[-1],
)
for idx, split_key in enumerate(split_key_list_):
fig, ax = plt.subplots(1, 1, figsize=figsize)
tmp = bt_pred_df[
bt_pred_df[BacktestFitKeys.SPLIT_KEY.value] == split_key
].copy()
ax.plot(
tmp[BacktestFitKeys.DATE.value],
tmp[BacktestFitKeys.PREDICTED.value],
color=PredPal.PREDICTION_LINE.value,
lw=lw,
)
train_df = tmp.loc[tmp[BacktestFitKeys.TRAIN_FLAG.value], :]
ax.scatter(
train_df[BacktestFitKeys.DATE.value],
train_df[BacktestFitKeys.ACTUAL.value],
marker=".",
color=PredPal.ACTUAL_OBS.value,
alpha=0.8,
s=markersize,
label="train response",
)
test_df = tmp.loc[~tmp[BacktestFitKeys.TRAIN_FLAG.value], :]
ax.scatter(
test_df[BacktestFitKeys.DATE.value],
test_df[BacktestFitKeys.ACTUAL.value],
marker=".",
color=PredPal.TEST_OBS.value,
alpha=0.8,
s=markersize,
label="test response",
)
ax.set_title(
label="split {}; {} {:.3f}".format(
split_key, metrics.__name__, metric_vals[split_key]
)
)
if include_vline:
cutoff_date = tmp[~tmp[BacktestFitKeys.TRAIN_FLAG.value]][
BacktestFitKeys.DATE.value
].min()
ax.axvline(
x=cutoff_date,
linestyle="--",
color=PredPal.HOLDOUT_VERTICAL_LINE.value,
alpha=0.8,
)
if fix_xylim:
ax.set_xlim(xlim)
ax.set_ylim(ylim)
ax.legend()
plt.suptitle(title, fontsize=fontsize)
fig.tight_layout()
if fig_dir:
fig_path = "{}/splits_{}.png".format(fig_dir, idx)
fig_paths.append(fig_path)
fig.savefig(fig_path)
if is_visible:
plt.show()
else:
plt.close()
if fig_dir and export_gif:
package_name = "imageio"
try:
pkg_resources.get_distribution(package_name)
import imageio
with imageio.get_writer(
"{}/orbit-backtest.gif".format(fig_dir), mode="I"
) as writer:
for fig_path in fig_paths:
image = imageio.imread(fig_path)
writer.append_data(image)
except pkg_resources.DistributionNotFound:
logger.error(
(
"{} not installed, which is necessary for gif animation".format(
package_name
)
)
)
# TODO: update palette
@orbit_style_decorator
def metric_horizon_barplot(
df,
model_col="model",
pred_horizon_col="pred_horizon",
metric_col="smape",
bar_width=0.1,
path=None,
figsize=None,
fontsize=None,
is_visible=False,
):
if not figsize:
figsize = [20, 6]
if not fontsize:
fontsize = 10
plt.rcParams["figure.figsize"] = figsize
models = df[model_col].unique()
metric_horizons = df[pred_horizon_col].unique()
n_models = len(models)
palette = sns.color_palette("colorblind", n_models)
# set height of bar
bars = list()
for m in models:
bars.append(list(df[df[model_col] == m][metric_col]))
# set position of bar on X axis
r = list()
r.append(np.arange(len(bars[0])))
for idx in range(n_models - 1):
r.append([x + bar_width for x in r[idx]])
# make the plot
for idx in range(n_models):
plt.bar(
r[idx],
bars[idx],
color=palette[idx],
width=bar_width,
edgecolor="white",
label=models[idx],
)
# add xticks on the middle of the group bars
plt.xlabel("predict-horizon", fontweight="bold")
plt.xticks([x + bar_width for x in range(len(bars[0]))], metric_horizons)
# create legend & show graphic
plt.legend()
plt.title("Model Comparison with {}".format(metric_col), fontsize=fontsize)
if path:
plt.savefig(path)
if is_visible:
plt.show()
else:
plt.close()
@orbit_style_decorator
def params_comparison_boxplot(
data,
var_names,
model_names,
color_list=sns.color_palette(),
title="Params Comparison",
fig_size=(10, 6),
box_width=0.1,
box_distance=0.2,
showfliers=False,
):
"""compare the distribution of parameters from different models uisng a boxplot.
Parameters:
data : a list of dict with keys as the parameters of interest
var_names : a list of strings, the labels of the parameters to compare
model_names : a list of strings, the names of models to compare
color_list : a list of strings, the color to use for differentiating models
title : string
the title of the chart
fig_size : tuple
figure size
box_width : float
width of the boxes in the boxplot
box_distance : float
the distance between each boxes in the boxplot
showfliers : boolean
show outliers in the chart if set as True
Returns:
a boxplot comparing parameter distributions from different models side by side
"""
fig, ax = plt.subplots(1, 1, figsize=fig_size)
handles = []
n_models = len(model_names)
pos = []
if n_models % 2 == 0:
for n in range(1, int(n_models / 2) + 1):
pos.append(round(box_distance * (-1) ** (n_models - 1) * n, 1))
pos.append(round(box_distance * (-1) ** (n_models) * n, 1))
else:
for n in range(1, int((n_models - 1) / 2) + 1):
pos.append(0)
pos.append(round(box_distance * (-1) ** (n_models - 1) * n, 1))
pos.append(round(box_distance * (-1) ** (n_models) * n, 1))
pos = sorted(pos)
for i in range(len(model_names)):
plt_arr = []
for var in var_names:
plt_arr.append(data[i][var].flatten())
plt_arr = np.vstack(plt_arr).T
globals()[f"bp{i}"] = ax.boxplot(
plt_arr,
positions=np.arange(plt_arr.shape[1]) + pos[i],
widths=box_width,
patch_artist=True,
manage_ticks=False,
boxprops=dict(facecolor=color_list[i]),
medianprops=dict(color="black"),
showfliers=showfliers,
)
handles.append(globals()[f"bp{i}"]["boxes"][0])
plt.xticks(np.arange(len(var_names)), var_names)
ax.legend(handles, model_names)
plt.xlabel("params")
plt.ylabel("value")
plt.title(title)
return ax
@orbit_style_decorator
def residual_diagnostic_plot(
df,
dist="norm",
date_col="week",
residual_col="residual",
fitted_col="prediction",
sparams=None,
):
"""
Parameters
----------
df : pd.DataFrame
dist : str
date_col : str
column name of date
residual_col : str
column name of residual
fitted_col: str
column name of fitted value from model
sparams : float or list
extra parameters used in distribution such as t-dist
Notes
-----
1. residual by time
2. residual vs fitted
3. residual histogram with vertical line as mean
4. residuals qq plot
5. residual ACF
6. residual PACF
"""
fig, ax = plt.subplots(3, 2, figsize=(15, 12))
# plot 1 residual by time
sns.lineplot(
x=date_col,
y=residual_col,
data=df,
ax=ax[0, 0],
color=palette.OrbitPalette.BLUE.value,
alpha=0.8,
label="residual",
)
ax[0, 0].set_title("Residual by Time")
ax[0, 0].legend()
# plot 2 residual vs fitted
sns.scatterplot(
x=fitted_col,
y=residual_col,
data=df,
ax=ax[0, 1],
color=palette.OrbitPalette.BLUE.value,
alpha=0.8,
label="residual",
)
ax[0, 1].axhline(
y=0,
linestyle="--",
color=palette.OrbitPalette.BLACK.value,
alpha=0.5,
label="0",
)
ax[0, 1].set_title("Residual vs Fitted")
ax[0, 1].set_xlabel("fitted")
ax[0, 1].legend()
# plot 3 residual histogram with vertical line as mean
sns.histplot(
df[residual_col].values,
kde=True,
ax=ax[1, 0],
color=palette.OrbitPalette.BLUE.value,
label="residual",
edgecolor="white",
alpha=0.5,
facecolor=palette.OrbitPalette.BLUE.value,
)
ax[1, 0].set_title("Residual Distribution")
ax[1, 0].axvline(
df[residual_col].mean(),
color=palette.OrbitPalette.ORANGE.value,
linestyle="--",
alpha=0.9,
label="residual mean",
)
ax[1, 0].set_ylabel("density")
ax[1, 0].legend()
# plot 4 residual qq plot
if dist == "norm":
_ = stats.probplot(df[residual_col].values, dist="norm", plot=ax[1, 1])
elif dist == "t-dist":
# t-dist qq-plot
_ = stats.probplot(
df[residual_col].values, dist=stats.t, sparams=sparams, plot=ax[1, 1]
)
# plot 5 residual ACF
sm.graphics.tsa.plot_acf(
df[residual_col].values,
ax=ax[2, 0],
title="Residual ACF",
color=palette.OrbitPalette.BLUE.value,
)
ax[2, 0].set_xlabel("lag")
ax[2, 0].set_ylabel("acf")
# plot 6 residual PACF
sm.graphics.tsa.plot_pacf(
df[residual_col].values,
ax=ax[2, 1],
title="Residual PACF",
color=palette.OrbitPalette.BLUE.value,
)
ax[2, 1].set_xlabel("lag")
ax[2, 1].set_ylabel("pacf")
fig.tight_layout()