in neural/linear/lin_model_template.py [0:0]
def predict(self, U=None, Y=None, start=0, eval="unrolled"):
    """Roll the fitted linear model forward in time and return its outputs.

    Inputs and outputs are zero-padded with ``lag_u`` / ``lag_y`` initial
    samples, converted to lagged state-space form with
    ``statespace_transform``, and then advanced one step at a time using
    ``self.weights``.

    Parameters
    ----------
    U : array or None
        Input time series of shape (n_epochs, n_times, n_channels_u).
        When given, ``self.n_channels_u`` is overwritten with its last
        dimension. When None, zeros are used and the epoch/time dimensions
        are taken from ``Y``.
    Y : array or None
        Output time series; presumably (n_epochs, n_times, n_channels_y)
        -- TODO confirm against callers. When None, zeros are used.
    start : int
        First time index that is predicted. State entries before it keep
        the values derived from ``Y``. If ``start >= n_times`` no step is
        predicted.
    eval : {"unrolled", "onestep"}
        "unrolled" feeds every prediction back in as the next state;
        "onestep" always predicts from the state derived from ``Y``.
        (The name shadows the builtin but is part of the public signature.)

    Returns
    -------
    array
        ``X[:, 1:, :n_channels_y]``: the leading output channels of the
        state trajectory after the roll-out, without the first time step.
        Presumably (n_epochs, n_times, n_channels_y), assuming
        ``statespace_transform`` yields at least ``n_times + 1`` steps.

    Raises
    ------
    ValueError
        If ``eval`` is not a supported mode, or if both ``U`` and ``Y``
        are None (the epoch/time dimensions cannot be inferred then).
    """
    if eval not in ("unrolled", "onestep"):
        raise ValueError(
            f"eval must be 'unrolled' or 'onestep', got {eval!r}")
    if U is None and Y is None:
        raise ValueError("at least one of U and Y must be provided")

    if self.log:
        print("\n-----------------------------------------")
        print("\n PREDICT \n")
        print("-----------------------------------------\n")

    # Take the dimensions from whichever series is available; U wins when
    # both are given, and it also refreshes the stored input channel count.
    if U is not None:
        n_epochs, n_times, self.n_channels_u = U.shape
    else:
        n_epochs, n_times, _ = Y.shape
        U = np.zeros((n_epochs, n_times, self.n_channels_u))
    if Y is None:
        Y = np.zeros((n_epochs, n_times, self.n_channels_y))

    # Prepend generic (zero) initial conditions covering the model lags.
    U_ini = np.zeros((n_epochs, self.lag_u, self.n_channels_u))
    Y_ini = np.zeros((n_epochs, self.lag_y, self.n_channels_y))
    U_augmented = np.concatenate([U_ini, U], axis=1)
    Y_augmented = np.concatenate([Y_ini, Y], axis=1)
    if self.log:
        print(U_augmented.shape)
        print(Y_augmented.shape)

    # Convert: canonical space time series -> state space time series,
    # then keep only the trailing n_times + 1 steps.
    V = statespace_transform(U_augmented, self.lag_u)
    X = statespace_transform(Y_augmented, self.lag_y)
    V = V[:, -(n_times + 1):, :]
    X = X[:, -(n_times + 1):, :]
    # NOTE: no standard scaling is applied to V or X here.

    if self.log:
        print("\n Constructing predicted trajectories... \n")

    if eval == "onestep":
        # X is overwritten with predictions below; keep the untouched
        # states so every step can be predicted from them.
        X_true = copy.deepcopy(X)

    A_reduced = self.weights  # (n_channels_y, n_feats)
    for t in range(start, n_times):
        V_contrib = V[:, t + 1, :]
        X_contrib = X[:, t, :] if eval == "unrolled" else X_true[:, t, :]
        currstate = np.concatenate([X_contrib, V_contrib], axis=1)
        if self.log:
            print("dim of currstate: ", currstate.shape)

        pred = currstate @ A_reduced.T  # (n_epochs, n_channels_y)
        if self.log:
            print("dim of pred: ", pred.shape)

        # Next state: the new prediction goes in front, followed by the
        # current state minus its last n_channels_y columns.
        shifted = X[:, t, :-self.n_channels_y]
        if self.log:
            print("dim of obj2: ", shifted.shape)
        next_state = np.concatenate([pred, shifted], axis=1)
        if self.log:
            print("concatenation works!")
            print("dim of X[:, t+1, :]: ", X[:, t + 1, :].shape)
        X[:, t + 1, :] = next_state

    # The leading n_channels_y columns of each state hold that step's output.
    Y_pred = X[:, 1:, :self.n_channels_y]
    return Y_pred