# causalml/inference/tf/dragonnet.py

""" This module implements the Dragonnet [1], which adapts the design and training of neural networks to improve the quality of treatment effect estimates. The authors propose two adaptations: - A new architecture, the Dragonnet, that exploits the sufficiency of the propensity score for estimation adjustment. - A regularization procedure, targeted regularization, that induces a bias towards models that have non-parametrically optimal asymptotic properties ‘out-of-the-box’. Studies on benchmark datasets for causal inference show these adaptations outperform existing methods. Code is available at github.com/claudiashi57/dragonnet **References** [1] C. Shi, D. Blei, V. Veitch (2019). | Adapting Neural Networks for the Estimation of Treatment Effects. | https://arxiv.org/pdf/1906.02120.pdf | https://github.com/claudiashi57/dragonnet """ import numpy as np from tensorflow.keras import Input, Model from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau, TerminateOnNaN from tensorflow.keras.layers import Dense, Concatenate from tensorflow.keras.optimizers import SGD, Adam from tensorflow.keras.regularizers import l2 from tensorflow.keras.models import load_model from causalml.inference.tf.utils import ( dragonnet_loss_binarycross, EpsilonLayer, regression_loss, binary_classification_loss, treatment_accuracy, track_epsilon, make_tarreg_loss, ) from causalml.inference.meta.utils import convert_pd_to_np class DragonNet: def __init__( self, neurons_per_layer=200, targeted_reg=True, ratio=1.0, val_split=0.2, batch_size=64, epochs=100, learning_rate=1e-5, momentum=0.9, reg_l2=0.01, use_adam=True, adam_epochs=30, adam_learning_rate=1e-3, loss_func=dragonnet_loss_binarycross, verbose=True, ): """ Initializes a Dragonnet. """ self.neurons_per_layer = neurons_per_layer self.targeted_reg = targeted_reg self.ratio = ratio self.val_split = val_split self.batch_size = batch_size self.epochs = epochs self.learning_rate = learning_rate self.momentum = momentum self.use_adam = use_adam self.adam_learning_rate = adam_learning_rate self.adam_epochs = adam_epochs self.reg_l2 = reg_l2 self.loss_func = loss_func self.verbose = verbose def make_dragonnet(self, input_dim): """ Neural net predictive model. The dragon has three heads. 
    def make_dragonnet(self, input_dim):
        """
        Builds the neural net predictive model. The dragon has three heads:
        two outcome heads (one per treatment arm) and one propensity head.

        Args:
            input_dim (int): number of features (columns) in the input matrix
        Returns:
            model (keras.models.Model): DragonNet model
        """
        inputs = Input(shape=(input_dim,), name="input")

        # shared representation
        x = Dense(
            units=self.neurons_per_layer,
            activation="elu",
            kernel_initializer="RandomNormal",
        )(inputs)
        x = Dense(
            units=self.neurons_per_layer,
            activation="elu",
            kernel_initializer="RandomNormal",
        )(x)
        x = Dense(
            units=self.neurons_per_layer,
            activation="elu",
            kernel_initializer="RandomNormal",
        )(x)

        # propensity head: P(T=1 | X)
        t_predictions = Dense(units=1, activation="sigmoid")(x)

        # hypothesis (outcome) heads: first hidden layer
        y0_hidden = Dense(
            units=int(self.neurons_per_layer / 2),
            activation="elu",
            kernel_regularizer=l2(self.reg_l2),
        )(x)
        y1_hidden = Dense(
            units=int(self.neurons_per_layer / 2),
            activation="elu",
            kernel_regularizer=l2(self.reg_l2),
        )(x)

        # second hidden layer
        y0_hidden = Dense(
            units=int(self.neurons_per_layer / 2),
            activation="elu",
            kernel_regularizer=l2(self.reg_l2),
        )(y0_hidden)
        y1_hidden = Dense(
            units=int(self.neurons_per_layer / 2),
            activation="elu",
            kernel_regularizer=l2(self.reg_l2),
        )(y1_hidden)

        # third layer: scalar outcome predictions under control and treatment
        y0_predictions = Dense(
            units=1,
            activation=None,
            kernel_regularizer=l2(self.reg_l2),
            name="y0_predictions",
        )(y0_hidden)
        y1_predictions = Dense(
            units=1,
            activation=None,
            kernel_regularizer=l2(self.reg_l2),
            name="y1_predictions",
        )(y1_hidden)

        # trainable epsilon used by the targeted regularization loss
        dl = EpsilonLayer()
        epsilons = dl(t_predictions, name="epsilon")
        concat_pred = Concatenate(1)(
            [y0_predictions, y1_predictions, t_predictions, epsilons]
        )
        model = Model(inputs=inputs, outputs=concat_pred)

        return model

    def fit(self, X, treatment, y, p=None):
        """
        Fits the DragonNet model.

        Args:
            X (np.matrix or np.array or pd.DataFrame): a feature matrix
            treatment (np.array or pd.Series): a treatment vector
            y (np.array or pd.Series): an outcome vector
        """
        X, treatment, y = convert_pd_to_np(X, treatment, y)

        # The custom losses expect the outcome and the treatment indicator
        # stacked column-wise into a single target array.
        y = np.hstack((y.reshape(-1, 1), treatment.reshape(-1, 1)))

        self.dragonnet = self.make_dragonnet(X.shape[1])

        metrics = [
            regression_loss,
            binary_classification_loss,
            treatment_accuracy,
            track_epsilon,
        ]

        if self.targeted_reg:
            loss = make_tarreg_loss(ratio=self.ratio, dragonnet_loss=self.loss_func)
        else:
            loss = self.loss_func

        # Optional Adam pre-training stage, followed by the main SGD stage below.
        if self.use_adam:
            self.dragonnet.compile(
                optimizer=Adam(learning_rate=self.adam_learning_rate),
                loss=loss,
                metrics=metrics,
            )
            adam_callbacks = [
                TerminateOnNaN(),
                EarlyStopping(monitor="val_loss", patience=2, min_delta=0.0),
                ReduceLROnPlateau(
                    monitor="loss",
                    factor=0.5,
                    patience=5,
                    verbose=self.verbose,
                    mode="auto",
                    min_delta=1e-8,
                    cooldown=0,
                    min_lr=0,
                ),
            ]
            self.dragonnet.fit(
                X,
                y,
                callbacks=adam_callbacks,
                validation_split=self.val_split,
                epochs=self.adam_epochs,
                batch_size=self.batch_size,
                verbose=self.verbose,
            )

        sgd_callbacks = [
            TerminateOnNaN(),
            EarlyStopping(monitor="val_loss", patience=40, min_delta=0.0),
            ReduceLROnPlateau(
                monitor="loss",
                factor=0.5,
                patience=5,
                verbose=self.verbose,
                mode="auto",
                min_delta=0.0,
                cooldown=0,
                min_lr=0,
            ),
        ]
        self.dragonnet.compile(
            optimizer=SGD(
                learning_rate=self.learning_rate, momentum=self.momentum, nesterov=True
            ),
            loss=loss,
            metrics=metrics,
        )
        self.dragonnet.fit(
            X,
            y,
            callbacks=sgd_callbacks,
            validation_split=self.val_split,
            epochs=self.epochs,
            batch_size=self.batch_size,
            verbose=self.verbose,
        )
    def predict(self, X, treatment=None, y=None, p=None):
        """
        Calls predict on the fitted DragonNet.

        Args:
            X (np.matrix or np.array or pd.DataFrame): a feature matrix
        Returns:
            (np.array): a 2D array with shape (X.shape[0], 4), where each row
                takes the form of (outcome do(t=0), outcome do(t=1),
                propensity, epsilon)
        """
        return self.dragonnet.predict(X)

    def predict_propensity(self, X):
        """
        Predicts the individual propensity scores.

        Args:
            X (np.matrix or np.array or pd.DataFrame): a feature matrix
        Returns:
            (np.array): propensity score vector
        """
        preds = self.predict(X)
        return preds[:, 2]

    def predict_tau(self, X):
        """
        Predicts the individual treatment effect (tau / "ITE").

        Args:
            X (np.matrix or np.array or pd.DataFrame): a feature matrix
        Returns:
            (np.array): treatment effect vector
        """
        preds = self.predict(X)
        return (preds[:, 1] - preds[:, 0]).reshape(-1, 1)

    def fit_predict(self, X, treatment, y, p=None, return_components=False):
        """
        Fits the DragonNet model and then predicts.

        Args:
            X (np.matrix or np.array or pd.DataFrame): a feature matrix
            treatment (np.array or pd.Series): a treatment vector
            y (np.array or pd.Series): an outcome vector
            return_components (bool, optional): whether to return the full
                component predictions instead of the treatment effect
        Returns:
            (np.array): predictions based on the return_components flag
                if return_components=False (default), each row is a treatment effect
                if return_components=True, each row is (outcome do(t=0),
                outcome do(t=1), propensity, epsilon)
        """
        self.fit(X, treatment, y)
        if return_components:
            return self.predict(X)
        return self.predict_tau(X)

    def save(self, h5_filepath):
        """
        Save the DragonNet model as an H5 file.

        Args:
            h5_filepath (str): H5 file path
        """
        self.dragonnet.save(h5_filepath)

    def load(self, h5_filepath, ratio=1.0, dragonnet_loss=dragonnet_loss_binarycross):
        """
        Load the DragonNet model from an H5 file.

        Args:
            h5_filepath (str): H5 file path
            ratio (float): weight assigned to the targeted regularization loss component
            dragonnet_loss (function): a loss function
        """
        self.dragonnet = load_model(
            h5_filepath,
            custom_objects={
                "EpsilonLayer": EpsilonLayer,
                "dragonnet_loss_binarycross": dragonnet_loss_binarycross,
                "tarreg_ATE_unbounded_domain_loss": make_tarreg_loss(
                    ratio=ratio, dragonnet_loss=dragonnet_loss
                ),
                "regression_loss": regression_loss,
                "binary_classification_loss": binary_classification_loss,
                "treatment_accuracy": treatment_accuracy,
                "track_epsilon": track_epsilon,
            },
        )
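
# ---------------------------------------------------------------------------
# Usage sketch (not part of the library API above): a minimal end-to-end
# example of fitting a DragonNet on synthetic confounded data with a known
# constant treatment effect. The data-generating process and hyperparameters
# here are illustrative assumptions, not tuned or from the original code.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    np.random.seed(42)

    n, d = 1000, 5
    X = np.random.normal(size=(n, d))
    # Treatment assignment depends on the first covariate (confounding).
    propensity = 1 / (1 + np.exp(-X[:, 0]))
    treatment = np.random.binomial(1, propensity)
    # Outcome with a constant true treatment effect of 1.0.
    y = X[:, 0] + treatment * 1.0 + np.random.normal(scale=0.1, size=n)

    model = DragonNet(neurons_per_layer=64, epochs=30, adam_epochs=10, verbose=False)
    tau_hat = model.fit_predict(X, treatment, y)  # shape (n, 1)
    print("Estimated ATE:", tau_hat.mean())  # should be close to 1.0

    # Propensity scores for each row of X (third column of the raw output).
    p_hat = model.predict_propensity(X)
    print("Propensity range:", p_hat.min(), p_hat.max())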