in econml/cate_interpreter/_interpreters.py [0:0]
def interpret(self, cate_estimator, X, sample_treatment_costs=None):
    """
    Fit a policy tree that summarizes the treatment policy implied by a CATE estimator.

    The estimator's constant marginal effects (or one end of their
    confidence interval when ``self.risk_level`` is set) are evaluated on
    ``X``, reduced to one column per treatment, optionally reduced by the
    treatment costs, and used as rewards for a ``PolicyTree``.

    Parameters
    ----------
    cate_estimator : :class:`.LinearCateEstimator`
        The fitted estimator to interpret
    X : array-like
        The features against which to interpret the estimator;
        must be compatible shape-wise with the features used to fit
        the estimator. If None, a single placeholder sample with zero
        features is used to fit the tree and the estimator is queried
        with ``X=None``.
    sample_treatment_costs : array-like, optional
        The cost of treatment. Can be a scalar or have dimension (n_samples, n_treatments)
        or (n_samples,) if T is a vector

    Returns
    -------
    self: object instance
        With ``tree_model_``, ``policy_value_``, ``always_treat_value_``
        and ``node_dict_`` set.

    Raises
    ------
    ValueError
        If a non-scalar ``sample_treatment_costs`` does not have the same
        shape as the 2d effect matrix after a 1d input is turned into a column.
    """
    # Keep two views of the features: `X` always feeds the tree, while
    # `estimator_X` (possibly None) is what the estimator is queried with.
    if X is None:
        estimator_X = None
        X = np.empty(shape=(1, 0))
    else:
        X = check_array(X)
        estimator_X = X

    tree = self.tree_model_ = PolicyTree(
        criterion='neg_welfare',
        splitter='best',
        max_depth=self.max_depth,
        min_samples_split=self.min_samples_split,
        min_samples_leaf=self.min_samples_leaf,
        min_weight_fraction_leaf=self.min_weight_fraction_leaf,
        max_features=self.max_features,
        min_impurity_decrease=self.min_impurity_decrease,
        min_balancedness_tol=self.min_balancedness_tol,
        honest=False,
        random_state=self.random_state,
    )

    # Point estimate when no risk level is configured; otherwise one end
    # of the effect interval at that level.
    if self.risk_level is None:
        effects = cate_estimator.const_marginal_effect(estimator_X)
    else:
        take_second = bool(self.risk_seeking)
        # NOTE(review): the interval is presumably returned as (lower, upper) - confirm
        lower, upper = cate_estimator.const_marginal_effect_interval(estimator_X, alpha=self.risk_level)
        effects = upper if take_second else lower

    # Average over the outcome dimension if it exists and make `effects` 2d.
    n_dims = effects.ndim
    if n_dims == 1:
        effects = effects.reshape((-1, 1))
    elif n_dims == 2:
        # A 2d result is averaged only when the estimator reports more than
        # one outcome; otherwise it is used as is.
        outcome_dims = cate_estimator._d_y
        if (len(outcome_dims) > 0) and outcome_dims[0] > 1:
            effects = np.mean(effects, axis=1, keepdims=True)
    elif n_dims == 3:
        effects = np.mean(effects, axis=1)

    # Net the treatment costs out of the effects (in place).
    if sample_treatment_costs is not None:
        costs = sample_treatment_costs
        if not isinstance(costs, numbers.Real):
            costs = check_array(costs, ensure_2d=False)
            if costs.ndim == 1:
                costs = costs.reshape((-1, 1))
            if costs.shape != effects.shape:
                raise ValueError("`sample_treatment_costs` should be a double scalar "
                                 "or have dimension (n_samples, n_treatments) or (n_samples,) if T is a vector")
        effects -= costs

    # Prepend a zero-valued column so the tree can pick the best treatment
    # among "column 0" (presumably no treatment) and each treatment column.
    zero_column = np.zeros((effects.shape[0], 1))
    rewards = np.hstack([zero_column, np.atleast_1d(effects)])

    tree.fit(X, rewards)
    best_value_per_sample = np.max(tree.predict_value(X), axis=1)
    self.policy_value_ = np.mean(best_value_per_sample)
    self.always_treat_value_ = np.mean(effects, axis=0)

    # Summarize the effects of the samples that pass through each node.
    paths = tree.decision_path(X)
    summaries = {}
    for node_id in range(paths.shape[1]):
        in_node = paths.getcol(node_id).toarray().flatten().astype(bool)
        # Inference-based summaries are produced for every node, or only for
        # nodes without a left child when `uncertainty_only_on_leaves` is set.
        use_inference = self.include_uncertainty and (
            (not self.uncertainty_only_on_leaves)
            or (tree.tree_.children_left[node_id] < 0)
        )
        if not use_inference:
            node_effects = effects[in_node]
            summaries[node_id] = {'mean': np.mean(node_effects, axis=0),
                                  'std': np.std(node_effects, axis=0)}
            continue
        node_X = estimator_X[in_node] if estimator_X is not None else None
        inference = cate_estimator.const_marginal_ate_inference(node_X)
        summaries[node_id] = {'mean': inference.mean_point,
                              'std': inference.std_point,
                              'ci': inference.conf_int_mean(alpha=self.uncertainty_level)}
    self.node_dict_ = summaries
    return self