def run_pipeline()

in afa/core.py [0:0]


def run_pipeline(data, freq_in, freq_out, metric="smape",
    cv_stride=1, backend="futures", tqdm_obj=None, horiz=None):
    """Run model selection over *all* timeseries in a dataframe.

    The input is loaded, resampled to `freq_out`, split into one group per
    timeseries, and `run_cv_select` is invoked once per group. This function
    is *not* a generator: it returns a list with one entry per timeseries.

    Parameters
    ----------
    data : DataFrame-like
        Input dataset, passed to `load_data`. When `horiz` is None it is also
        indexed by column name, so it must contain the `GROUP_COLS` columns
        and a "horiz" column.
    freq_in : str
        Frequency of the input data, forwarded to `load_data`.
    freq_out : str
        Frequency the data is resampled to before model selection; also
        forwarded to `run_cv_select`.
    metric : str, optional
        Objective metric forwarded to `run_cv_select`.
    cv_stride : int
        Stride length of cross-validation sliding window. For example, a
        `cv_stride=2` for `freq_out='W'` means the cross-validation will be
        performed every 2 weeks in the training data. Smaller values will lead
        to longer model selection durations.
    backend : str, optional
        "python" runs every timeseries sequentially in the calling process;
        "futures" submits every timeseries to a
        `futures.ProcessPoolExecutor`. "pyspark" and "lambdamap" are
        recognised but not implemented.
    tqdm_obj : object, optional
        Currently unused; kept for interface compatibility.
    horiz : int, optional
        Forecast horizon forwarded to `run_cv_select`. When None, the
        "horiz" column of `data` is merged back onto the resampled frame and
        added to the grouping keys, and None is still what gets forwarded.
        NOTE(review): presumably `run_cv_select` then reads the horizon from
        the group's "horiz" column -- confirm against its implementation.

    Returns
    -------
    list
        For `backend="python"`, the return values of `run_cv_select`, one per
        timeseries. For `backend="futures"`, the submitted `Future` objects,
        one per timeseries; they are returned without being waited on, so the
        caller is responsible for collecting their results.

    Raises
    ------
    NotImplementedError
        If `backend` is "pyspark", "lambdamap", or any unrecognised value.

    """
    # Validate the backend up front so an unsupported choice fails before the
    # (potentially expensive) load/resample/groupby work is done.
    if backend in ("pyspark", "lambdamap"):
        raise NotImplementedError(
            "backend {!r} is not implemented yet".format(backend))
    if backend not in ("python", "futures"):
        raise NotImplementedError(
            "unknown backend {!r}; expected 'python' or 'futures'"
            .format(backend))

    per_series_horiz = horiz is None

    if per_series_horiz:
        # Capture one horizon row per timeseries from the raw input; it is
        # merged back onto the frame after resampling.
        df_horiz = data[GROUP_COLS + ["horiz"]].drop_duplicates()

    df = load_data(data, freq_in)

    # resample the input dataset to the desired frequency.
    df = resample(df, freq_out)

    if per_series_horiz:
        # A column-based merge does not keep the left frame's index, so the
        # index is stashed in a "timestamp" column and restored afterwards.
        df["timestamp"] = df.index
        df = df.merge(df_horiz, on=GROUP_COLS, how="left")
        df.set_index("timestamp", inplace=True)
        group_cols = GROUP_COLS + ["horiz"]
    else:
        group_cols = GROUP_COLS

    groups = df.groupby(group_cols, as_index=False, sort=False)

    if backend == "python":
        return [
            run_cv_select(dd, horiz, freq_out, metric, cv_stride)
            for _, dd in groups
        ]

    # backend == "futures": hand every timeseries to a process pool and
    # return the list of futures immediately, without waiting on them.
    ex = futures.ProcessPoolExecutor()
    return [
        ex.submit(run_cv_select, dd, horiz, freq_out, metric, cv_stride)
        for _, dd in groups
    ]