in monte_carlo_tests/monte_carlo_statsmodels.py [0:0]
def run_all_mc(first_stage, folder, n_list, n_exp, hetero_coef_list, d_list,
d_x_list, p_list, t_list, cov_type_list, alpha_list):
if not os.path.exists("results"):
os.makedirs('results')
results_filename = os.path.join("results", "{}.txt".format(folder))
np.random.seed(123)
coverage_est = {}
coverage_lr = {}
n_tests = 0
n_failed_coef = 0
n_failed_effect = 0
cov_tol = .04
for n in n_list:
for hetero_coef in hetero_coef_list:
for d in d_list:
for d_x in d_x_list:
if d_x > d:
continue
for p in p_list:
for d_t in t_list:
X_test = np.unique(np.random.binomial(1, .5, size=(20, d_x)), axis=0)
t0 = time.time()
for it in range(n_exp):
X = np.random.binomial(1, .8, size=(n, d))
T = np.hstack([np.random.binomial(1, .5 * X[:, 0] + .25,
size=(n,)).reshape(-1, 1) for _ in range(d_t)])
true_coef = np.hstack([np.hstack([it + np.arange(p).reshape(-1, 1),
it + np.ones((p, 1)), np.zeros((p, d_x - 1))])
for it in range(d_t)])
def true_effect(x, t):
return cross_product(
np.hstack([np.ones((x.shape[0], 1)), x[:, :d_x]]), t) @ true_coef.T
y = true_effect(X, T) + X[:, [0] * p] +\
(hetero_coef * X[:, [0]] + 1) * np.random.normal(0, 1, size=(n, p))
XT = np.hstack([X, T])
X1, X2, y1, y2, X_final_first, X_final_sec, y_sum_first, y_sum_sec,\
n_sum_first, n_sum_sec, var_first, var_sec = _summarize(XT, y)
X = np.vstack([X1, X2])
y = np.concatenate((y1, y2))
X_final = np.vstack([X_final_first, X_final_sec])
y_sum = np.concatenate((y_sum_first, y_sum_sec))
n_sum = np.concatenate((n_sum_first, n_sum_sec))
var_sum = np.concatenate((var_first, var_sec))
first_half_sum = len(y_sum_first)
first_half = len(y1)
for cov_type in cov_type_list:
class SplitterSum:
def __init__(self):
return
def split(self, X, T):
return [(np.arange(0, first_half_sum),
np.arange(first_half_sum, X.shape[0])),
(np.arange(first_half_sum, X.shape[0]),
np.arange(0, first_half_sum))]
est = LinearDML(model_y=first_stage(),
model_t=first_stage(),
cv=SplitterSum(),
linear_first_stages=False,
discrete_treatment=False)
est.fit(y_sum,
X_final[:, -d_t:],
X_final[:, :d_x],
X_final[:, d_x:-d_t],
freq_weight=n_sum,
sample_var=var_sum,
inference=StatsModelsInference(cov_type=cov_type))
class Splitter:
def __init__(self):
return
def split(self, X, T):
return [(np.arange(0, first_half), np.arange(first_half, X.shape[0])),
(np.arange(first_half, X.shape[0]), np.arange(0, first_half))]
lr = LinearDML(model_y=first_stage(),
model_t=first_stage(),
cv=Splitter(),
linear_first_stages=False,
discrete_treatment=False)
lr.fit(y, X[:, -d_t:], X=X[:, :d_x], W=X[:, d_x:-d_t],
inference=StatsModelsInference(cov_type=cov_type))
for alpha in alpha_list:
key = ("n_{}_n_exp_{}_hetero_{}_d_{}_d_x_"
"{}_p_{}_d_t_{}_cov_type_{}_alpha_{}").format(
n, n_exp, hetero_coef, d, d_x, p, d_t, cov_type, alpha)
_append_coverage(key, coverage_est, est, X_test,
alpha, true_coef, true_effect)
_append_coverage(key, coverage_lr, lr, X_test,
alpha, true_coef, true_effect)
if it == n_exp - 1:
n_tests += 1
mean_coef_cov = np.mean(coverage_est[key]['coef_cov'])
mean_eff_cov = np.mean(coverage_est[key]['effect_cov'])
mean_coef_cov_lr = np.mean(coverage_lr[key]['coef_cov'])
mean_eff_cov_lr = np.mean(coverage_lr[key]['effect_cov'])
[print("{}. Time: {:.2f}, Mean Coef Cov: ({:.4f}, {:.4f}), "
"Mean Effect Cov: ({:.4f}, {:.4f})".format(key,
time.time() - t0,
mean_coef_cov,
mean_coef_cov_lr,
mean_eff_cov,
mean_eff_cov_lr),
file=f)
for f in [None, open(results_filename, "a")]]
coef_cov_dev = mean_coef_cov - (1 - alpha)
if np.abs(coef_cov_dev) >= cov_tol:
n_failed_coef += 1
[print("BAD coef coverage on "
"average: deviation = {:.4f}".format(coef_cov_dev), file=f)
for f in [None, open(results_filename, "a")]]
eff_cov_dev = mean_eff_cov - (1 - alpha)
if np.abs(eff_cov_dev) >= cov_tol:
n_failed_effect += 1
[print("BAD effect coverage on "
"average: deviation = {:.4f}".format(eff_cov_dev), file=f)
for f in [None, open(results_filename, "a")]]
[print("Finished {} Monte Carlo Tests. Failed Coef Coverage Tests: {}/{}."
"Failed Effect Coverage Tests: {}/{}. (Coverage Tolerance={})".format(n_tests,
n_failed_coef,
n_tests,
n_failed_effect,
n_tests,
cov_tol),
file=f) for f in [None, open(results_filename, "a")]]
agg_coverage_est, std_coverage_est, q_coverage_est = _agg_coverage(coverage_est)
agg_coverage_lr, std_coverage_lr, q_coverage_lr = _agg_coverage(coverage_lr)
[print("\nResults for: {}\n--------------------------\n".format(folder), file=f)
for f in [None, open(results_filename, "a")]]
plot_coverage(agg_coverage_est, 'coef_cov', n, n_exp, hetero_coef_list, d_list, d_x_list,
p_list, t_list, cov_type_list, alpha_list, prefix="sum_", folder=folder)
plot_coverage(agg_coverage_lr, 'coef_cov', n, n_exp, hetero_coef_list, d_list, d_x_list,
p_list, t_list, cov_type_list, alpha_list, prefix="orig_", folder=folder)
plot_coverage(agg_coverage_est, 'effect_cov', n, n_exp, hetero_coef_list, d_list, d_x_list,
p_list, t_list, cov_type_list, alpha_list, prefix="sum_", folder=folder)
plot_coverage(agg_coverage_lr, 'effect_cov', n, n_exp, hetero_coef_list, d_list, d_x_list,
p_list, t_list, cov_type_list, alpha_list, prefix="orig_", folder=folder)
[print("Summarized Data\n----------------", file=f) for f in [None, open(results_filename, "a")]]
print_aggregate(agg_coverage_est, std_coverage_est, q_coverage_est)
print_aggregate(agg_coverage_est, std_coverage_est, q_coverage_est, lambda: open(results_filename, "a"))
[print("\nUn-Summarized Data\n-----------------", file=f) for f in [None, open(results_filename, "a")]]
print_aggregate(agg_coverage_lr, std_coverage_lr, q_coverage_lr)
print_aggregate(agg_coverage_lr, std_coverage_lr, q_coverage_lr, lambda: open(results_filename, "a"))