def split()

in otava/analysis.py [0:0]


def split(series: np.array, window_len: int = 30, max_pvalue: float = 0.001,
          new_points=None, old_cp=None) -> List[ChangePoint]:
    """
    Finds change points by splitting the series top-down.

    Internally it uses the EDivisive algorithm from mongodb-signal-processing
    that recursively splits the series in a way to maximize some measure of
    dissimilarity (denoted qhat) between the chunks.
    Splitting happens as long as the dissimilarity is statistically significant.

    Unfortunately this algorithm has a few downsides:
    - the complexity is O(n^2), where n is the length of the series
    - if there are too many change points and too much data, the change points in the middle
      of the series may be missed

    This function tries to address these issues by invoking EDivisive on smaller
    chunks (windows) of the input data instead of the full series and then merging the results.
    Each window should be large enough to contain enough points to detect a change-point.
    Consecutive windows overlap so that we won't miss changes happening between them.

    Args:
        series: the data points to analyze.
        window_len: number of points in each window passed to EDivisive.
            Must be at least 2; consecutive windows start half a window apart.
        max_pvalue: threshold handed to TTestSignificanceTester.
        new_points: number of points that were appended to the end of the
            series since ``old_cp`` was computed (typically 1). Only used
            together with ``old_cp``.
        old_cp: change points found before the new points were added. When
            given together with ``new_points``, their indexes are kept and
            only the tail of the series is analyzed again.

    Returns:
        One change point per collected index, each built by
        ``tester.change_point``.

    Raises:
        ValueError: if ``window_len`` is smaller than 2.
    """
    # A window shorter than 2 yields step == 0, and then the scan below is not
    # guaranteed to advance. Raise explicitly instead of asserting so that the
    # check also survives `python -O`.
    if window_len < 2:
        raise ValueError("Window length must be at least 2")
    start = 0
    step = int(window_len / 2)
    indexes = []
    # N new_points are appended to the end of series. Typically N=1.
    # old_cp are the weak change points from before new points were added.
    # We now just compute e_divisive for the tail of the series, beginning at
    # max(old_cp[-1], a step that is over 2 window_len from the end)
    if new_points is not None and old_cp is not None:
        indexes = [c.index for c in old_cp]
        steps_needed = new_points/window_len + 4
        max_start = len(series) - steps_needed*window_len
        # Resume from the last old change point (in old_cp order) that lies
        # before max_start ...
        for c in old_cp:
            if c.index < max_start:
                start = c.index
        # ... or from the last step boundary before max_start, if that is later.
        for s in range(0, len(series), step):
            if s < max_start and start < s:
                start = s

    tester = TTestSignificanceTester(max_pvalue)
    # Mirrors `indexes` so the duplicate check below is O(1) per index.
    seen = set(indexes)
    while start < len(series):
        end = min(start + window_len, len(series))

        algo = EDivisive(seed=None, calculator=cext_calculator, significance_tester=tester)
        pts = algo.get_change_points(series[start:end])
        # Translate window-relative positions into positions in the full series.
        new_indexes = sorted(p.index + start for p in pts)
        last_new_change_point_index = new_indexes[-1] if new_indexes else 0
        # Continue from the last change point found, but always advance by at
        # least one step so that the loop terminates.
        start = max(last_new_change_point_index, start + step)
        # incremental Otava can duplicate an old cp
        for i in new_indexes:
            if i not in seen:
                seen.add(i)
                indexes.append(i)

    # NOTE(review): in incremental mode `indexes` is the old indexes followed by
    # the newly found ones, so it is not necessarily sorted - confirm that
    # tester.change_point does not rely on sorted window endpoints.
    window_endpoints = [0] + indexes + [len(series)]
    return [tester.change_point(i, series, window_endpoints) for i in indexes]