in tensorflow_estimator/python/estimator/canned/timeseries/ar_model.py
def get_batch_loss(self, features, mode, state):
"""Computes predictions and a loss.
Args:
features: A dictionary (such as is produced by a chunker) with the
following key/value pairs (shapes are given as required for training):
TrainEvalFeatures.TIMES: A [batch size, self.window_size] integer
Tensor with times for each observation. To train on longer
sequences, the data should first be chunked.
TrainEvalFeatures.VALUES: A [batch size, self.window_size,
self.num_features] Tensor with values for each observation. When
evaluating, `TIMES` and `VALUES` must have a window size of at least
self.window_size, but they may be longer, in which case the last
(window length - self.input_window_size) times, rounded down to a
multiple of self.output_window_size, are evaluated with
non-overlapping output windows (and have associated predictions).
This is primarily to support qualitative evaluation/plotting, and is
not a recommended way to compute evaluation losses (since the output
windows do not overlap, which for window-based models introduces an
undesirable bias).
mode: The tf.estimator.ModeKeys mode to use (TRAIN or EVAL).
state: A (times, values, exogenous_regressors) start state tuple. Ignored
during training; during evaluation it is combined with the evaluated
window to produce the returned end state.
Returns:
A model.ModelOutputs object.
Raises:
ValueError: If `mode` is not TRAIN or EVAL, or if static shape information
is incorrect.
"""
features = {
feature_name: ops.convert_to_tensor(feature_value)
for feature_name, feature_value in features.items()
}
times = features[TrainEvalFeatures.TIMES]
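# Any feature other than TIMES, VALUES, and the prediction state tuple is
# treated as an exogenous regressor and collapsed into a single
# [batch, window, self.exogenous_size] Tensor.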
exogenous_regressors = self._process_exogenous_features(
times=times,
features={
key: value for key, value in features.items() if key not in [
TrainEvalFeatures.TIMES, TrainEvalFeatures.VALUES,
PredictionFeatures.STATE_TUPLE
]
})
if mode == estimator_lib.ModeKeys.TRAIN:
# For training, we require the window size to be self.window_size as
# iterating sequentially on larger windows could introduce a bias.
return self._process_window(
features, mode=mode, exogenous_regressors=exogenous_regressors)
elif mode == estimator_lib.ModeKeys.EVAL:
# For evaluation, we allow the user to pass in a larger window, in which
# case we try to cover as much of the window as possible without
# overlap. Quantitative evaluation is more efficient/correct with fixed
# windows matching self.window_size (as with training), but this looping
# allows easy plotting of "in-sample" predictions.
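# Validate statically known shape information before slicing: TIMES must be
# [batch size, window size] with at least self.window_size steps.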
times.get_shape().assert_has_rank(2)
static_window_size = times.get_shape().dims[1].value
if (static_window_size is not None and
static_window_size < self.window_size):
raise ValueError(
("ARModel requires a window of at least input_window_size + "
"output_window_size to evaluate on (input_window_size={}, "
"output_window_size={}, and got shape {} for feature '{}' (batch "
"size, window size)).").format(self.input_window_size,
self.output_window_size,
times.get_shape(),
TrainEvalFeatures.TIMES))
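# Number of complete, non-overlapping output windows that fit after reserving
# the first input_window_size steps of the evaluation window.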
num_iterations = (
(tf.compat.v1.shape(times)[1] - self.input_window_size) //
self.output_window_size)
output_size = num_iterations * self.output_window_size
# Rather than dealing with overlapping windows of output, discard a bit at
# the beginning if output windows don't cover evenly.
crop_length = output_size + self.input_window_size
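# Worked example with hypothetical sizes: for input_window_size=10,
# output_window_size=5, and a 27-step evaluation window, num_iterations is
# (27 - 10) // 5 = 3, output_size is 15, crop_length is 25, and the first
# 27 - 25 = 2 steps are dropped.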
features = {
feature_name: feature_value[:, -crop_length:]
for feature_name, feature_value in features.items()
}
# Note that, unlike the ARModel's predict() while_loop, each iteration
# here can run in parallel, since we are not feeding predictions or state
# from previous iterations.
def _while_condition(iteration_number, loss_ta, mean_ta, covariance_ta):
del loss_ta, mean_ta, covariance_ta # unused
return iteration_number < num_iterations
def _while_body(iteration_number, loss_ta, mean_ta, covariance_ta):
"""Perform a processing step on a single window of data."""
base_offset = iteration_number * self.output_window_size
model_outputs = self._process_window(
features={
feature_name:
feature_value[:, base_offset:base_offset + self.window_size]
for feature_name, feature_value in features.items()
},
mode=mode,
exogenous_regressors=exogenous_regressors[
    :, base_offset:base_offset + self.window_size])
# This code needs to be updated if new predictions are added in
# self._process_window
assert len(model_outputs.predictions) == 3
assert "mean" in model_outputs.predictions
assert "covariance" in model_outputs.predictions
assert "observed" in model_outputs.predictions
return (iteration_number + 1,
loss_ta.write(iteration_number, model_outputs.loss),
mean_ta.write(iteration_number,
model_outputs.predictions["mean"]),
covariance_ta.write(iteration_number,
model_outputs.predictions["covariance"]))
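# Run every output window; each iteration's loss and predictions are
# accumulated in TensorArrays indexed by iteration number.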
_, loss_ta, mean_ta, covariance_ta = tf.compat.v1.while_loop(
_while_condition, _while_body, [
0,
tf.TensorArray(dtype=self.dtype, size=num_iterations),
tf.TensorArray(dtype=self.dtype, size=num_iterations),
tf.TensorArray(dtype=self.dtype, size=num_iterations)
])
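# Cast the raw observed values to the model dtype; their last output_size
# steps are returned below as the "observed" prediction.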
values = tf.cast(features[TrainEvalFeatures.VALUES], dtype=self.dtype)
batch_size = tf.compat.v1.shape(times)[0]
prediction_shape = [
batch_size, self.output_window_size * num_iterations,
self.num_features
]
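# `state` is the (times, values, exogenous_regressors) tuple carried over
# from previously processed data (if any); it is merged with this window so
# the returned end state always covers the most recent input_window_size
# steps.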
(previous_state_times, previous_state_values,
previous_state_exogenous_regressors) = state
# Make sure returned state always has windows of self.input_window_size,
# even if we were passed fewer than self.input_window_size points this
# time.
if self.input_window_size > 0:
new_state_times = tf.concat(
[previous_state_times,
tf.cast(times, dtype=tf.dtypes.int64)],
axis=1)[:, -self.input_window_size:]
new_state_times.set_shape((None, self.input_window_size))
new_state_values = tf.concat(
[previous_state_values,
self._scale_data(values)], axis=1)[:, -self.input_window_size:, :]
new_state_values.set_shape(
(None, self.input_window_size, self.num_features))
new_exogenous_regressors = tf.concat(
[previous_state_exogenous_regressors, exogenous_regressors],
axis=1)[:, -self.input_window_size:, :]
new_exogenous_regressors.set_shape(
(None, self.input_window_size, self.exogenous_size))
else:
# There is no state to keep, and the strided slices above do not handle
# input_window_size=0.
new_state_times = previous_state_times
new_state_values = previous_state_values
new_exogenous_regressors = previous_state_exogenous_regressors
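# The stacked TensorArrays are [num_iterations, batch, output_window_size,
# num_features]; transpose to batch-major order and flatten the
# per-iteration windows into a single [batch, output_size, num_features]
# block of predictions.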
return model.ModelOutputs(
loss=tf.math.reduce_mean(loss_ta.stack(), axis=0),
end_state=(new_state_times, new_state_values,
new_exogenous_regressors),
predictions={
"mean":
tf.reshape(
tf.compat.v1.transpose(mean_ta.stack(), [1, 0, 2, 3]),
prediction_shape),
"covariance":
tf.reshape(
tf.compat.v1.transpose(covariance_ta.stack(),
[1, 0, 2, 3]), prediction_shape),
"observed":
values[:, -output_size:]
},
prediction_times=times[:, -output_size:])
else:
raise ValueError(
"Unknown mode '{}' passed to get_batch_loss.".format(mode))