in src/gluonts/nursery/SCott/pts/dataset/stat.py [0:0]
def calculate_dataset_statistics(ts_dataset: Any) -> DatasetStatistics:
    """
    Computes the statistics of a given Dataset.

    The dataset is traversed exactly once. For every entry the target and
    the optional static / dynamic feature fields are validated through
    ``assert_pts`` and folded into running aggregates.

    Parameters
    ----------
    ts_dataset
        Dataset of which to compute the statistics. It has to support
        ``len()`` and iteration; every entry has to support ``in`` and item
        access with the ``FieldName`` keys used below.

    Returns
    -------
    DatasetStatistics
        NamedTuple containing the statistics.

    Notes
    -----
    The following conditions are checked with ``assert_pts``:

    * observed target values and dynamic features are finite;
    * static feature vectors have the same length in every entry;
    * dynamic features are present in all entries or in none, with the same
      number of features and as many time steps as the target;
    * the dataset contains at least one entry and at least one observed
      (non-NaN) target value.
    """
    num_time_observations = 0
    num_time_series = 0
    # Start from +/- infinity so that the very first observed value always
    # replaces the initial bound, whatever its magnitude.
    min_target = float("inf")
    max_target = float("-inf")
    sum_target = 0.0
    sum_abs_target = 0.0
    integer_dataset = True
    observed_feat_static_cat: Optional[List[Set[int]]] = None
    observed_feat_static_real: Optional[List[Set[float]]] = None
    num_feat_static_real: Optional[int] = None
    num_feat_static_cat: Optional[int] = None
    num_feat_dynamic_real: Optional[int] = None
    num_feat_dynamic_cat: Optional[int] = None
    num_missing_values = 0

    scale_histogram = ScaleHistogram()

    with tqdm(enumerate(ts_dataset, start=1), total=len(ts_dataset)) as it:
        # Enumerating from 1 leaves ``num_time_series`` equal to the number
        # of entries seen once the loop finishes (and 0 if there were none).
        for num_time_series, ts in it:
            # TARGET
            target = ts[FieldName.TARGET]
            missing_mask = np.isnan(target)
            observed_target = target[~missing_mask]
            num_observations = len(observed_target)

            # Missing values are counted for every series, including those
            # that consist of NaN only.
            num_missing_values += int(missing_mask.sum())

            if num_observations > 0:
                # 'nan' is handled in observed_target definition
                assert_pts(
                    np.all(np.isfinite(observed_target)),
                    "Target values have to be finite (e.g., not inf, -inf, "
                    "or None) and cannot exceed single precision floating "
                    "point range.",
                )

                num_time_observations += num_observations
                # min() / max() of an empty array would raise, hence the
                # guard on num_observations above.
                min_target = float(min(min_target, observed_target.min()))
                max_target = float(max(max_target, observed_target.max()))
                sum_target += float(observed_target.sum())
                sum_abs_target += float(np.abs(observed_target).sum())
                integer_dataset = integer_dataset and bool(
                    np.all(np.mod(observed_target, 1) == 0)
                )

            # Added for every series (also empty ones) so that the histogram
            # size matches num_time_series; done after the checks for inf
            # and None.
            scale_histogram.add(observed_target)

            # FEAT_STATIC_CAT
            feat_static_cat = (
                ts[FieldName.FEAT_STATIC_CAT]
                if FieldName.FEAT_STATIC_CAT in ts
                else []
            )

            if num_feat_static_cat is None:
                # The first entry fixes the expected vector length.
                num_feat_static_cat = len(feat_static_cat)
                observed_feat_static_cat = [
                    set() for _ in range(num_feat_static_cat)
                ]

            # needed to type check
            assert num_feat_static_cat is not None
            assert observed_feat_static_cat is not None

            assert_pts(
                num_feat_static_cat == len(feat_static_cat),
                "Not all feat_static_cat vectors have the same length {} != {}.",
                num_feat_static_cat,
                len(feat_static_cat),
            )
            # Collect the distinct values seen at each feature position.
            for i, c in enumerate(feat_static_cat):
                observed_feat_static_cat[i].add(c)

            # FEAT_STATIC_REAL
            feat_static_real = (
                ts[FieldName.FEAT_STATIC_REAL]
                if FieldName.FEAT_STATIC_REAL in ts
                else []
            )

            if num_feat_static_real is None:
                # The first entry fixes the expected vector length.
                num_feat_static_real = len(feat_static_real)
                observed_feat_static_real = [
                    set() for _ in range(num_feat_static_real)
                ]

            # needed to type check
            assert num_feat_static_real is not None
            assert observed_feat_static_real is not None

            assert_pts(
                num_feat_static_real == len(feat_static_real),
                "Not all feat_static_real vectors have the same length {} != {}.",
                num_feat_static_real,
                len(feat_static_real),
            )
            # Collect the distinct values seen at each feature position.
            for i, c in enumerate(feat_static_real):
                observed_feat_static_real[i].add(c)

            # FEAT_DYNAMIC_CAT
            feat_dynamic_cat = (
                ts[FieldName.FEAT_DYNAMIC_CAT]
                if FieldName.FEAT_DYNAMIC_CAT in ts
                else None
            )

            if feat_dynamic_cat is None:
                # feat_dynamic_cat not found: this is only valid if this is
                # the first entry or no earlier entry had it either
                assert_pts(
                    num_feat_dynamic_cat is None or num_feat_dynamic_cat == 0,
                    "feat_dynamic_cat was found for some instances but not others.",
                )
                num_feat_dynamic_cat = 0
            else:
                if num_feat_dynamic_cat is None:
                    # first num_feat_dynamic_cat found
                    num_feat_dynamic_cat = feat_dynamic_cat.shape[0]
                else:
                    assert_pts(
                        num_feat_dynamic_cat == feat_dynamic_cat.shape[0],
                        "Found instances with different number of features in "
                        "feat_dynamic_cat, found one with {} and another with {}.",
                        num_feat_dynamic_cat,
                        feat_dynamic_cat.shape[0],
                    )

                assert_pts(
                    np.all(np.isfinite(feat_dynamic_cat)),
                    "Features values have to be finite and cannot exceed single "
                    "precision floating point range.",
                )
                # Axis 1 is compared against the target length below.
                num_feat_dynamic_cat_time_steps = feat_dynamic_cat.shape[1]
                assert_pts(
                    num_feat_dynamic_cat_time_steps == len(target),
                    "Each feature in feat_dynamic_cat has to have the same length as "
                    "the target. Found an instance with feat_dynamic_cat of length {} "
                    "and a target of length {}.",
                    num_feat_dynamic_cat_time_steps,
                    len(target),
                )

            # FEAT_DYNAMIC_REAL
            feat_dynamic_real = (
                ts[FieldName.FEAT_DYNAMIC_REAL]
                if FieldName.FEAT_DYNAMIC_REAL in ts
                else None
            )

            if feat_dynamic_real is None:
                # feat_dynamic_real not found: this is only valid if this is
                # the first entry or no earlier entry had it either
                assert_pts(
                    num_feat_dynamic_real is None
                    or num_feat_dynamic_real == 0,
                    "feat_dynamic_real was found for some instances but not others.",
                )
                num_feat_dynamic_real = 0
            else:
                if num_feat_dynamic_real is None:
                    # first num_feat_dynamic_real found
                    num_feat_dynamic_real = feat_dynamic_real.shape[0]
                else:
                    assert_pts(
                        num_feat_dynamic_real == feat_dynamic_real.shape[0],
                        "Found instances with different number of features in "
                        "feat_dynamic_real, found one with {} and another with {}.",
                        num_feat_dynamic_real,
                        feat_dynamic_real.shape[0],
                    )

                assert_pts(
                    np.all(np.isfinite(feat_dynamic_real)),
                    "Features values have to be finite and cannot exceed single "
                    "precision floating point range.",
                )
                # Axis 1 is compared against the target length below.
                num_feat_dynamic_real_time_steps = feat_dynamic_real.shape[1]
                assert_pts(
                    num_feat_dynamic_real_time_steps == len(target),
                    "Each feature in feat_dynamic_real has to have the same length as "
                    "the target. Found an instance with feat_dynamic_real of length {} "
                    "and a target of length {}.",
                    num_feat_dynamic_real_time_steps,
                    len(target),
                )

    assert_pts(num_time_series > 0, "Time series dataset is empty!")
    assert_pts(
        num_time_observations > 0,
        "Only empty time series found in the dataset!",
    )

    # note this requires the above assumption to avoid a division by zero
    # runtime error
    mean_target_length = num_time_observations / num_time_series

    # note this requires the above assumption to avoid a division by zero
    # runtime error
    mean_target = sum_target / num_time_observations
    mean_abs_target = sum_abs_target / num_time_observations

    # A dataset only counts as integer-valued if it is also non-negative.
    integer_dataset = integer_dataset and min_target >= 0.0

    # Internal consistency check: one histogram entry per time series.
    assert len(scale_histogram) == num_time_series

    return DatasetStatistics(
        integer_dataset=integer_dataset,
        max_target=max_target,
        mean_abs_target=mean_abs_target,
        mean_target=mean_target,
        mean_target_length=mean_target_length,
        min_target=min_target,
        num_missing_values=num_missing_values,
        feat_static_real=observed_feat_static_real
        if observed_feat_static_real
        else [],
        feat_static_cat=observed_feat_static_cat
        if observed_feat_static_cat
        else [],
        num_feat_dynamic_real=num_feat_dynamic_real,
        num_feat_dynamic_cat=num_feat_dynamic_cat,
        num_time_observations=num_time_observations,
        num_time_series=num_time_series,
        scale_histogram=scale_histogram,
    )