in causalml/inference/tree/uplift.pyx [0:0]
def fit(self, X, treatment, y, X_val=None, treatment_val=None, y_val=None):
    """ Fit the uplift model.

    Args
    ----
    X : ndarray, shape = [num_samples, num_features]
        An ndarray of the covariates used to train the uplift model.
    treatment : array-like, shape = [num_samples]
        An array containing the treatment group for each unit.
    y : array-like, shape = [num_samples]
        An array containing the outcome of interest for each unit.
    X_val : ndarray, shape = [num_val_samples, num_features], optional
        Covariates of an optional validation sample (used for early stopping
        inside growDecisionTreeFrom).
    treatment_val : array-like, shape = [num_val_samples], optional
        Treatment group of each unit in the validation sample.
    y_val : array-like, shape = [num_val_samples], optional
        Outcome of each unit in the validation sample.

    Returns
    -------
    self : object
    """
    self.random_state_ = check_random_state(self.random_state)

    X, y = check_X_y(X, y)
    y = (y > 0).astype(Y_TYPE)  # make sure it is 0 or 1, and is int8
    treatment = np.asarray(treatment)
    assert len(y) == len(treatment), 'Data length must be equal for X, treatment, and y.'

    if X_val is not None:
        X_val, y_val = check_X_y(X_val, y_val)
        y_val = (y_val > 0).astype(Y_TYPE)  # make sure it is 0 or 1, and is int8
        treatment_val = np.asarray(treatment_val)
        assert len(y_val) == len(treatment_val), 'Data length must be equal for X_val, treatment_val, and y_val.'

    # Get treatment group keys. self.classes_[0] is reserved for the control group.
    treatment_groups = sorted([x for x in list(set(treatment)) if x != self.control_name])
    self.classes_ = [self.control_name]
    # Encode treatment labels as small integers: 0 = control, 1..k = treatments.
    treatment_idx = np.zeros_like(treatment, dtype=TR_TYPE)
    treatment_val_idx = None
    if treatment_val is not None:
        treatment_val_idx = np.zeros_like(treatment_val, dtype=TR_TYPE)
    for i, tr in enumerate(treatment_groups, 1):
        self.classes_.append(tr)
        treatment_idx[treatment == tr] = i
        if treatment_val_idx is not None:
            treatment_val_idx[treatment_val == tr] = i
    self.n_class = len(self.classes_)

    self.feature_imp_dict = defaultdict(float)

    if (self.n_class > 2) and (self.evaluationFunction in [self.evaluate_DDP, self.evaluate_IDDP, self.evaluate_IT, self.evaluate_CIT]):
        raise ValueError("The DDP, IDDP, IT, and CIT approach can only cope with two class problems, that is two different treatment "
                         "options (e.g., control vs treatment). Please select another approach or only use a "
                         "dataset which employs two treatment options.")

    if self.honesty:
        try:
            # Stratify on the joint (treatment, outcome) label so that both the
            # training and the estimation samples preserve the treatment/outcome
            # distribution. NOTE: a plain list [treatment_idx, y] has length 2 and
            # always made train_test_split raise ValueError (silently disabling
            # stratification); an (n, 2) array is the supported multi-label form.
            X, X_est, treatment_idx, treatment_idx_est, y, y_est = train_test_split(
                X, treatment_idx, y,
                stratify=np.column_stack((treatment_idx, y)),
                test_size=self.estimation_sample_size,
                shuffle=True, random_state=self.random_state)
        except ValueError:
            # e.g. a (treatment, outcome) stratum with fewer than 2 members.
            logger.warning("Stratified sampling failed. Falling back to random sampling.")
            X, X_est, treatment_idx, treatment_idx_est, y, y_est = train_test_split(
                X, treatment_idx, y,
                test_size=self.estimation_sample_size,
                shuffle=True, random_state=self.random_state)

    self.fitted_uplift_tree = self.growDecisionTreeFrom(
        X, treatment_idx, y, X_val, treatment_val_idx, y_val,
        max_depth=self.max_depth, early_stopping_eval_diff_scale=self.early_stopping_eval_diff_scale,
        min_samples_leaf=self.min_samples_leaf,
        depth=1, min_samples_treatment=self.min_samples_treatment,
        n_reg=self.n_reg, parentNodeSummary_p=None
    )

    if self.honesty:
        # Re-estimate leaf-level responses on the held-out estimation sample.
        self.honestApproach(X_est, treatment_idx_est, y_est)

    self.feature_importances_ = np.zeros(X.shape[1])
    for col, imp in self.feature_imp_dict.items():
        self.feature_importances_[col] = imp
    total_imp = self.feature_importances_.sum()
    if total_imp > 0:
        self.feature_importances_ /= total_imp  # normalize to add to 1