aiops/ContraLSP/abstudy/gatemasknn_no_smooth.py [59:170]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
        use_win: bool = False,
    ) -> None:
        super().__init__()
        object.__setattr__(self, "forward_func", forward_func)
        self.model = model
        self.batch_size = batch_size

        self.input_size = None
        self.win_size = None
        self.sigma = None
        self.channels = None
        self.T = None
        self.reg_multiplier = None
        self.mask = None
        self.based = based
        self.factor_dilation = factor_dilation
        self.pooling_method = pooling_method
        self.use_win = use_win

    def init(self, input_size: tuple, batch_size: int = 32,
             win_size: int = 5, sigma: float = 0.5, n_epochs: int = 100) -> None:
        self.input_size = input_size
        self.batch_size = batch_size
        self.win_size = win_size
        self.sigma = sigma
        self.channels = input_size[2]
        self.T = input_size[1]
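        # Per-epoch growth factor for the regulariser: compounding it over
        # n_epochs epochs reaches factor_dilation exactly, since
        # reg_multiplier ** n_epochs = exp(log(factor_dilation)).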
        self.reg_multiplier = np.exp(
            np.log(self.factor_dilation) / n_epochs
        )

        self.moving_avg = MovingAvg(self.win_size)

        # One learnable gate mean per (sample, time, channel) entry;
        # values are set in reset_parameters().
        self.mask = nn.Parameter(th.Tensor(*input_size))

        # One small MLP per channel to model that channel's trend.
        self.trendnet = nn.ModuleList()
        for _ in range(self.channels):
            self.trendnet.append(MLP([self.T, 32, self.T], activations='relu'))

        self.reset_parameters()

    def hard_sigmoid(self, x):
        return th.clamp(x, 0.0, 1.0)

    def reset_parameters(self) -> None:
        self.mask.data.fill_(0.5)
        # In the first training step, µd is 0.0
        # self.mask.data.fill_(0.0)

    def forward(
        self,
        x: th.Tensor,
        batch_idx,
        baselines,
        target,
        *additional_forward_args,
    ) -> "tuple[th.Tensor, th.Tensor]":

        # Select the learnable gate means belonging to the current batch.
        mu = self.mask[
            self.batch_size * batch_idx : self.batch_size * (batch_idx + 1)
        ]
        # Add Gaussian noise to the gate means during training only
        # (self.training is 0 in eval mode, so the gates are deterministic).
        # The noise is created on the same device as the input.
        noise = th.randn(x.shape, device=x.device)
        mask = mu + self.sigma * noise * self.training
        mask = self.refactor_mask(mask, x)

        # hard sigmoid
        mask = self.hard_sigmoid(mask)

        # If a model is provided, use its output as the baselines
        if self.model is not None:
            baselines = self.model(x - baselines)

        # Mask the data sample by sample.
        # The mask is trimmed to the time dimension of x if needed.
        # x1 represents inputs with important features masked.
        # x2 represents inputs with unimportant features masked.
        mask = mask[:, : x.shape[1], ...]
        x1 = x * mask + baselines * (1.0 - mask)
        x2 = x * (1.0 - mask) + baselines * mask

        # Return the model outputs on both perturbed inputs
        return (
            _run_forward(
                forward_func=self.forward_func,
                inputs=x1,
                target=target,
                additional_forward_args=additional_forward_args,
            ),
            _run_forward(
                forward_func=self.forward_func,
                inputs=x2,
                target=target,
                additional_forward_args=additional_forward_args,
            ),
        )

    def trend_info(self, x):
        # Optionally smooth the input with a moving average before
        # estimating the trend.
        if self.use_win:
            trend = self.moving_avg(x)
        else:
            trend = x
        # Run each channel through its own trend network.
        trend_out = th.zeros_like(trend)
        for i in range(self.channels):
            trend_out[:, :, i] = self.trendnet[i](trend[:, :, i])

        # front = x[:, 0:1, :]
        # x_f = th.cat([front, x], dim=1)[:, :-1, :]
        # res = th.abs(x - x_f)
        return trend_out

    def refactor_mask(self, mask, x):
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
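The excerpt above (and its duplicate below) centres on one perturbation step: sample a stochastic gate around the learnable means, clamp it with the hard sigmoid, and form two complementary mixtures of input and baselines. A minimal, self-contained sketch of that step, assuming toy shapes and zero baselines (both are illustrative, not taken from the repo):

import torch as th

# Toy shapes: batch of 4 samples, T = 10 time steps, 3 channels.
x = th.randn(4, 10, 3)
baselines = th.zeros_like(x)        # stand-in for caller-supplied baselines
mu = th.full_like(x, 0.5)           # gate means, as set by reset_parameters()
sigma = 0.5

noise = th.randn_like(x)
mask = th.clamp(mu + sigma * noise, 0.0, 1.0)   # hard sigmoid

x1 = x * mask + baselines * (1.0 - mask)        # follows x where the gate ~ 1
x2 = x * (1.0 - mask) + baselines * mask        # complementary perturbation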



aiops/ContraLSP/attribution/gatemasknn.py [59:170]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
        use_win: bool = False,
    ) -> None:
        super().__init__()
        object.__setattr__(self, "forward_func", forward_func)
        self.model = model
        self.batch_size = batch_size

        self.input_size = None
        self.win_size = None
        self.sigma = None
        self.channels = None
        self.T = None
        self.reg_multiplier = None
        self.mask = None
        self.based = based
        self.factor_dilation = factor_dilation
        self.pooling_method = pooling_method
        self.use_win = use_win

    def init(self, input_size: tuple, batch_size: int = 32,
             win_size: int = 5, sigma: float = 0.5, n_epochs: int = 100) -> None:
        self.input_size = input_size
        self.batch_size = batch_size
        self.win_size = win_size
        self.sigma = sigma
        self.channels = input_size[2]
        self.T = input_size[1]
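        # Per-epoch growth factor for the regulariser: compounding it over
        # n_epochs epochs reaches factor_dilation exactly, since
        # reg_multiplier ** n_epochs = exp(log(factor_dilation)).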
        self.reg_multiplier = np.exp(
            np.log(self.factor_dilation) / n_epochs
        )

        self.moving_avg = MovingAvg(self.win_size)

        # One learnable gate mean per (sample, time, channel) entry;
        # values are set in reset_parameters().
        self.mask = nn.Parameter(th.Tensor(*input_size))

        # One small MLP per channel to model that channel's trend.
        self.trendnet = nn.ModuleList()
        for _ in range(self.channels):
            self.trendnet.append(MLP([self.T, 32, self.T], activations='relu'))

        self.reset_parameters()

    def hard_sigmoid(self, x):
        return th.clamp(x, 0.0, 1.0)

    def reset_parameters(self) -> None:
        self.mask.data.fill_(0.5)
        # In the first training step, µd is 0.0
        # self.mask.data.fill_(0.0)

    def forward(
        self,
        x: th.Tensor,
        batch_idx,
        baselines,
        target,
        *additional_forward_args,
    ) -> "tuple[th.Tensor, th.Tensor]":

        # Select the learnable gate means belonging to the current batch.
        mu = self.mask[
            self.batch_size * batch_idx : self.batch_size * (batch_idx + 1)
        ]
        # Add Gaussian noise to the gate means during training only
        # (self.training is 0 in eval mode, so the gates are deterministic).
        # The noise is created on the same device as the input.
        noise = th.randn(x.shape, device=x.device)
        mask = mu + self.sigma * noise * self.training
        mask = self.refactor_mask(mask, x)

        # hard sigmoid
        mask = self.hard_sigmoid(mask)

        # If a model is provided, use its output as the baselines
        if self.model is not None:
            baselines = self.model(x - baselines)

        # Mask the data sample by sample.
        # The mask is trimmed to the time dimension of x if needed.
        # x1 represents inputs with important features masked.
        # x2 represents inputs with unimportant features masked.
        mask = mask[:, : x.shape[1], ...]
        x1 = x * mask + baselines * (1.0 - mask)
        x2 = x * (1.0 - mask) + baselines * mask

        # Return the model outputs on both perturbed inputs
        return (
            _run_forward(
                forward_func=self.forward_func,
                inputs=x1,
                target=target,
                additional_forward_args=additional_forward_args,
            ),
            _run_forward(
                forward_func=self.forward_func,
                inputs=x2,
                target=target,
                additional_forward_args=additional_forward_args,
            ),
        )

    def trend_info(self, x):
        # Optionally smooth the input with a moving average before
        # estimating the trend.
        if self.use_win:
            trend = self.moving_avg(x)
        else:
            trend = x
        # Run each channel through its own trend network.
        trend_out = th.zeros_like(trend)
        for i in range(self.channels):
            trend_out[:, :, i] = self.trendnet[i](trend[:, :, i])

        # front = x[:, 0:1, :]
        # x_f = th.cat([front, x], dim=1)[:, :-1, :]
        # res = th.abs(x - x_f)
        return trend_out

    def refactor_mask(self, mask, x):
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
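trend_info runs each channel of the (optionally moving-averaged) input through its own small network. A minimal sketch of that per-channel loop, with plain nn.Sequential stacks as hypothetical stand-ins for the repo's MLP([T, 32, T], activations='relu') blocks:

import torch as th
import torch.nn as nn

# Hypothetical stand-ins for MLP([T, 32, T]): one per-channel network
# mapping a length-T series to a length-T trend estimate.
T, channels = 10, 3
trendnet = nn.ModuleList(
    nn.Sequential(nn.Linear(T, 32), nn.ReLU(), nn.Linear(32, T))
    for _ in range(channels)
)

x = th.randn(4, T, channels)
trend_out = th.zeros_like(x)
for i in range(channels):
    # Each channel's series (batch, T) goes through its own network.
    trend_out[:, :, i] = trendnet[i](x[:, :, i])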



