in afa/core.py [0:0]
def run_pipeline(data, freq_in, freq_out, metric="smape",
    cv_stride=1, backend="futures", tqdm_obj=None, horiz=None):
    """Run model selection over all timeseries in a dataframe.

    Despite earlier documentation, this is NOT a generator: it returns a
    list with one entry per timeseries group -- concrete results
    dataframes when ``backend="python"``, or
    ``concurrent.futures.Future`` objects when ``backend="futures"``.

    Parameters
    ----------
    data : pandas.DataFrame
        Input timeseries; passed through ``load_data``. When ``horiz`` is
        None it must also carry a per-group ``"horiz"`` column (it is
        indexed with ``GROUP_COLS + ["horiz"]`` in that case).
    freq_in : str
        Frequency of the input data, forwarded to ``load_data``.
    freq_out : str
        Frequency to resample to before running model selection.
    metric : str, optional
        Objective metric forwarded to ``run_cv_select`` (default
        ``"smape"``).
    cv_stride : int, optional
        Stride length of cross-validation sliding window. For example, a
        `cv_stride=2` for `freq_out='W'` means the cross-validation will be
        performed every 2 weeks in the training data. Smaller values will lead
        to longer model selection durations.
    backend : str, optional
        ``"python"`` (serial, in-process) or ``"futures"``
        (``concurrent.futures.ProcessPoolExecutor``). ``"pyspark"`` and
        ``"lambdamap"`` are recognized but not yet implemented and raise
        ``NotImplementedError``, as does any other value.
    tqdm_obj : optional
        Currently unused in this function (progress reporting TODO?).
    horiz : int, optional
        Forecast horizon. If None, a per-group horizon is taken from the
        ``"horiz"`` column of ``data`` instead.

    Returns
    -------
    list
        One results dataframe per group (``backend="python"``), or one
        ``Future`` per group (``backend="futures"``).

    Raises
    ------
    NotImplementedError
        For unimplemented or unknown ``backend`` values.
    """
    if horiz is None:
        # Per-group horizons live in the raw data; snapshot them here so
        # they can be re-attached after resampling (which drops them).
        df_horiz = data[GROUP_COLS + ["horiz"]].drop_duplicates()

    df = load_data(data, freq_in)

    # Resample the input dataset to the desired output frequency.
    df = resample(df, freq_out)

    if horiz is None:
        # Re-attach the per-group horizon column lost during resampling;
        # the index is round-tripped through a column because merge() does
        # not preserve it.
        df["timestamp"] = df.index
        df = df.merge(df_horiz, on=GROUP_COLS, how="left")
        df.set_index("timestamp", inplace=True)
        group_cols = GROUP_COLS + ["horiz"]
    else:
        group_cols = GROUP_COLS

    groups = df.groupby(group_cols, as_index=False, sort=False)
    # NOTE(review): job_count is currently unused (as is tqdm_obj) --
    # presumably intended for progress reporting; confirm before removing.
    job_count = groups.ngroups

    if backend == "python":
        # Serial execution: results are concrete dataframes.
        results = [run_cv_select(dd, horiz, freq_out, metric, cv_stride)
                   for _, dd in groups]
    elif backend == "futures":
        # NOTE(review): the executor is deliberately not shut down here --
        # the returned futures need it alive to complete. Callers own the
        # futures' (and hence the pool's) lifecycle.
        ex = futures.ProcessPoolExecutor()
        results = [
            ex.submit(run_cv_select, dd, horiz, freq_out, metric, cv_stride)
            for _, dd in groups
        ]
    elif backend == "pyspark":
        raise NotImplementedError
    elif backend == "lambdamap":
        raise NotImplementedError
    else:
        raise NotImplementedError

    return results