aiops/ContraLSP/attribution/perturbation.py

from abc import ABC, abstractmethod

import torch


class Perturbation(ABC):
    @abstractmethod
    def __init__(self, device, eps=1.0e-7):
        self.mask_tensor = None
        self.eps = eps
        self.device = device

    @abstractmethod
    def apply(self, X, mask_tensor):
        """This method applies the perturbation on the input based on the mask tensor.

        Args:
            X: Input tensor.
            mask_tensor: Tensor containing the mask coefficients.
        """
        if X is None or mask_tensor is None:
            raise NameError("The mask_tensor should be fitted before or while calling the perturb method.")

    @abstractmethod
    def apply_multiple(self, X, mask_tensor):
        pass

    @abstractmethod
    def apply_extremal(self, X, extremal_tensor: torch.Tensor):
        """This method applies the perturbation on the input based on the extremal tensor.

        The extremal tensor is just a set of masks; the perturbation is applied according to each mask.

        Args:
            X: Input tensor.
            extremal_tensor: (N_area, T, N_feature) tensor containing the different masks.
        """
        if X is None or extremal_tensor is None:
            raise NameError("The mask_tensor should be fitted before or while calling the perturb method.")

    @abstractmethod
    def apply_extremal_multiple(self, X, extremal_tensor: torch.Tensor):
        if X is None or extremal_tensor is None:
            raise NameError("The mask_tensor should be fitted before or while calling the perturb method.")


class FadeMovingAverage(Perturbation):
    """This class allows one to create and apply 'fade to moving average' perturbations on inputs based on masks.

    Attributes:
        mask_tensor (torch.tensor): The mask tensor that indicates the intensity of the perturbation
            to be applied at each input entry.
        eps (float): Small number used for numerical stability.
        device: Device on which the tensor operations are executed.
    """

    def __init__(self, device, eps=1.0e-7):
        super().__init__(eps=eps, device=device)

    def apply(self, X, mask_tensor):
        super().apply(X=X, mask_tensor=mask_tensor)
        T = X.shape[0]
        # Compute the moving average for each feature and concatenate it to create a tensor with X's shape
        moving_average = torch.mean(X, 0).reshape(1, -1).to(self.device)
        moving_average_tiled = moving_average.repeat(T, 1)
        # The perturbation is just an affine combination of the input and the previous tensor weighted by the mask
        X_pert = mask_tensor * X + (1 - mask_tensor) * moving_average_tiled
        return X_pert

    def apply_multiple(self, X, mask_tensor):
        # X = (num_sample, T, nfeat)
        # mask_tensor = (num_sample, T, nfeat)
        super().apply(X=X, mask_tensor=mask_tensor)
        num_sample, T, num_features = X.shape
        # Compute the moving average for each feature and concatenate it to create a tensor with X's shape
        moving_average = torch.mean(X, 1).reshape(num_sample, 1, num_features).to(self.device)
        moving_average_tiled = moving_average.repeat(1, T, 1)
        # The perturbation is just an affine combination of the input and the previous tensor weighted by the mask
        X_pert = mask_tensor * X + (1 - mask_tensor) * moving_average_tiled
        return X_pert  # (num_sample, T, nfeat)

    def apply_extremal(self, X, extremal_tensor: torch.Tensor):
        super().apply_extremal(X, extremal_tensor)
        # Compute the moving average for each feature and concatenate it to create a tensor with X's shape
        moving_average = torch.mean(X, dim=0).reshape(1, 1, -1).to(self.device)
        # The perturbation is just an affine combination of the input and the previous tensor weighted by the mask
        X_pert = extremal_tensor * X + (1 - extremal_tensor) * moving_average
        return X_pert

    def apply_extremal_multiple(self, X, extremal_tensor: torch.Tensor):
        super().apply_extremal_multiple(X, extremal_tensor)
        N_area, num_samples, T, N_features = extremal_tensor.shape
        # X = (num_samples, T, N_features)
        # Compute the moving average for each feature and concatenate it to create a tensor with X's shape
        moving_average = torch.mean(X, dim=1).reshape(1, num_samples, 1, N_features).to(self.device)
        # The perturbation is just an affine combination of the input and the previous tensor weighted by the mask
        X_pert = extremal_tensor * X.unsqueeze(0) + (1 - extremal_tensor) * moving_average
        # X_pert = (N_area, num_samples, T, N_features)
        return X_pert


class GaussianBlur(Perturbation):
    """This class allows one to create and apply 'Gaussian blur' perturbations on inputs based on masks.

    Attributes:
        mask_tensor (torch.tensor): The mask tensor that indicates the intensity of the perturbation
            to be applied at each input entry.
        eps (float): Small number used for numerical stability.
        device: Device on which the tensor operations are executed.
        sigma_max (float): Maximal width for the Gaussian blur.
    """

    def __init__(self, device, eps=1.0e-7, sigma_max=2):
        super().__init__(eps=eps, device=device)
        self.sigma_max = sigma_max

    def apply(self, X, mask_tensor):
        super().apply(X=X, mask_tensor=mask_tensor)
        T = X.shape[0]
        T_axis = torch.arange(1, T + 1, dtype=int, device=self.device)
        # Convert the mask into a tensor containing the width of each Gaussian perturbation
        sigma_tensor = self.sigma_max * ((1 + self.eps) - mask_tensor)
        sigma_tensor = sigma_tensor.unsqueeze(0)
        # For each feature and each time, we compute the coefficients for the Gaussian perturbation
        T1_tensor = T_axis.unsqueeze(1).unsqueeze(2)
        T2_tensor = T_axis.unsqueeze(0).unsqueeze(2)
        filter_coefs = torch.exp(torch.divide(-1.0 * (T1_tensor - T2_tensor) ** 2, 2.0 * (sigma_tensor ** 2)))
        filter_coefs = torch.divide(filter_coefs, torch.sum(filter_coefs, 0))
        # The perturbation is obtained by replacing each input by the linear combination weighted by Gaussian coefs
        X_pert = torch.einsum("sti,si->ti", filter_coefs, X)
        return X_pert

    def apply_multiple(self, X, mask_tensor):
        # X = (num_sample, T, nfeat)
        # mask_tensor = (num_sample, T, nfeat)
        super().apply(X=X, mask_tensor=mask_tensor)
        T = X.shape[1]
        T_axis = torch.arange(1, T + 1, dtype=int, device=self.device)
        # Convert the mask into a tensor containing the width of each Gaussian perturbation
        sigma_tensor = self.sigma_max * ((1 + self.eps) - mask_tensor)
        sigma_tensor = sigma_tensor.unsqueeze(1)  # (num_sample, 1, T, nfeat)
        # For each feature and each time, we compute the coefficients for the Gaussian perturbation
        T1_tensor = T_axis.reshape(1, T, 1, 1)  # (1, T, 1, 1)
        T2_tensor = T_axis.reshape(1, 1, T, 1)  # (1, 1, T, 1)
        filter_coefs = torch.exp(torch.divide(-1.0 * (T1_tensor - T2_tensor) ** 2, 2.0 * (sigma_tensor ** 2)))
        filter_coefs = torch.divide(filter_coefs, torch.sum(filter_coefs, 1, keepdim=True))  # (num_sample, T, T, num_feat)
        # The perturbation is obtained by replacing each input by the linear combination weighted by Gaussian coefs
        X_pert = torch.einsum("bsti,bsi->bti", filter_coefs, X)
        return X_pert

    def apply_extremal(self, X: torch.Tensor, extremal_tensor: torch.Tensor):
        N_area, T, N_features = extremal_tensor.shape
        T_axis = torch.arange(1, T + 1, dtype=int, device=self.device)
        self.extremal_tensor = extremal_tensor[:, :, :]
        # Convert the mask into a tensor containing the width of each Gaussian perturbation
        sigma_tensor = self.sigma_max * ((1 + self.eps) - extremal_tensor).reshape(N_area, 1, T, N_features)
        # For each feature and each time, we compute the coefficients for the Gaussian perturbation
        T1_tensor = T_axis.reshape(1, 1, T, 1)
        T2_tensor = T_axis.reshape(1, T, 1, 1)
        numerator = -1.0 * (T1_tensor - T2_tensor) ** 2
        denominator = 2.0 * (sigma_tensor ** 2)
        self.numerator, self.denominator = numerator, denominator
        filter_coefs = torch.exp(torch.divide(numerator, denominator))  # (N_area, T, T, N_features)
        self.filter_coefs_before = filter_coefs
        filter_coefs = filter_coefs / torch.sum(filter_coefs, dim=1, keepdim=True)
        self.filter_coefs_after = filter_coefs
        # The perturbation is obtained by replacing each input by the linear combination weighted by Gaussian coefs
        X_pert = torch.einsum("asti,si->ati", filter_coefs, X)
        return X_pert
    def apply_extremal_multiple(self, X: torch.Tensor, extremal_tensor: torch.Tensor):
        N_area, num_samples, T, N_features = extremal_tensor.shape
        T_axis = torch.arange(1, T + 1, dtype=int, device=self.device)  # (T)
        # Convert the mask into a tensor containing the width of each Gaussian perturbation
        self.extremal_tensor = extremal_tensor[:, 0, :, :]
        sigma_tensor = self.sigma_max * ((1 + self.eps) - extremal_tensor).reshape(N_area, num_samples, 1, T, N_features)
        # For each feature and each time, we compute the coefficients for the Gaussian perturbation
        T1_tensor = T_axis.reshape(1, 1, 1, T, 1)
        T2_tensor = T_axis.reshape(1, 1, T, 1, 1)
        numerator = -1.0 * (T1_tensor - T2_tensor) ** 2
        denominator = 2.0 * (sigma_tensor ** 2)
        self.numerator, self.denominator = numerator, denominator
        filter_coefs = torch.exp(torch.divide(numerator, denominator))  # (N_area, num_samples, T, T, N_features)
        self.filter_coefs_before = filter_coefs
        filter_coefs = filter_coefs / torch.sum(filter_coefs, dim=2, keepdim=True)
        self.filter_coefs_after = filter_coefs
        # The perturbation is obtained by replacing each input by the linear combination weighted by Gaussian coefs
        X_pert = torch.einsum("absti,bsi->abti", filter_coefs, X)
        # X_pert = (N_area, num_samples, T, N_features)
        return X_pert


class FadeMovingAverageWindow(Perturbation):
    """This class allows one to create and apply 'fade to moving average' perturbations on inputs based on masks.

    Attributes:
        mask_tensor (torch.tensor): The mask tensor that indicates the intensity of the perturbation
            to be applied at each input entry.
        eps (float): Small number used for numerical stability.
        device: Device on which the tensor operations are executed.
        window_size: Size of the window where each moving average is computed (called W in the paper).
""" def __init__(self, device, window_size=2, eps=1.0e-7): super().__init__(eps=eps, device=device) self.window_size = window_size def apply(self, X, mask_tensor): super().apply(X=X, mask_tensor=mask_tensor) T = X.shape[0] T_axis = torch.arange(1, T + 1, dtype=int, device=self.device) # For each feature and each time, we compute the coefficients of the perturbation tensor T1_tensor = T_axis.unsqueeze(1) T2_tensor = T_axis.unsqueeze(0) filter_coefs = torch.abs(T1_tensor - T2_tensor) <= self.window_size filter_coefs = filter_coefs / (2 * self.window_size + 1) X_avg = torch.einsum("st,si->ti", filter_coefs, X) # The perturbation is just an affine combination of the input and the previous tensor weighted by the mask X_pert = X_avg + mask_tensor * (X - X_avg) return X_pert def apply_extremal(self, X: torch.Tensor, masks_tensor: torch.Tensor): N_area, T, N_features = masks_tensor.shape T_axis = torch.arange(1, T + 1, dtype=int, device=self.device) # For each feature and each time, we compute the coefficients for the Gaussian perturbation T1_tensor = T_axis.unsqueeze(1) T2_tensor = T_axis.unsqueeze(0) filter_coefs = torch.abs(T1_tensor - T2_tensor) <= self.window_size filter_coefs = filter_coefs / (2 * self.window_size + 1) X_avg = torch.einsum("st,si->ti", filter_coefs, X[0, :, :]) X_avg = X_avg.unsqueeze(0) # The perturbation is just an affine combination of the input and the previous tensor weighted by the mask X_pert = X_avg + masks_tensor * (X - X_avg) return X_pert class FadeMovingAveragePastWindow(Perturbation): """This class allows to create and apply 'fade to past moving average' perturbations on inputs based on masks. Attributes: mask_tensor (torch.tensor): The mask tensor than indicates the intensity of the perturbation to be applied at each input entry. eps (float): Small number used for numerical stability. device: Device on which the tensor operations are executed. window_size: Size of the window where each moving average is computed (called W in the paper). 
""" def __init__(self, device, window_size=2, eps=1.0e-7): super().__init__(eps=eps, device=device) self.window_size = window_size def apply(self, X, mask_tensor): super().apply(X=X, mask_tensor=mask_tensor) T = X.shape[0] T_axis = torch.arange(1, T + 1, dtype=int, device=self.device) # For each feature and each time, we compute the coefficients of the perturbation tensor T1_tensor = T_axis.unsqueeze(1) T2_tensor = T_axis.unsqueeze(0) filter_coefs = (T1_tensor - T2_tensor) <= self.window_size filter_coefs = filter_coefs / (2 * self.window_size + 1) X_avg = torch.einsum("st,si->ti", filter_coefs, X) # The perturbation is just an affine combination of the input and the previous tensor weighted by the mask X_pert = X_avg + mask_tensor * (X - X_avg) return X_pert def apply_extremal(self, X: torch.Tensor, masks_tensor: torch.Tensor): N_area, T, N_features = masks_tensor.shape T_axis = torch.arange(1, T + 1, dtype=int, device=self.device) T_axis = torch.arange(1, T + 1, dtype=int, device=self.device) # For each feature and each time, we compute the coefficients of the perturbation tensor T1_tensor = T_axis.unsqueeze(1) T2_tensor = T_axis.unsqueeze(0) filter_coefs = (T1_tensor - T2_tensor) <= self.window_size filter_coefs = filter_coefs / (2 * self.window_size + 1) X_avg = torch.einsum("st,si->ti", filter_coefs, X[0, :, :]) X_avg = X_avg.unsqueeze(0) # The perturbation is just an affine combination of the input and the previous tensor weighted by the mask X_pert = X_avg + masks_tensor * (X - X_avg) return X_pert class FadeReference(Perturbation): """This class allows to create and apply 'fade to reference' perturbation on inputs based on masks. Attributes: mask_tensor (torch.tensor): The mask tensor than indicates the intensity of the perturbation to be applied at each input entry. eps (float): Small number used for numerical stability. device: Device on which the tensor operations are executed. X_ref: The baseline input of same size as X. """ def __init__(self, device, X_ref, eps=1.0e-7): super().__init__(eps=eps, device=device) self.X_ref = X_ref def apply(self, X, mask_tensor): super().apply(X=X, mask_tensor=mask_tensor) # The perturbation is just an affine combination of the input and the baseline weighted by the mask X_pert = self.X_ref + mask_tensor * (X - self.X_ref) return X_pert def apply_extremal(self, X, mask_tensor): super().apply(X=X, mask_tensor=mask_tensor) # The perturbation is just an affine combination of the input and the baseline weighted by the mask X_pert = self.X_ref + mask_tensor * (X - self.X_ref) return X_pert