aiops/ContraLSP/hmm/main.py
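"""Benchmark feature-attribution explainers on the synthetic HMM dataset.

Trains (or loads) an RNN state classifier, runs the selected explainers on
the test split, and appends white-box saliency metrics (AUP, AUR,
information, entropy, ROC-AUC, AUPRC) per explainer to a CSV file.
"""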
import multiprocessing as mp
import os
import random
from argparse import ArgumentParser
from typing import List

import numpy as np
import torch as th
import torch.nn as nn
from captum.attr import DeepLift, GradientShap, IntegratedGradients, Lime
from pytorch_lightning import Trainer, seed_everything
from pytorch_lightning.loggers import TensorBoardLogger

from utils.tools import print_results
from tint.attr import (
DynaMask,
ExtremalMask,
Fit,
Retain,
TemporalAugmentedOcclusion,
TemporalOcclusion,
TimeForwardTunnel,
)
from tint.attr.models import (
ExtremalMaskNet,
JointFeatureGeneratorNet,
MaskNet,
RetainNet,
)
from tint.datasets import HMM
from tint.metrics.white_box import (
aup,
aur,
information,
entropy,
roc_auc,
auprc,
)
from tint.models import MLP, RNN
from attribution.gatemasknn import *
from attribution.gate_mask import GateMask
from hmm.classifier import StateClassifierNet


def main(
explainers: List[str],
device: str = "cpu",
fold: int = 0,
seed: int = 42,
deterministic: bool = False,
is_train: bool = True,
lambda_1: float = 1.0,
lambda_2: float = 1.0,
output_file: str = "results.csv",
):
# If deterministic, seed everything
if deterministic:
seed_everything(seed=seed, workers=True)
    # Parse the accelerator and optional device index from e.g. "cuda:0";
    # Lightning's `devices=1` requests one device, a list pins specific ids.
    accelerator = device.split(":")[0]
    print(f"Accelerator: {accelerator}")
    device_id = 1
    if len(device.split(":")) > 1:
        device_id = [int(device.split(":")[1])]
# Create lock
lock = mp.Lock()
# Load data
hmm = HMM(n_folds=5, fold=fold, seed=seed)
# Create classifier
classifier = StateClassifierNet(
feature_size=3,
n_state=2,
hidden_size=200,
regres=True,
loss="cross_entropy",
lr=0.0001,
l2=1e-3,
)
# Train classifier
trainer = Trainer(
max_epochs=50,
accelerator=accelerator,
devices=device_id,
deterministic=deterministic,
logger=TensorBoardLogger(
save_dir=".",
version=random.getrandbits(128),
),
)
if is_train:
trainer.fit(classifier, datamodule=hmm)
if not os.path.exists("./model/"):
os.makedirs("./model/")
th.save(classifier.state_dict(), "./model/classifier_{}_{}".format(fold, seed))
else:
classifier.load_state_dict(th.load("./model/classifier_{}_{}".format(fold, seed)))
# Get data for explainers
with lock:
x_train = hmm.preprocess(split="train")["x"].to(device)
x_test = hmm.preprocess(split="test")["x"].to(device)
y_test = hmm.preprocess(split="test")["y"].to(device)
true_saliency = hmm.true_saliency(split="test").to(device)
print("==============The sum of true_saliency is", true_saliency.sum(), "==============\n" + 70 * "=")
# if not os.path.exists("./results/"):
# os.makedirs("./results/")
# np.save('./results/true.npy', true_saliency.detach().numpy())
# Switch to eval
classifier.eval()
classifier.zero_grad()
# Set model to device
classifier.to(device)
# Disable cudnn if using cuda accelerator.
# Please see https://captum.ai/docs/faq#how-can-i-resolve-cudnn-rnn-backward-error-for-rnn-or-lstm-network
# for more information.
if accelerator == "cuda":
th.backends.cudnn.enabled = False
# Create dict of attributions
attr = dict()
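    # DeepLift w.r.t. an all-zeros baseline, wrapped in TimeForwardTunnel so
    # attributions are computed along the temporal dimension.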
if "deep_lift" in explainers:
explainer = TimeForwardTunnel(DeepLift(classifier))
attr["deep_lift"] = explainer.attribute(
x_test,
baselines=x_test * 0,
task="binary",
show_progress=True,
).abs()
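    # DynaMask: learns a Gaussian-blur perturbation mask over several keep
    # ratios and reports the best-performing one.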
if "dyna_mask" in explainers:
trainer = Trainer(
max_epochs=500,
accelerator=accelerator,
devices=device_id,
log_every_n_steps=2,
deterministic=deterministic,
logger=TensorBoardLogger(
save_dir=".",
version=random.getrandbits(128),
),
)
mask = MaskNet(
forward_func=classifier,
perturbation="gaussian_blur",
sigma_max=1,
keep_ratio=list(np.arange(0.25, 0.35, 0.05)),
size_reg_factor_init=0.1,
size_reg_factor_dilation=100,
time_reg_factor=1.0,
)
explainer = DynaMask(classifier)
_attr = explainer.attribute(
x_test,
additional_forward_args=(True,),
trainer=trainer,
mask_net=mask,
batch_size=100,
return_best_ratio=True,
)
print(f"Best keep ratio is {_attr[1]}")
attr["dyna_mask"] = _attr[0].to(device)
if "extremal_mask" in explainers:
trainer = Trainer(
max_epochs=500,
accelerator=accelerator,
devices=device_id,
log_every_n_steps=2,
deterministic=deterministic,
logger=TensorBoardLogger(
save_dir=".",
version=random.getrandbits(128),
),
)
mask = ExtremalMaskNet(
forward_func=classifier,
model=nn.Sequential(
RNN(
input_size=x_test.shape[-1],
rnn="gru",
hidden_size=x_test.shape[-1],
bidirectional=True,
),
MLP([2 * x_test.shape[-1], x_test.shape[-1]]),
),
lambda_1=1.0,
lambda_2=1.0,
optim="adam",
lr=0.01,
)
explainer = ExtremalMask(classifier)
_attr = explainer.attribute(
x_test,
additional_forward_args=(True,),
trainer=trainer,
mask_net=mask,
batch_size=100,
)
attr["extremal_mask"] = _attr.to(device)
if "gate_mask" in explainers:
trainer = Trainer(
max_epochs=500,
accelerator=accelerator,
devices=device_id,
log_every_n_steps=2,
deterministic=deterministic,
logger=TensorBoardLogger(
save_dir=".",
version=random.getrandbits(128),
),
)
mask = GateMaskNet(
forward_func=classifier,
model=nn.Sequential(
RNN(
input_size=x_test.shape[-1],
rnn="gru",
hidden_size=x_test.shape[-1],
bidirectional=True,
),
MLP([2 * x_test.shape[-1], x_test.shape[-1]]),
),
lambda_1=lambda_1,
lambda_2=lambda_2,
optim="adam",
lr=0.01,
)
explainer = GateMask(classifier)
_attr = explainer.attribute(
x_test,
additional_forward_args=(True,),
trainer=trainer,
mask_net=mask,
batch_size=x_test.shape[0],
sigma=0.5,
)
attr["gate_mask"] = _attr.to(device)
print_results(attr["gate_mask"], true_saliency)
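    # FIT: scores feature importance over time using a learned conditional
    # (joint feature) generator.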
if "fit" in explainers:
generator = JointFeatureGeneratorNet(rnn_hidden_size=6)
trainer = Trainer(
max_epochs=200,
accelerator=accelerator,
devices=device_id,
log_every_n_steps=10,
deterministic=deterministic,
logger=TensorBoardLogger(
save_dir=".",
version=random.getrandbits(128),
),
)
explainer = Fit(
classifier,
generator=generator,
features=x_test,
trainer=trainer,
)
attr["fit"] = explainer.attribute(x_test, show_progress=True)
if "gradient_shap" in explainers:
explainer = TimeForwardTunnel(GradientShap(classifier.cpu()))
attr["gradient_shap"] = explainer.attribute(
x_test.cpu(),
baselines=th.cat([x_test.cpu() * 0, x_test.cpu()]),
n_samples=50,
stdevs=0.0001,
task="binary",
show_progress=True,
).abs()
classifier.to(device)
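    # Integrated Gradients w.r.t. an all-zeros baseline.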
if "integrated_gradients" in explainers:
explainer = TimeForwardTunnel(IntegratedGradients(classifier))
attr["integrated_gradients"] = explainer.attribute(
x_test,
baselines=x_test * 0,
internal_batch_size=200,
task="binary",
show_progress=True,
).abs()
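    # LIME with tint's default perturbation settings.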
if "lime" in explainers:
explainer = TimeForwardTunnel(Lime(classifier))
attr["lime"] = explainer.attribute(
x_test,
task="binary",
show_progress=True,
).abs()
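    # Augmented occlusion: occluded values are replaced by samples drawn from
    # the training data rather than a fixed baseline.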
if "augmented_occlusion" in explainers:
explainer = TimeForwardTunnel(
TemporalAugmentedOcclusion(
classifier, data=x_train, n_sampling=10, is_temporal=True
)
)
attr["augmented_occlusion"] = explainer.attribute(
x_test,
sliding_window_shapes=(1,),
attributions_fn=abs,
task="binary",
show_progress=True,
).abs()
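    # Plain temporal occlusion with the training-set mean as the baseline.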
if "occlusion" in explainers:
explainer = TimeForwardTunnel(TemporalOcclusion(classifier))
attr["occlusion"] = explainer.attribute(
x_test,
sliding_window_shapes=(1,),
baselines=x_train.mean(0, keepdim=True),
attributions_fn=abs,
task="binary",
show_progress=True,
).abs()
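    # RETAIN: trains an attention-based RNN from scratch and uses its
    # attention scores as attributions for the predicted targets.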
if "retain" in explainers:
retain = RetainNet(
dim_emb=128,
dropout_emb=0.4,
dim_alpha=8,
dim_beta=8,
dropout_context=0.4,
dim_output=2,
loss="cross_entropy",
)
explainer = Retain(
datamodule=hmm,
retain=retain,
trainer=Trainer(
max_epochs=50,
accelerator=accelerator,
devices=device_id,
deterministic=deterministic,
logger=TensorBoardLogger(
save_dir=".",
version=random.getrandbits(128),
),
),
)
attr["retain"] = (
explainer.attribute(x_test, target=y_test).abs().to(device)
)
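    # Append one CSV row per explainer: seed, fold, explainer, lambda_1,
    # lambda_2, then AUP, AUR, information, entropy, ROC-AUC and AUPRC
    # computed against the ground-truth saliency.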
with open(output_file, "a") as fp, lock:
for k, v in attr.items():
fp.write(str(seed) + ",")
fp.write(str(fold) + ",")
fp.write(k + ",")
fp.write(str(lambda_1) + ",")
fp.write(str(lambda_2) + ",")
fp.write(f"{aup(v, true_saliency):.4},")
fp.write(f"{aur(v, true_saliency):.4},")
fp.write(f"{information(v, true_saliency):.4},")
fp.write(f"{entropy(v, true_saliency):.4},")
fp.write(f"{roc_auc(v, true_saliency):.4},")
fp.write(f"{auprc(v, true_saliency):.4}")
fp.write("\n")
# if not os.path.exists("./results/"):
# os.makedirs("./results/")
# for key in attr.keys():
# result = attr[key]
    # np.save('./results/{}_result_{}_{}.npy'.format(key, fold, seed), result.detach().numpy())


def parse_args():
    parser = ArgumentParser()
parser.add_argument(
"--explainers",
type=str,
default=[
"gate_mask",
"deep_lift",
"dyna_mask",
"extremal_mask",
"fit",
"gradient_shap",
"integrated_gradients",
"lime",
"augmented_occlusion",
"occlusion",
"retain",
],
nargs="+",
metavar="N",
help="List of explainer to use.",
)
parser.add_argument(
"--device",
type=str,
default="cpu",
help="Which device to use.",
)
parser.add_argument(
"--fold",
type=int,
default=0,
help="Fold of the cross-validation.",
)
parser.add_argument(
"--seed",
type=int,
default=42,
help="Random seed for data generation.",
)
    parser.add_argument(
        "--train",
        action="store_true",
        help="Train the RNN classifier (otherwise load a saved checkpoint).",
    )
parser.add_argument(
"--deterministic",
action="store_true",
help="Whether to make training deterministic or not.",
)
parser.add_argument(
"--lambda-1",
type=float,
        default=2.0,  # 2 is best
help="Lambda 1 hyperparameter.",
)
parser.add_argument(
"--lambda-2",
type=float,
        default=1.0,
help="Lambda 2 hyperparameter.",
)
parser.add_argument(
"--output-file",
type=str,
default="results.csv",
help="Where to save the results.",
)
    return parser.parse_args()


if __name__ == "__main__":
args = parse_args()
main(
explainers=args.explainers,
device=args.device,
fold=args.fold,
seed=args.seed,
deterministic=args.deterministic,
is_train=args.train,
lambda_1=args.lambda_1,
lambda_2=args.lambda_2,
output_file=args.output_file,
)
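# Example invocation (hypothetical values; assumes tint, captum and the
# ContraLSP sources are importable):
#   python -m hmm.main --explainers gate_mask dyna_mask --device cuda:0 \
#       --train --deterministic --output-file results.csv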