src/gluonts/dataset/artificial/_base.py [300:643]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
        return target

    def generate_ts(
        self, num_ts_steps: int, is_train: bool = False
    ) -> List[DataEntry]:
        """
        Build one data entry per time series of this dataset.

        For every index ``i`` in ``range(self.num_timeseries)`` the target is
        produced by the first rule that applies:

        * ``self.is_nan``: ``self.insert_nans_and_zeros(num_ts_steps)``
        * ``self.is_piecewise``: ``self.piecewise_constant(i, num_ts_steps)``
        * ``self.num_missing_middle > 0``:
          ``self.insert_missing_vals_middle(num_ts_steps, constant)``
        * any of ``is_noise``, ``is_trend``, ``is_promotions``, ``holidays``:
          the target of the first entry of the dataset returned by
          ``self.compute_data_from_recipe`` (its length comes from
          ``self.get_num_steps(i)``, not from ``num_ts_steps``)
        * otherwise: ``constant`` repeated ``num_ts_steps`` times

        :param num_ts_steps: number of time steps requested for targets that
               are not generated from a recipe
        :param is_train: if True the train split of the recipe dataset is
               used, otherwise its test split (which must not be None)
        :return: list of dicts with the keys ``start``, ``target``,
               ``item_id``, ``feat_static_cat`` and ``feat_static_real``;
               ``FieldName.FEAT_DYNAMIC_REAL`` is added when promotions or
               holidays are enabled and the entry came from a recipe
        """
        res = []
        constant = None
        for i in range(self.num_timeseries):
            # Entry taken from the recipe dataset, if that branch is used.
            # Reset on every iteration so that the dynamic features below
            # can never be read from an unbound or stale variable.
            recipe_entry = None
            if self.is_nan:
                target = self.insert_nans_and_zeros(num_ts_steps)
            elif self.is_piecewise:
                target = self.piecewise_constant(i, num_ts_steps)
            else:
                # The previous constant is handed back in, so the value for
                # series i may depend on the one used for series i - 1.
                constant = self.determine_constant(i, constant)
                if self.num_missing_middle > 0:
                    target = self.insert_missing_vals_middle(
                        num_ts_steps, constant
                    )
                elif (
                    self.is_noise
                    or self.is_trend
                    or self.is_promotions
                    or self.holidays
                ):
                    num_steps = self.get_num_steps(i)
                    generated = self.compute_data_from_recipe(
                        num_steps, constant
                    )
                    if is_train:
                        time_series = generated.train
                    else:
                        assert generated.test is not None
                        time_series = generated.test
                    # Materialise the dataset once; target and dynamic
                    # features are both read from its first entry.
                    recipe_entry = list(time_series)[0]
                    # returns np array convert to list for consistency
                    target = recipe_entry[FieldName.TARGET].tolist()
                else:
                    target = [constant] * num_ts_steps
            ts_data = dict(
                start=self.start,
                target=target,
                item_id=str(i),
                feat_static_cat=[i],
                feat_static_real=[i],
            )
            # Dynamic features only exist for recipe-generated series. The
            # nan / piecewise / missing-middle branches take precedence over
            # the recipe and produce none, so there is nothing to attach.
            if recipe_entry is not None and (
                self.is_promotions or self.holidays
            ):
                ts_data[FieldName.FEAT_DYNAMIC_REAL] = recipe_entry[
                    FieldName.FEAT_DYNAMIC_REAL
                ].tolist()
            res.append(ts_data)
        return res

    @property
    def train(self) -> List[DataEntry]:
        """
        Entries produced by ``generate_ts`` with ``is_train=True`` and
        ``self.num_training_steps`` as the number of time steps.
        """
        training_entries = self.generate_ts(
            is_train=True, num_ts_steps=self.num_training_steps
        )
        return training_entries

    @property
    def test(self) -> List[DataEntry]:
        """
        Entries produced by ``generate_ts`` with ``self.num_steps`` time
        steps and the default ``is_train=False``.
        """
        total_steps = self.num_steps
        return self.generate_ts(num_ts_steps=total_steps)


class ComplexSeasonalTimeSeries(ArtificialDataset):
    """
    Generate sinusoidal time series that ramp up (sigmoid envelope) towards
    a random amplitude and level, with additional spikes at special time
    points that depend on the frequency.

    For daily data the special time points are those with
    ``dayofweek == 0`` (Monday in pandas' convention).

    TODO: This could be converted to a RecipeDataset to avoid code duplication.
    """

    # Default period of the sinusoid per supported frequency string; used by
    # ``_get_period`` when no explicit ``seasonality`` was given.
    _DEFAULT_PERIODS = {"M": 24, "W": 52, "D": 14, "H": 24, "min": 60}

    def __init__(
        self,
        num_series: int = 100,
        prediction_length: int = 20,
        freq_str: str = "D",
        length_low: int = 30,
        length_high: int = 200,
        min_val: float = -10000,
        max_val: float = 10000,
        is_integer: bool = False,
        proportion_missing_values: float = 0,
        is_noise: bool = True,
        is_scale: bool = True,
        percentage_unique_timestamps: float = 0.07,
        is_out_of_bounds_date: bool = False,
        seasonality: Optional[int] = None,
        clip_values: bool = False,
    ) -> None:
        """
        :param num_series: number of time series generated in the train and
               test set
        :param prediction_length: number of trailing time steps that the
               train set drops from every test series
        :param freq_str: frequency string; "M", "W", "D", "H" and "min" have
               a default period and special time points defined
        :param length_low: minimum length of a time-series, must be larger than
               prediction_length
        :param length_high: upper bound on the length of a time-series; it is
               passed as ``high`` to ``RandomState.randint`` and is therefore
               exclusive
        :param min_val: min value of a time-series
        :param max_val: max value of a time-series
        :param is_integer: whether the values are clipped to the integer
               range inside [min_val, max_val] and rounded to integers
        :param proportion_missing_values: proportion (0 <= x < 1) of values
               replaced by missing-value markers (None or "NaN"); the first
               value of a series is never replaced
        :param is_noise: whether to add noise
        :param is_scale: whether to multiply the seasonal signal by a random
               scale
        :param percentage_unique_timestamps: probability (between 0 and 1)
               that a series gets a random start date instead of the shared
               default start date
        :param is_out_of_bounds_date: determines whether to use a very old
               start date for the first series and a start date in the
               future for the last series
        :param seasonality: Seasonality of the generated data. If not given
               uses default seasonality for frequency
        :param clip_values: if True the values will be clipped to
               [min_val, max_val], otherwise they are shifted / linearly
               rescaled into that interval
        """
        assert length_low > prediction_length
        super().__init__(freq_str)
        self.num_series = num_series
        self.prediction_length = prediction_length
        self.length_low = length_low
        self.length_high = length_high
        self.freq_str = freq_str
        self.min_val = min_val
        self.max_val = max_val
        self.is_integer = is_integer
        self.proportion_missing_values = proportion_missing_values
        self.is_noise = is_noise
        self.is_scale = is_scale
        self.percentage_unique_timestamps = percentage_unique_timestamps
        self.is_out_of_bounds_date = is_out_of_bounds_date
        self.seasonality = seasonality
        self.clip_values = clip_values

    @property
    def metadata(self) -> MetaData:
        """Metadata carrying the frequency and the prediction length."""
        return MetaData(
            freq=self.freq, prediction_length=self.prediction_length
        )

    def _get_period(self) -> int:
        """
        Return the period of the sinusoid in time steps.

        An explicit ``seasonality`` wins; otherwise the default for
        ``freq_str`` is used.

        :raises RuntimeError: if no seasonality was given and ``freq_str``
                has no default period
        """
        if self.seasonality is not None:
            return self.seasonality
        period = self._DEFAULT_PERIODS.get(self.freq_str)
        if period is None:
            raise RuntimeError(f'Bad freq_str value "{self.freq_str}"')
        return period

    def _get_start(self, index: int, my_random: random.Random) -> str:
        """
        Return the start date of series ``index`` as a string whose
        resolution matches ``freq_str``.

        :param index: position of the series within the dataset
        :param my_random: random generator used to decide whether the series
               gets a random start date and, if so, to draw it; it is not
               consulted for the two out-of-bounds edge cases
        """
        if self.is_out_of_bounds_date and index == 0:
            # Edge case: start date far in the past.
            # NOTE(review): pandas nanosecond timestamps start in 1677, so
            # 1690 is presumably chosen to sit close to that lower limit.
            start_y, start_m, start_d = 1690, 2, 7
            start_h, start_min = 18, 36
        elif self.is_out_of_bounds_date and index == self.num_series - 1:
            # Edge case: start date in the future.
            start_y, start_m, start_d = 2030, 6, 3
            start_h, start_min = 18, 36
        # assume that only 100 * percentage_unique_timestamps of timestamps are unique
        elif my_random.random() < self.percentage_unique_timestamps:
            start_y = my_random.randint(2000, 2018)
            start_m = my_random.randint(1, 12)
            # Days are capped at 28 so the date is valid in every month.
            start_d = my_random.randint(1, 28)
            start_h = my_random.randint(0, 23)
            start_min = my_random.randint(0, 59)
        else:
            # Shared default start date.
            start_y, start_m, start_d = 2013, 11, 28
            start_h, start_min = 18, 36

        if self.freq_str == "M":
            return "%04.d-%02.d" % (start_y, start_m)
        elif self.freq_str in ["W", "D"]:
            return "%04.d-%02.d-%02.d" % (start_y, start_m, start_d)
        elif self.freq_str == "H":
            return "%04.d-%02.d-%02.d %02.d:00:00" % (
                start_y,
                start_m,
                start_d,
                start_h,
            )
        else:
            return "%04.d-%02.d-%02.d %02.d:%02.d:00" % (
                start_y,
                start_m,
                start_d,
                start_h,
                start_min,
            )

    def _special_time_point_indicator(
        self, index: pd.DatetimeIndex
    ) -> np.ndarray:
        """
        Mark the time points of ``index`` that receive an additional spike.

        The rule depends on ``freq_str``: January for "M", even months for
        "W", ``dayofweek == 0`` for "D", hour 0 for "H" and minutes divisible
        by 30 for "min".

        :param index: time index of one series
        :return: element-wise boolean indicator for ``index``
        :raises RuntimeError: if ``freq_str`` is not one of the frequencies
                listed above
        """
        if self.freq_str == "M":
            return index.month == 1
        elif self.freq_str == "W":
            return index.month % 2 == 0
        elif self.freq_str == "D":
            return index.dayofweek == 0
        elif self.freq_str == "H":
            return index.hour == 0
        elif self.freq_str == "min":
            return index.minute % 30 == 0
        else:
            # Report the offending frequency, not the time index.
            raise RuntimeError(f'Bad freq_str value "{self.freq_str}"')

    @property
    def train(self) -> List[DataEntry]:
        """
        The generated series with the last ``prediction_length`` values of
        every target removed.
        """
        entries = []
        for ts in self.make_timeseries():
            target = ts[FieldName.TARGET]
            # Explicit end index instead of ``[:-prediction_length]``, which
            # would return an empty target for prediction_length == 0.
            cutoff = max(len(target) - self.prediction_length, 0)
            entries.append(
                dict(
                    start=ts[FieldName.START],
                    target=target[:cutoff],
                    item_id=ts[FieldName.ITEM_ID],
                )
            )
        return entries

    @property
    def test(self) -> List[DataEntry]:
        """The full generated series."""
        return self.make_timeseries()

    def _shift_into_value_range(self, v: np.ndarray) -> np.ndarray:
        """
        Map ``v`` into [min_val, max_val] while keeping its spread if
        possible.

        Rather than mapping [v_min, v_max] to [self.min_val, self.max_val],
        which would lead to all the time series having the same min and max,
        we want to keep the same interval length (v_max - v_min). We thus
        shift the interval [v_min, v_max] into [self.min_val, self.max_val]
        and clip it if needed.
        """
        v_min, v_max = v.min(), v.max()
        p_min, p_max = (
            max(self.min_val, v_min),
            min(self.max_val, v_max),
        )
        shifted_min = np.clip(
            p_min + (p_max - v_max),
            a_min=self.min_val,
            a_max=self.max_val,
        )
        shifted_max = np.clip(
            p_max + (p_min - v_min),
            a_min=self.min_val,
            a_max=self.max_val,
        )
        span = v_max - v_min
        if span == 0:
            # Constant series (e.g. min_val == max_val): the linear map
            # below would compute 0 / 0 and fill the series with NaN.
            return np.full_like(v, shifted_min)
        return shifted_min + (shifted_max - shifted_min) * (v - v_min) / span

    def _insert_missing_values(
        self, values: list, state: np.random.RandomState
    ) -> None:
        """
        Replace randomly chosen entries of ``values`` in place by a
        missing-value marker, either None or the string "NaN".
        """
        assert (
            self.proportion_missing_values < 1.0
        ), "Please chose a number 0 < x < 1.0"
        idx = np.arange(len(values))
        state.shuffle(idx)
        num_missing_values = (
            int(len(values) * self.proportion_missing_values) + 1
        )  # Add one in case this gets zero
        for j in idx[:num_missing_values]:
            # Using convention that there are no missing values before the start date.
            if j != 0:
                values[j] = None if state.rand() < 0.5 else "NaN"

    def make_timeseries(self, seed: int = 1) -> List[DataEntry]:
        """
        Generate all ``num_series`` time series.

        Both random generators are seeded with ``seed``, so two independent
        calls with the same seed return the same series. ``train`` and
        ``test`` rely on this to agree on start dates and values.

        :param seed: seed for the start-date and the value generators
        :return: one dict per series with the keys ``start``, ``target``
                 (a numpy array) and ``item_id`` (the series index)
        """

        def sigmoid(x: np.ndarray) -> np.ndarray:
            return 1.0 / (1.0 + np.exp(-x))

        # Ensure same start dates in test and training set
        my_random = random.Random(seed)
        state = np.random.RandomState(seed)
        val_range = self.max_val - self.min_val
        res = []
        # NOTE: the order of the draws from ``state`` and ``my_random``
        # below determines the generated data; do not reorder them.
        for i in range(self.num_series):
            length = state.randint(low=self.length_low, high=self.length_high)
            start = self._get_start(i, my_random)
            t = np.arange(length)
            # Ramp-up factor centred at step 20.
            envelope = sigmoid((t - 20.0) / 10.0)
            level = 0.3 * val_range * (state.random_sample() - 0.5)
            phi = 2 * np.pi * state.random_sample()
            period = self._get_period()
            w = 2 * np.pi / period
            idx = pd.date_range(
                start=start, freq=self.freq_str, periods=length
            )
            special_tp_indicator = self._special_time_point_indicator(idx)
            # One random spike height per series, applied at every special
            # time point.
            special_effect = state.random_sample() * special_tp_indicator
            v = np.sin(w * t + phi) + special_effect

            if self.is_scale:
                scale = 0.1 * val_range * state.random_sample()
                v *= scale
            v += level
            if self.is_noise:
                noise_range = 0.02 * val_range * state.random_sample()
                noise = noise_range * state.normal(size=length)
                v += noise
            v = envelope * v
            if self.clip_values:
                np.clip(v, a_min=self.min_val, a_max=self.max_val, out=v)
            else:
                v = self._shift_into_value_range(v)

            if self.is_integer:
                # Clip to the integers inside [min_val, max_val] first so
                # that rounding cannot leave the allowed range.
                np.clip(
                    v,
                    a_min=np.ceil(self.min_val),
                    a_max=np.floor(self.max_val),
                    out=v,
                )
                v = np.round(v).astype(int)
            v = v.tolist()
            if self.proportion_missing_values > 0:
                self._insert_missing_values(v, state)
            res.append(
                dict(
                    # NOTE(review): the ``freq`` argument of ``pd.Timestamp``
                    # appears to be deprecated in recent pandas releases;
                    # confirm against the supported pandas versions.
                    start=pd.Timestamp(start, freq=self.freq_str),
                    target=np.array(v),
                    item_id=i,
                )
            )
        return res


class RecipeDataset(ArtificialDataset):
    """Synthetic data set generated by providing a recipe.

    A recipe is either a (non-deterministic) function

        f(length: int, global_state: dict) -> dict

    or list of (field, function) tuples of the form

        (field: str, f(data: dict, length: int, global_state: dict) -> dict)

    which is processed sequentially, with data initially set to {},
    and each entry updating data[field] to the output of the function
    call.
    """

    def __init__(
        self,
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



src/gluonts/nursery/SCott/pts/dataset/artificial.py [298:641]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
        return target

    def generate_ts(
        self, num_ts_steps: int, is_train: bool = False
    ) -> List[DataEntry]:
        """
        Build one data entry per time series of this dataset.

        For every index ``i`` in ``range(self.num_timeseries)`` the target is
        produced by the first rule that applies:

        * ``self.is_nan``: ``self.insert_nans_and_zeros(num_ts_steps)``
        * ``self.is_piecewise``: ``self.piecewise_constant(i, num_ts_steps)``
        * ``self.num_missing_middle > 0``:
          ``self.insert_missing_vals_middle(num_ts_steps, constant)``
        * any of ``is_noise``, ``is_trend``, ``is_promotions``, ``holidays``:
          the target of the first entry of the dataset returned by
          ``self.compute_data_from_recipe`` (its length comes from
          ``self.get_num_steps(i)``, not from ``num_ts_steps``)
        * otherwise: ``constant`` repeated ``num_ts_steps`` times

        :param num_ts_steps: number of time steps requested for targets that
               are not generated from a recipe
        :param is_train: if True the train split of the recipe dataset is
               used, otherwise its test split (which must not be None)
        :return: list of dicts with the keys ``start``, ``target``,
               ``item_id``, ``feat_static_cat`` and ``feat_static_real``;
               ``FieldName.FEAT_DYNAMIC_REAL`` is added when promotions or
               holidays are enabled and the entry came from a recipe
        """
        res = []
        constant = None
        for i in range(self.num_timeseries):
            # Entry taken from the recipe dataset, if that branch is used.
            # Reset on every iteration so that the dynamic features below
            # can never be read from an unbound or stale variable.
            recipe_entry = None
            if self.is_nan:
                target = self.insert_nans_and_zeros(num_ts_steps)
            elif self.is_piecewise:
                target = self.piecewise_constant(i, num_ts_steps)
            else:
                # The previous constant is handed back in, so the value for
                # series i may depend on the one used for series i - 1.
                constant = self.determine_constant(i, constant)
                if self.num_missing_middle > 0:
                    target = self.insert_missing_vals_middle(
                        num_ts_steps, constant
                    )
                elif (
                    self.is_noise
                    or self.is_trend
                    or self.is_promotions
                    or self.holidays
                ):
                    num_steps = self.get_num_steps(i)
                    generated = self.compute_data_from_recipe(
                        num_steps, constant
                    )
                    if is_train:
                        time_series = generated.train
                    else:
                        assert generated.test is not None
                        time_series = generated.test
                    # Materialise the dataset once; target and dynamic
                    # features are both read from its first entry.
                    recipe_entry = list(time_series)[0]
                    # returns np array convert to list for consistency
                    target = recipe_entry[FieldName.TARGET].tolist()
                else:
                    target = [constant] * num_ts_steps
            ts_data = dict(
                start=self.start,
                target=target,
                item_id=str(i),
                feat_static_cat=[i],
                feat_static_real=[i],
            )
            # Dynamic features only exist for recipe-generated series. The
            # nan / piecewise / missing-middle branches take precedence over
            # the recipe and produce none, so there is nothing to attach.
            if recipe_entry is not None and (
                self.is_promotions or self.holidays
            ):
                ts_data[FieldName.FEAT_DYNAMIC_REAL] = recipe_entry[
                    FieldName.FEAT_DYNAMIC_REAL
                ].tolist()
            res.append(ts_data)
        return res

    @property
    def train(self) -> List[DataEntry]:
        """
        Entries produced by ``generate_ts`` with ``is_train=True`` and
        ``self.num_training_steps`` as the number of time steps.
        """
        training_entries = self.generate_ts(
            is_train=True, num_ts_steps=self.num_training_steps
        )
        return training_entries

    @property
    def test(self) -> List[DataEntry]:
        """
        Entries produced by ``generate_ts`` with ``self.num_steps`` time
        steps and the default ``is_train=False``.
        """
        total_steps = self.num_steps
        return self.generate_ts(num_ts_steps=total_steps)


class ComplexSeasonalTimeSeries(ArtificialDataset):
    """
    Generate sinusoidal time series that ramp up (sigmoid envelope) towards
    a random amplitude and level, with additional spikes at special time
    points that depend on the frequency.

    For daily data the special time points are those with
    ``dayofweek == 0`` (Monday in pandas' convention).

    TODO: This could be converted to a RecipeDataset to avoid code duplication.
    """

    # Default period of the sinusoid per supported frequency string; used by
    # ``_get_period`` when no explicit ``seasonality`` was given.
    _DEFAULT_PERIODS = {"M": 24, "W": 52, "D": 14, "H": 24, "min": 60}

    def __init__(
        self,
        num_series: int = 100,
        prediction_length: int = 20,
        freq_str: str = "D",
        length_low: int = 30,
        length_high: int = 200,
        min_val: float = -10000,
        max_val: float = 10000,
        is_integer: bool = False,
        proportion_missing_values: float = 0,
        is_noise: bool = True,
        is_scale: bool = True,
        percentage_unique_timestamps: float = 0.07,
        is_out_of_bounds_date: bool = False,
        seasonality: Optional[int] = None,
        clip_values: bool = False,
    ) -> None:
        """
        :param num_series: number of time series generated in the train and
               test set
        :param prediction_length: number of trailing time steps that the
               train set drops from every test series
        :param freq_str: frequency string; "M", "W", "D", "H" and "min" have
               a default period and special time points defined
        :param length_low: minimum length of a time-series, must be larger than
               prediction_length
        :param length_high: upper bound on the length of a time-series; it is
               passed as ``high`` to ``RandomState.randint`` and is therefore
               exclusive
        :param min_val: min value of a time-series
        :param max_val: max value of a time-series
        :param is_integer: whether the values are clipped to the integer
               range inside [min_val, max_val] and rounded to integers
        :param proportion_missing_values: proportion (0 <= x < 1) of values
               replaced by missing-value markers (None or "NaN"); the first
               value of a series is never replaced
        :param is_noise: whether to add noise
        :param is_scale: whether to multiply the seasonal signal by a random
               scale
        :param percentage_unique_timestamps: probability (between 0 and 1)
               that a series gets a random start date instead of the shared
               default start date
        :param is_out_of_bounds_date: determines whether to use a very old
               start date for the first series and a start date in the
               future for the last series
        :param seasonality: Seasonality of the generated data. If not given
               uses default seasonality for frequency
        :param clip_values: if True the values will be clipped to
               [min_val, max_val], otherwise they are shifted / linearly
               rescaled into that interval
        """
        assert length_low > prediction_length
        super().__init__(freq_str)
        self.num_series = num_series
        self.prediction_length = prediction_length
        self.length_low = length_low
        self.length_high = length_high
        self.freq_str = freq_str
        self.min_val = min_val
        self.max_val = max_val
        self.is_integer = is_integer
        self.proportion_missing_values = proportion_missing_values
        self.is_noise = is_noise
        self.is_scale = is_scale
        self.percentage_unique_timestamps = percentage_unique_timestamps
        self.is_out_of_bounds_date = is_out_of_bounds_date
        self.seasonality = seasonality
        self.clip_values = clip_values

    @property
    def metadata(self) -> MetaData:
        """Metadata carrying the frequency and the prediction length."""
        return MetaData(
            freq=self.freq, prediction_length=self.prediction_length
        )

    def _get_period(self) -> int:
        """
        Return the period of the sinusoid in time steps.

        An explicit ``seasonality`` wins; otherwise the default for
        ``freq_str`` is used.

        :raises RuntimeError: if no seasonality was given and ``freq_str``
                has no default period
        """
        if self.seasonality is not None:
            return self.seasonality
        period = self._DEFAULT_PERIODS.get(self.freq_str)
        if period is None:
            raise RuntimeError(f'Bad freq_str value "{self.freq_str}"')
        return period

    def _get_start(self, index: int, my_random: random.Random) -> str:
        """
        Return the start date of series ``index`` as a string whose
        resolution matches ``freq_str``.

        :param index: position of the series within the dataset
        :param my_random: random generator used to decide whether the series
               gets a random start date and, if so, to draw it; it is not
               consulted for the two out-of-bounds edge cases
        """
        if self.is_out_of_bounds_date and index == 0:
            # Edge case: start date far in the past.
            # NOTE(review): pandas nanosecond timestamps start in 1677, so
            # 1690 is presumably chosen to sit close to that lower limit.
            start_y, start_m, start_d = 1690, 2, 7
            start_h, start_min = 18, 36
        elif self.is_out_of_bounds_date and index == self.num_series - 1:
            # Edge case: start date in the future.
            start_y, start_m, start_d = 2030, 6, 3
            start_h, start_min = 18, 36
        # assume that only 100 * percentage_unique_timestamps of timestamps are unique
        elif my_random.random() < self.percentage_unique_timestamps:
            start_y = my_random.randint(2000, 2018)
            start_m = my_random.randint(1, 12)
            # Days are capped at 28 so the date is valid in every month.
            start_d = my_random.randint(1, 28)
            start_h = my_random.randint(0, 23)
            start_min = my_random.randint(0, 59)
        else:
            # Shared default start date.
            start_y, start_m, start_d = 2013, 11, 28
            start_h, start_min = 18, 36

        if self.freq_str == "M":
            return "%04.d-%02.d" % (start_y, start_m)
        elif self.freq_str in ["W", "D"]:
            return "%04.d-%02.d-%02.d" % (start_y, start_m, start_d)
        elif self.freq_str == "H":
            return "%04.d-%02.d-%02.d %02.d:00:00" % (
                start_y,
                start_m,
                start_d,
                start_h,
            )
        else:
            return "%04.d-%02.d-%02.d %02.d:%02.d:00" % (
                start_y,
                start_m,
                start_d,
                start_h,
                start_min,
            )

    def _special_time_point_indicator(
        self, index: pd.DatetimeIndex
    ) -> np.ndarray:
        """
        Mark the time points of ``index`` that receive an additional spike.

        The rule depends on ``freq_str``: January for "M", even months for
        "W", ``dayofweek == 0`` for "D", hour 0 for "H" and minutes divisible
        by 30 for "min".

        :param index: time index of one series
        :return: element-wise boolean indicator for ``index``
        :raises RuntimeError: if ``freq_str`` is not one of the frequencies
                listed above
        """
        if self.freq_str == "M":
            return index.month == 1
        elif self.freq_str == "W":
            return index.month % 2 == 0
        elif self.freq_str == "D":
            return index.dayofweek == 0
        elif self.freq_str == "H":
            return index.hour == 0
        elif self.freq_str == "min":
            return index.minute % 30 == 0
        else:
            # Report the offending frequency, not the time index.
            raise RuntimeError(f'Bad freq_str value "{self.freq_str}"')

    @property
    def train(self) -> List[DataEntry]:
        """
        The generated series with the last ``prediction_length`` values of
        every target removed.
        """
        entries = []
        for ts in self.make_timeseries():
            target = ts[FieldName.TARGET]
            # Explicit end index instead of ``[:-prediction_length]``, which
            # would return an empty target for prediction_length == 0.
            cutoff = max(len(target) - self.prediction_length, 0)
            entries.append(
                dict(
                    start=ts[FieldName.START],
                    target=target[:cutoff],
                    item_id=ts[FieldName.ITEM_ID],
                )
            )
        return entries

    @property
    def test(self) -> List[DataEntry]:
        """The full generated series."""
        return self.make_timeseries()

    def _shift_into_value_range(self, v: np.ndarray) -> np.ndarray:
        """
        Map ``v`` into [min_val, max_val] while keeping its spread if
        possible.

        Rather than mapping [v_min, v_max] to [self.min_val, self.max_val],
        which would lead to all the time series having the same min and max,
        we want to keep the same interval length (v_max - v_min). We thus
        shift the interval [v_min, v_max] into [self.min_val, self.max_val]
        and clip it if needed.
        """
        v_min, v_max = v.min(), v.max()
        p_min, p_max = (
            max(self.min_val, v_min),
            min(self.max_val, v_max),
        )
        shifted_min = np.clip(
            p_min + (p_max - v_max),
            a_min=self.min_val,
            a_max=self.max_val,
        )
        shifted_max = np.clip(
            p_max + (p_min - v_min),
            a_min=self.min_val,
            a_max=self.max_val,
        )
        span = v_max - v_min
        if span == 0:
            # Constant series (e.g. min_val == max_val): the linear map
            # below would compute 0 / 0 and fill the series with NaN.
            return np.full_like(v, shifted_min)
        return shifted_min + (shifted_max - shifted_min) * (v - v_min) / span

    def _insert_missing_values(
        self, values: list, state: np.random.RandomState
    ) -> None:
        """
        Replace randomly chosen entries of ``values`` in place by a
        missing-value marker, either None or the string "NaN".
        """
        assert (
            self.proportion_missing_values < 1.0
        ), "Please chose a number 0 < x < 1.0"
        idx = np.arange(len(values))
        state.shuffle(idx)
        num_missing_values = (
            int(len(values) * self.proportion_missing_values) + 1
        )  # Add one in case this gets zero
        for j in idx[:num_missing_values]:
            # Using convention that there are no missing values before the start date.
            if j != 0:
                values[j] = None if state.rand() < 0.5 else "NaN"

    def make_timeseries(self, seed: int = 1) -> List[DataEntry]:
        """
        Generate all ``num_series`` time series.

        Both random generators are seeded with ``seed``, so two independent
        calls with the same seed return the same series. ``train`` and
        ``test`` rely on this to agree on start dates and values.

        :param seed: seed for the start-date and the value generators
        :return: one dict per series with the keys ``start``, ``target``
                 (a numpy array) and ``item_id`` (the series index)
        """

        def sigmoid(x: np.ndarray) -> np.ndarray:
            return 1.0 / (1.0 + np.exp(-x))

        # Ensure same start dates in test and training set
        my_random = random.Random(seed)
        state = np.random.RandomState(seed)
        val_range = self.max_val - self.min_val
        res = []
        # NOTE: the order of the draws from ``state`` and ``my_random``
        # below determines the generated data; do not reorder them.
        for i in range(self.num_series):
            length = state.randint(low=self.length_low, high=self.length_high)
            start = self._get_start(i, my_random)
            t = np.arange(length)
            # Ramp-up factor centred at step 20.
            envelope = sigmoid((t - 20.0) / 10.0)
            level = 0.3 * val_range * (state.random_sample() - 0.5)
            phi = 2 * np.pi * state.random_sample()
            period = self._get_period()
            w = 2 * np.pi / period
            idx = pd.date_range(
                start=start, freq=self.freq_str, periods=length
            )
            special_tp_indicator = self._special_time_point_indicator(idx)
            # One random spike height per series, applied at every special
            # time point.
            special_effect = state.random_sample() * special_tp_indicator
            v = np.sin(w * t + phi) + special_effect

            if self.is_scale:
                scale = 0.1 * val_range * state.random_sample()
                v *= scale
            v += level
            if self.is_noise:
                noise_range = 0.02 * val_range * state.random_sample()
                noise = noise_range * state.normal(size=length)
                v += noise
            v = envelope * v
            if self.clip_values:
                np.clip(v, a_min=self.min_val, a_max=self.max_val, out=v)
            else:
                v = self._shift_into_value_range(v)

            if self.is_integer:
                # Clip to the integers inside [min_val, max_val] first so
                # that rounding cannot leave the allowed range.
                np.clip(
                    v,
                    a_min=np.ceil(self.min_val),
                    a_max=np.floor(self.max_val),
                    out=v,
                )
                v = np.round(v).astype(int)
            v = v.tolist()
            if self.proportion_missing_values > 0:
                self._insert_missing_values(v, state)
            res.append(
                dict(
                    # NOTE(review): the ``freq`` argument of ``pd.Timestamp``
                    # appears to be deprecated in recent pandas releases;
                    # confirm against the supported pandas versions.
                    start=pd.Timestamp(start, freq=self.freq_str),
                    target=np.array(v),
                    item_id=i,
                )
            )
        return res


class RecipeDataset(ArtificialDataset):
    """Synthetic data set generated by providing a recipe.

    A recipe is either a (non-deterministic) function

        f(length: int, global_state: dict) -> dict

    or list of (field, function) tuples of the form

        (field: str, f(data: dict, length: int, global_state: dict) -> dict)

    which is processed sequentially, with data initially set to {},
    and each entry updating data[field] to the output of the function
    call.
    """

    def __init__(
        self,
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



