aiops/ContraLSP/abstudy/gatemasknn_no_both.py [12:170]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
warnings.filterwarnings("ignore")


class MovingAvg(nn.Module):
    """
    Moving average block to highlight the trend of time series
    """
    def __init__(self, kernel_size=20, stride=1):
        super(MovingAvg, self).__init__()
        self.kernel_size = kernel_size
        self.avg = nn.AvgPool1d(kernel_size=kernel_size, stride=stride, padding=0)

    def forward(self, x):
        # pad both ends of the time series so the pooled output stays aligned
        front = x[:, 0:1, :].repeat(1, (self.kernel_size - 1) // 2, 1)
        end = x[:, -1:, :].repeat(1, (self.kernel_size - 1) // 2, 1)
        x = th.cat([front, x, end], dim=1)
        x = self.avg(x.permute(0, 2, 1))
        x = x.permute(0, 2, 1)
        return x
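
# Shape note (assumes the (batch, time, channels) layout used in this file):
# AvgPool1d pools over the last axis, hence the permutes around it above.
# With stride 1 and edge padding of (kernel_size - 1) // 2 per side, an odd
# kernel_size preserves the time length, e.g.:
#
#   avg = MovingAvg(kernel_size=5)
#   y = avg(th.randn(8, 50, 3))  # y.shape == (8, 50, 3)
#
# An even kernel_size (such as the default 20) shortens the series by one step.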


class GateMaskNN(nn.Module):
    """
    Extremal Mask NN model.

    Args:
        forward_func (callable): The forward function of the model or any
            modification of it.
        model (nnn.Module): A model used to recreate the original
            predictions, in addition to the mask. Default to ``None``
        batch_size (int): Batch size of the model. Default to 32
        factor_dilation (float): Ratio between the final and the
            initial size regulation factor. Default to 100
    References
        #. `Learning Perturbations to Explain Time Series Predictions <https://arxiv.org/abs/2305.18840>`_
        #. `Understanding Deep Networks via Extremal Perturbations and Smooth Masks <https://arxiv.org/abs/1910.08485>`_
    """

    def __init__(
        self,
        forward_func: Callable,
        model: nn.Module = None,
        batch_size: int = 32,
        factor_dilation: float = 10.0,
        based: float = 0.5,
        pooling_method: str = 'sigmoid',
        use_win: bool = False,
    ) -> None:
        super().__init__()
        # Bypass nn.Module.__setattr__ so forward_func is not registered
        # as a submodule of this model.
        object.__setattr__(self, "forward_func", forward_func)
        self.model = model
        self.batch_size = batch_size

        self.input_size = None
        self.win_size = None
        self.sigma = None
        self.channels = None
        self.T = None
        self.reg_multiplier = None
        self.mask = None
        self.based = based
        self.factor_dilation = factor_dilation
        self.pooling_method = pooling_method
        self.use_win = use_win

    def init(self, input_size: tuple, batch_size: int = 32,
             win_size: int = 5, sigma: float = 0.5, n_epochs: int = 100) -> None:
        self.input_size = input_size
        self.batch_size = batch_size
        self.win_size = win_size
        self.sigma = sigma
        self.channels = input_size[2]
        self.T = input_size[1]
        # Per-epoch growth rate: multiplying by this every epoch scales the
        # size regularization factor by factor_dilation over n_epochs epochs.
        self.reg_multiplier = np.exp(
            np.log(self.factor_dilation) / n_epochs
        )

        self.moving_avg = MovingAvg(self.win_size)

        # One learnable mask value per (n_samples, T, channels) entry of the
        # full dataset; values are filled in reset_parameters.
        self.mask = nn.Parameter(th.Tensor(*input_size))

        self.trendnet = nn.ModuleList()
        for i in range(self.channels):
            self.trendnet.append(MLP([self.T, 32, self.T], activations='relu'))

        self.reset_parameters()

    def hard_sigmoid(self, x):
        return th.clamp(x, 0.0, 1.0)

    def reset_parameters(self) -> None:
        self.mask.data.fill_(0.5)
        # In the first training step, µd is 0.0
        # self.mask.data.fill_(0.0)

    def forward(
        self,
        x: th.Tensor,
        batch_idx,
        baselines,
        target,
        *additional_forward_args,
    ) -> (th.Tensor, th.Tensor):

        mu = self.mask[
            self.batch_size * batch_idx : self.batch_size * (batch_idx + 1)
        ]
        # Draw Gaussian noise on the same device/dtype as x (th.randn on the
        # default device would fail for CUDA inputs); noise is only applied
        # in training mode.
        noise = th.randn_like(x)
        mask = mu + self.sigma * noise * self.training
        mask = self.refactor_mask(mask, x)

        # hard sigmoid
        mask = self.hard_sigmoid(mask)

        # If a model is provided, let it generate the baselines from the
        # residual between the inputs and the given baselines
        if self.model is not None:
            baselines = self.model(x - baselines)

        # Mask data according to samples
        # We eventually cut samples up to x time dimension
        # x1 represents inputs with important features masked.
        # x2 represents inputs with unimportant features masked.
        mask = mask[:, : x.shape[1], ...]
        x1 = x * mask + baselines * (1.0 - mask)
        x2 = x * (1.0 - mask) + baselines * mask
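        # The two perturbations are complementary: elementwise,
        # x1 + x2 == x + baselines.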

        # Return f(perturbed x)
        return (
            _run_forward(
                forward_func=self.forward_func,
                inputs=x1,
                target=target,
                additional_forward_args=additional_forward_args,
            ),
            _run_forward(
                forward_func=self.forward_func,
                inputs=x2,
                target=target,
                additional_forward_args=additional_forward_args,
            ),
        )

    def trend_info(self, x):
        if self.use_win:
            trend = self.moving_avg(x)
        else:
            trend = x
        trend_out = th.zeros_like(trend)
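        # One MLP per channel, each mapping the whole time axis of that
        # channel: (batch, T) -> (batch, T).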
        for i in range(self.channels):
            trend_out[:, :, i] = self.trendnet[i](trend[:, :, i])

        # front = x[:, 0:1, :]
        # x_f = th.cat([front, x], dim=1)[:, :-1, :]
        # res = th.abs(x - x_f)
        return trend_out

    def refactor_mask(self, mask, x):
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
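
A minimal driver sketch for the class above (hedged: f, the shapes, and the
zero baselines are illustrative stand-ins, and it assumes the truncated
refactor_mask body behaves as in the full source file):

    import torch as th

    f = lambda inputs: inputs.sum(dim=(1, 2))       # stand-in black-box model
    x = th.randn(32, 50, 3)                         # (batch, T, channels)
    net = GateMaskNN(forward_func=f, batch_size=32)
    net.init(input_size=(32, 50, 3), batch_size=32, win_size=5, n_epochs=100)
    net.train()
    out1, out2 = net(x, 0, th.zeros_like(x), None)  # f(x1), f(x2)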



aiops/ContraLSP/abstudy/gatemasknn_no_counterf.py [12:170]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
(identical to aiops/ContraLSP/abstudy/gatemasknn_no_both.py [12:170] above)
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



