def _get_scenario_forecasts()

in trend_getter/scenarios.py [0:0]


    def _get_scenario_forecasts(self) -> None:
        """Populate ``scenario_forecasts`` and ``scenario_percent_impacts``.

        Steps, in order:

        1. Build the daily axis ``self.all_dates`` running from the earliest
           ``populations`` submission date to the latest ``dates_to_predict``
           submission date.
        2. For every frame in ``scaled_historical_forecasts``, prepend one
           all-NaN row per ``populations`` row and store the result (index
           reset to 0..N-1) in ``scenario_forecasts``.
        3. For every scenario, draw ``number_of_simulations`` samples and turn
           each one into a linear ramp from 0 at the scenario's ``start_date``
           to the sampled value at its ``end_date``. Rows before
           ``historical_end_date`` are zeroed and the remaining rows are
           rebased so the impact is exactly 0 on that date. The frame is
           stored in ``scenario_percent_impacts``.
        4. Multiply the forecast of every population whose underscore-separated
           name contains the scenario name by ``1 - impact``.
        5. Rebuild ``scenario_forecasts["total"]`` as the sum of all other
           populations.

        Progress is printed to stdout. Nothing is returned; results are left
        on the instance.

        NOTE(review): steps 4 and 5 combine frames by index/column label, so
        they presumably expect each forecast frame to have one row per date in
        ``dates_to_predict`` and columns ``0..number_of_simulations-1`` —
        confirm against the code that builds ``scaled_historical_forecasts``.
        """
        start_date = self.populations["submission_date"].min()
        end_date = self.dates_to_predict["submission_date"].max()
        self.all_dates = pd.date_range(start=start_date, end=end_date, freq="D")

        # NaN padding for the historical period: one row per ``populations``
        # row, one column per simulation. Allocated once instead of building
        # and concatenating ``number_of_simulations`` identical Series.
        filler = pd.DataFrame(
            np.nan,
            index=range(len(self.populations)),
            columns=range(self.number_of_simulations),
        )
        print("Running Scenarios: ", end="")
        for population_name, forecast in self.scaled_historical_forecasts.items():
            self.scenario_forecasts[population_name] = pd.concat(
                [filler, forecast.copy(deep=True)]
            ).reset_index(drop=True)

        # Both values below are the same for every scenario, so compute once.
        # The date axis is converted to epoch nanoseconds explicitly so that it
        # and the ramp knots are guaranteed to share one numeric unit.
        date_axis = self.all_dates.values.astype("datetime64[ns]").astype("int64")
        # NOTE: np.argmax returns 0 when nothing matches, so a
        # ``historical_end_date`` that is not in ``all_dates`` silently zeroes
        # no leading rows and rebases on the first row.
        ix = np.argmax(self.all_dates == self.historical_end_date)

        for scenario_name, s in self.scenarios.items():
            print(f"{scenario_name}", end=" | ")
            samples = s.sample(self.number_of_simulations)

            # Only the two ramp knots are passed: np.interp already holds the
            # first/last value outside the knot range, and it requires
            # increasing x-coordinates, which extra knots at the global
            # start/end would violate whenever the scenario window reaches
            # outside [start_date, end_date].
            ramp_knots = (
                pd.to_datetime([s.start_date, s.end_date])
                .values.astype("datetime64[ns]")
                .astype("int64")
            )
            pct_impacts = pd.DataFrame(
                np.column_stack(
                    [
                        np.interp(date_axis, ramp_knots, [0, sample])
                        for sample in samples
                    ]
                )
            )
            # No impact before the end of history; afterwards measure the
            # impact relative to its level on the last historical date.
            pct_impacts.iloc[:ix] = 0
            pct_impacts.iloc[ix:] = pct_impacts.iloc[ix:] - pct_impacts.iloc[ix].values
            self.scenario_percent_impacts[scenario_name] = pct_impacts

            retained_share = 1 - pct_impacts
            for population_name in self.population_names:
                # A population is affected when the scenario name is one of
                # the "_"-separated parts of its name.
                if scenario_name in population_name.split("_"):
                    self.scenario_forecasts[population_name] *= retained_share

        # Reset "total" in place (NaN padding rows stay NaN) and rebuild it
        # from the adjusted component populations.
        self.scenario_forecasts["total"] *= 0
        for population_name in self.population_names:
            if population_name != "total":
                self.scenario_forecasts["total"] += self.scenario_forecasts[
                    population_name
                ]

        print("done.")