in code/experiment_synthetic/models.py [0:0]
def __init__(self, environments, args):
    """Search feature subsets with environment-invariant residuals and fit them.

    All environments are pooled into one dataset. For every non-empty
    feature subset produced by ``self.powerset`` a linear model without
    intercept is fitted on the pooled data; then, for each environment,
    the residuals inside that environment are compared against the
    residuals of all other environments with ``self.mean_var_test``.
    A subset is accepted when the smallest of those test values, multiplied
    by the number of environments, exceeds ``alpha``. The final model is a
    linear regression on the intersection of all accepted subsets.

    Args:
        environments: Iterable of ``(x, y)`` pairs. Each ``x`` and ``y``
            must provide ``.numpy()`` and ``x.shape[0]`` must be the number
            of rows (presumably torch tensors -- confirm against callers).
        args: Mapping providing ``"alpha"`` (acceptance threshold) and
            ``"verbose"`` (truthy to print accepted subsets and their
            intersection).

    Side effects:
        Sets ``self.alpha`` and ``self.coefficients``. The latter ends up
        as a torch tensor with one entry per input feature; entries of
        features outside the accepted intersection are zero, and the whole
        tensor is zero when no subset is accepted.
    """
    self.coefficients = None
    self.alpha = args["alpha"]

    # Pool the environments, remembering which environment each row
    # belongs to.
    x_parts, y_parts, env_labels = [], [], []
    for e, (x, y) in enumerate(environments):
        x_parts.append(x.numpy())
        y_parts.append(y.numpy())
        env_labels.append(np.full(x.shape[0], e))

    x_all = np.vstack(x_parts)
    y_all = np.vstack(y_parts)
    e_all = np.hstack(env_labels)

    dim = x_all.shape[1]
    # Number of environments actually pooled above; equals
    # len(environments) for any sized container.
    num_envs = len(x_parts)

    # The in/out row partition of each environment does not depend on the
    # feature subset, so compute it once instead of once per subset (the
    # subset loop below is exponential in ``dim``).
    env_partitions = [
        (np.where(e_all == e)[0], np.where(e_all != e)[0])
        for e in range(num_envs)
    ]

    accepted_subsets = []
    for subset in self.powerset(range(dim)):
        # len() rather than truthiness: the element type yielded by
        # self.powerset is not visible from here.
        if len(subset) == 0:
            continue

        x_s = x_all[:, subset]
        reg = LinearRegression(fit_intercept=False).fit(x_s, y_all)

        p_values = []
        for rows_in, rows_out in env_partitions:
            res_in = (y_all[rows_in] - reg.predict(x_s[rows_in, :])).ravel()
            res_out = (y_all[rows_out] - reg.predict(x_s[rows_out, :])).ravel()
            # NOTE(review): mean_var_test is presumably a two-sample test
            # returning a p-value -- confirm in its definition.
            p_values.append(self.mean_var_test(res_in, res_out))

        # Scale the smallest value by the number of environments tested.
        # TODO: Jonas uses "min(p_values) * len(environments) - 1"
        p_value = min(p_values) * num_envs

        if p_value > self.alpha:
            accepted_subsets.append(set(subset))
            if args["verbose"]:
                print("Accepted subset:", subset)

    if accepted_subsets:
        # Keep only the features present in every accepted subset.
        accepted_features = list(set.intersection(*accepted_subsets))
        if args["verbose"]:
            print("Intersection:", accepted_features)

        coefficients = np.zeros(dim)
        if accepted_features:
            x_s = x_all[:, accepted_features]
            reg = LinearRegression(fit_intercept=False).fit(x_s, y_all)
            # Scatter the fitted weights back to their original feature
            # positions; all other positions stay zero.
            coefficients[accepted_features] = reg.coef_

        self.coefficients = torch.Tensor(coefficients)
    else:
        self.coefficients = torch.zeros(dim)