in trend_getter/scenarios.py [0:0]
def fetch_data(self, filtered_end_date: str = None) -> None:
    """Load DAU data and build holiday-adjusted series for each population.

    On the first call (``self.raw_df is None``) the SQL returned by
    ``self._query_()`` is run on BigQuery and the result is cached in
    ``self.raw_df``. On later calls the cached frame is reused and truncated
    to rows whose ``submission_date`` is on or before
    ``self.historical_end_date``.

    Every column other than ``submission_date``, ``country`` and ``dau`` is
    treated as a flag. Each row is labelled with the underscore-joined names
    of its truthy flags (``"other"`` when none is truthy), the data is
    pivoted to one DAU column per label, and a ``"total"`` column is added.
    A ``holidays.HolidayImpacts`` model is then fitted per population.

    Args:
        filtered_end_date: Optional new value for ``self.historical_end_date``.
            It is only read when ``self.raw_df`` is already cached; on the
            first fetch it is ignored.

    Side effects:
        Sets ``self.raw_df`` (first call only), ``self.historical_end_date``
        (when ``filtered_end_date`` is given on a cached call),
        ``self.dates_to_predict``, ``self.historical_dates``,
        ``self.population_names``, ``self.holiday_impacts``,
        ``self.future_holiday_impacts`` and ``self.populations``.
    """
    if self.raw_df is None:
        query = self._query_()
        print(f"Fetching Data:\n\n{query}")
        self.raw_df = (
            bigquery.Client(project=self.project).query(query).to_dataframe()
        )
        df = self.raw_df.copy(deep=True)
    else:
        if filtered_end_date is not None:
            self.historical_end_date = filtered_end_date
        print(f"Truncating data to before:\n\n{self.historical_end_date}")
        # The cut-off is inclusive: rows dated exactly on the end date are kept.
        df = self.raw_df[
            pd.to_datetime(self.raw_df.submission_date)
            <= pd.to_datetime(self.historical_end_date)
        ].copy(deep=True)

    # Forecast dates start the day after the historical end date ([1:] drops
    # the end date itself) and run through the forecast end date.
    self.dates_to_predict = pd.DataFrame(
        {
            "submission_date": pd.date_range(
                self.historical_end_date, self.forecast_end_date
            ).date[1:]
        }
    )

    # Keep the frame's own column order. Building this list from a set made
    # the joined labels depend on string-hash ordering, which can change from
    # one interpreter run to the next.
    non_flag_columns = {"submission_date", "country", "dau"}
    cols = [col for col in df.columns if col not in non_flag_columns]

    # Label each row with its truthy flags; rows with none become "other".
    # A plain comprehension also copes with zero rows or zero flag columns,
    # where a row-wise DataFrame.apply does not reliably return a Series.
    df["population"] = [
        "_".join(col for col, flag in zip(cols, row) if flag) or "other"
        for row in df[cols].itertuples(index=False, name=None)
    ]

    # Pivot to wide format: one DAU column per population label.
    # Zeros (including the pivot's fill value) are turned into NaN.
    df = (
        df.pivot_table(
            index=["submission_date", "country"],
            columns="population",
            values="dau",
            aggfunc="sum",
            fill_value=0,
        )
        .reset_index()
        .rename_axis(columns=None)
        .replace({0: np.nan})
    )
    df["total"] = df.drop(columns=["submission_date", "country"]).sum(axis=1)

    self.historical_dates = df["submission_date"]

    # Order: "total" first, named populations alphabetically, "other" last.
    # "other" only exists when some row had no truthy flag; listing it
    # unconditionally raised KeyError in the loop below.
    reserved = {"total", "other", "submission_date", "country"}
    self.population_names = ["total"] + sorted(set(df.columns) - reserved)
    if "other" in df.columns:
        self.population_names.append("other")

    # Loop-invariant forecast window.
    forecast_start = self.dates_to_predict["submission_date"].min()
    forecast_end = self.dates_to_predict["submission_date"].max()

    sub_populations = []
    for pop in self.population_names:
        # min_count=1 keeps all-NaN groups as NaN so dropna() removes them
        # instead of reporting them as 0.
        a = (
            df.groupby(["submission_date", "country"], as_index=False)[pop]
            .sum(min_count=1)
            .dropna()
        )
        a["dau"] = a[pop]
        hi = holidays.HolidayImpacts(
            df=a,
            forecast_start=forecast_start,
            forecast_end=forecast_end,
            # detrend_spike_correction=8.0,
        )
        hi.fit()
        b = hi.all_countries
        # Keep the model's "expected" values under the population name and
        # the observed "dau" values under "<pop>_original".
        sub_populations.append(
            b.rename(columns={"dau": f"{pop}_original", "expected": pop})[
                ["submission_date", pop, f"{pop}_original"]
            ]
        )
        if pop == "total":
            # Only the model fitted on the total is kept on the instance.
            self.holiday_impacts = hi
            self.future_holiday_impacts = hi.predict()

    # Outer-join all per-population frames on date, then order by date.
    self.populations = (
        reduce(
            lambda left, right: pd.merge(
                left, right, on="submission_date", how="outer"
            ),
            sub_populations,
        )
        .sort_values("submission_date")
        .reset_index(drop=True)
    )