econml/solutions/causal_analysis/_causal_analysis.py
def individualized_policy(self, Xtest, feature_index, *, n_rows=None, treatment_costs=0, alpha=0.05):
"""
Get individualized treatment policy based on the learned model for a feature, sorted by the predicted effect.
Parameters
----------
Xtest: array-like
Features
feature_index: int or string
Index of the feature to be considered as treatment
n_rows: int, optional
How many rows to return (all rows by default)
treatment_costs: array-like, default 0
Cost of treatment, as a scalar value or per-sample. For continuous features this is the marginal cost per
unit of treatment; for discrete features, this is the difference in cost between each of the non-default
values and the default value (i.e., if non-scalar the array should have shape (n,d_t-1))
alpha: float in [0, 1], default 0.05
Confidence level of the confidence intervals
A (1-alpha)*100% confidence interval is returned
Returns
-------
output: DataFrame
Dataframe containing recommended treatment, effect, confidence interval, sorted by effect
"""
result = self._safe_result_index(Xtest, feature_index)
# get a dataframe of the original features, with the selected column renamed to 'Current treatment'
orig_df = pd.DataFrame(Xtest, columns=self.feature_names_).rename(
columns={self.feature_names_[result.feature_index]: 'Current treatment'})
Xtest = result.X_transformer.transform(Xtest)
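# if the transformer leaves no heterogeneity features, the effect is constant across rows;
# remember the row count so the scalar inference result can be expanded back out below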
if Xtest.shape[1] == 0:
x_rows = Xtest.shape[0]
Xtest = None
if result.feature_baseline is None:
# apply 10% of a typical treatment for this feature
effect = result.estimator.effect_inference(Xtest, T1=result.treatment_value * 0.1)
else:
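# for discrete features, get the inferred effect of each non-default level relative to the baseline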
effect = result.estimator.const_marginal_effect_inference(Xtest)
if Xtest is None: # we got a scalar effect although our original X may have had more rows
effect = effect._expand_outputs(x_rows)
multi_y = (not self._vec_y) or self.classification
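# the effect arrays carry an extra outcome dimension when y was passed as a 2d array or for classification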
if multi_y and result.feature_baseline is not None and np.ndim(treatment_costs) == 2:
# we've got treatment costs of shape (n, d_t-1) so we need to add a y dimension to broadcast safely
treatment_costs = np.expand_dims(treatment_costs, 1)
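# shift the effect estimates (and their confidence intervals) down by the treatment cost so they are net of cost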
effect.translate(-treatment_costs)
est = effect.point_estimate
est_lb = effect.conf_int(alpha)[0]
est_ub = effect.conf_int(alpha)[1]
if multi_y: # y was an array, not a vector
est = np.squeeze(est, 1)
est_lb = np.squeeze(est_lb, 1)
est_ub = np.squeeze(est_ub, 1)
if result.feature_baseline is None:
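# continuous feature: recommend increasing the treatment when the predicted effect is positive, decreasing it otherwise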
rec = np.empty(est.shape[0], dtype=object)
rec[est > 0] = "increase"
rec[est <= 0] = "decrease"
# set the effect bounds; for positive treatments these agree with
# the estimates; for negative treatments, we need to invert the interval
eff_lb, eff_ub = est_lb, est_ub
eff_lb[est <= 0], eff_ub[est <= 0] = -eff_ub[est <= 0], -eff_lb[est <= 0]
# the effect is now always positive since we decrease treatment when negative
eff = np.abs(est)
else:
# for discrete treatment, stack a zero result in front for control
zeros = np.zeros((est.shape[0], 1))
all_effs = np.hstack([zeros, est])
eff_ind = np.argmax(all_effs, axis=1)
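# map each argmax index back to an actual treatment level; index 0 corresponds to keeping the baseline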
treatment_arr = np.array([result.feature_baseline] + [lvl for lvl in result.feature_levels], dtype=object)
rec = treatment_arr[eff_ind]
# we need to call effect_inference to get the correct CI between the two treatment options
effect = result.estimator.effect_inference(Xtest, T0=orig_df['Current treatment'], T1=rec)
# we now need to construct the delta in the cost between the two treatments and translate the effect
current_treatment = orig_df['Current treatment'].values
if np.ndim(treatment_costs) >= 2:
# remove the third dimension potentially added above
if multi_y: # y was an array, not a vector
treatment_costs = np.squeeze(treatment_costs, 1)
assert treatment_costs.shape[1] == len(treatment_arr) - 1, ("If treatment costs are an array, "
" they must be of shape (n, d_t-1),"
" where n is the number of samples"
" and d_t the number of treatment"
" categories.")
all_costs = np.hstack([zeros, treatment_costs])
# find cost of current treatment: the equality check creates a 2d array with a single True per row,
# at the position of that row's current treatment. Then we take the corresponding cost.
current_cost = all_costs[current_treatment.reshape(-1, 1) == treatment_arr.reshape(1, -1)]
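# cost of the recommended treatment for each row, picked out by the argmax index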
target_cost = np.take_along_axis(all_costs, eff_ind.reshape(-1, 1), 1).reshape(-1)
else:
assert isinstance(treatment_costs, (int, float)), ("Treatment costs should be either a scalar "
"or a 2d array of shape (n, d_t-1).")
all_costs = np.array([0] + [treatment_costs] * (len(treatment_arr) - 1))
# construct index of current treatment
current_ind = (current_treatment.reshape(-1, 1) ==
treatment_arr.reshape(1, -1)) @ np.arange(len(treatment_arr))
current_cost = all_costs[current_ind]
target_cost = all_costs[eff_ind]
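# switching from the current to the recommended treatment forgoes the current cost and incurs the
# target cost, so the effect is translated by their difference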
delta_cost = current_cost - target_cost
# add second dimension if needed for broadcasting during translation of effect
if multi_y:
delta_cost = np.expand_dims(delta_cost, 1)
effect.translate(delta_cost)
eff = effect.point_estimate
eff_lb, eff_ub = effect.conf_int(alpha)
if multi_y: # y was an array, not a vector
eff = np.squeeze(eff, 1)
eff_lb = np.squeeze(eff_lb, 1)
eff_ub = np.squeeze(eff_ub, 1)
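# assemble the recommendation table; the original features are joined back in and rows are sorted by effect before returning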
df = pd.DataFrame({'Treatment': rec,
'Effect of treatment': eff,
'Effect of treatment lower bound': eff_lb,
'Effect of treatment upper bound': eff_ub},
index=orig_df.index)
return df.join(orig_df).sort_values('Effect of treatment',
ascending=False).head(n_rows)