in kats/detectors/cusum_detection.py [0:0]
def detector(self, **kwargs) -> Sequence[CUSUMChangePoint]:
    """
    Locate a change point per requested direction and score it.

    For each direction a candidate change point is obtained from
    ``_get_change_point``; its log-likelihood ratio and p-value are attached,
    and a ``regression_detected`` flag is derived from the significance,
    minimal-change, standard-deviation and magnitude checks. The per-direction
    metadata is stored on ``self.changes_meta``. ``interest_window``,
    ``magnitude_quantile`` and ``magnitude_ratio`` are stored on ``self`` too.

    Args:
        threshold: Optional; float; significance level, default: 0.01.
        max_iter: Optional; int; maximum number of iterations used when
            searching for the changepoint.
        delta_std_ratio: Optional; float; the absolute mean delta has to be
            larger than this value times the std of the data preceding the
            changepoint to be considered a change.
        min_abs_change: Optional; int; minimal absolute delta between mu0
            and mu1.
        start_point: Optional; int; the start idx of the changepoint; None
            means the middle of the time series.
        change_directions: Optional; list<str>; a list containing either or
            both of 'increase' and 'decrease', specifying which kinds of
            change to detect. None means both.
        interest_window: Optional; list<int, int>; start and end of the
            interest window in which change points are looked for. Note
            that llr is still calculated using all data points.
        magnitude_quantile: Optional; float; the quantile for magnitude
            comparison; if None, the magnitude comparison is skipped.
        magnitude_ratio: Optional; float; comparable ratio.
        magnitude_comparable_day: Optional; float; maximal percentage of
            days that can have comparable magnitude to be considered as
            regression.
        return_all_changepoints: Optional; bool; return all the changepoints
            found, even the insignificant ones.

    Returns:
        The CUSUMChangePoint sequence produced by
        ``_convert_cusum_changepoints``.

    Raises:
        ValueError: If a requested direction is neither 'increase' nor
            'decrease'.
    """

    def option(name):
        # Every setting is resolved through _get_arg, which receives the
        # caller's kwargs (presumably falling back to a default when the
        # key is absent -- confirm against _get_arg).
        return _get_arg(name, **kwargs)

    threshold = option("threshold")
    max_iter = option("max_iter")
    delta_std_ratio = option("delta_std_ratio")
    min_abs_change = option("min_abs_change")
    start_point = option("start_point")
    change_directions = option("change_directions")
    interest_window = option("interest_window")
    magnitude_quantile = option("magnitude_quantile")
    magnitude_ratio = option("magnitude_ratio")
    magnitude_comparable_day = option("magnitude_comparable_day")
    return_all_changepoints = option("return_all_changepoints")

    # Stored on the instance rather than passed along: _magnitude_compare is
    # called below with the series only.
    self.interest_window = interest_window
    self.magnitude_quantile = magnitude_quantile
    self.magnitude_ratio = magnitude_ratio

    # Work on a float64 numpy copy of the values.
    series = self.data.value.to_numpy().astype("float64")

    directions = (
        ["increase", "decrease"] if change_directions is None else change_directions
    )

    results = {}
    for direction in directions:
        if direction not in {"increase", "decrease"}:
            raise ValueError(
                "Change direction must be 'increase' or 'decrease.' "
                f"Got {direction}"
            )

        meta = self._get_change_point(
            series,
            max_iter=max_iter,
            start_point=start_point,
            change_direction=direction,
        )
        meta["llr"] = self._get_llr(series, meta)
        # p-value of the llr under a chi-square distribution with 2 degrees
        # of freedom.
        meta["p_value"] = 1 - chi2.cdf(meta["llr"], 2)

        # Magnitude check (interest window vs. history). It passes by
        # default and is only evaluated for series without negative values
        # when both a quantile and an interest window are configured.
        magnitude_ok = True
        if np.min(series) >= 0:
            if magnitude_quantile and interest_window:
                # For a decrease the series is negated before comparison.
                signed_series = series if direction == "increase" else -series
                magnitude_ok = (
                    self._magnitude_compare(signed_series)
                    >= magnitude_comparable_day
                )
        elif magnitude_quantile:
            logging.warning(
                (
                    "The minimal value is less than 0. Cannot perform "
                    "magnitude comparison."
                )
            )

        # Both llr values are compared against the same chi-square critical
        # value.
        critical_value = chi2.ppf(1 - threshold, 2)
        llr_significant = meta["llr"] > critical_value
        llr_int_significant = meta["llr_int"] > critical_value

        # The shift between mu0 and mu1 must exceed min_abs_change in the
        # direction being tested.
        exceeds_min_change = (
            meta["mu0"] + min_abs_change < meta["mu1"]
            if direction == "increase"
            else meta["mu0"] > meta["mu1"] + min_abs_change
        )

        # The absolute delta must exceed delta_std_ratio times the std of
        # the points before the changepoint.
        pre_change_std = np.std(series[: meta["changepoint"]])
        exceeds_std = np.abs(meta["delta"]) > pre_change_std * delta_std_ratio

        meta["regression_detected"] = (
            llr_significant
            and llr_int_significant
            and exceeds_min_change
            and exceeds_std
            and magnitude_ok
        )
        results[direction] = meta

    self.changes_meta = results
    return self._convert_cusum_changepoints(results, return_all_changepoints)