Source code for pyod.models.ae1svm

# -*- coding: utf-8 -*-
"""Using AE-1SVM with Outlier Detection (PyTorch)
   Source: https://arxiv.org/pdf/1804.04888
   There is another implementation of this model by Minh Nghia: https://github.com/minh-nghia/AE-1SVM (Tensorflow)
"""
# Author: Zhuo Xiao <zhuoxiao@usc.edu>

import numpy as np

try:
    import torch
except ImportError:
    print('please install torch first')

import torch
from torch import nn

from sklearn.utils import check_array
from sklearn.utils.validation import check_is_fitted

from .base import BaseDetector
from ..utils.stat_models import pairwise_distances_no_broadcast
from ..utils.torch_utility import get_activation_by_name, TorchDataset


class InnerAE1SVM(nn.Module):
    """Autoencoder with a one-class SVM head on random Fourier features.

    The encoder maps the input to a latent code. That code is fed both to
    the decoder (for reconstruction) and to a ``RandomFourierFeatures``
    layer whose output is scored linearly by ``svm_weights``/``svm_bias``.

    Parameters
    ----------
    n_features : int
        Width of the input (and of the reconstructed output).

    encoding_dim : int
        Width of the latent code produced by the encoder.

    rff_dim : int
        Number of random Fourier features computed from the latent code.

    sigma : float, optional (default=1.0)
        Scale passed to the random Fourier feature layer.

    hidden_neurons : tuple of int, optional (default=(128, 64))
        Widths of the encoder's hidden layers; the decoder mirrors them.

    dropout_rate : float, optional (default=0.2)
        Probability used by every dropout layer.

    batch_norm : bool, optional (default=True)
        If True, insert ``nn.BatchNorm1d`` after linear layers (except
        after the decoder's final linear layer).

    hidden_activation : str, optional (default='relu')
        Name resolved through ``get_activation_by_name``.
    """

    def __init__(self, n_features, encoding_dim, rff_dim, sigma=1.0,
                 hidden_neurons=(128, 64),
                 dropout_rate=0.2, batch_norm=True, hidden_activation='relu'):
        super().__init__()

        # Sub-modules and parameters are registered in a fixed order
        # (encoder, decoder, rff, SVM parameters) so that state_dict keys,
        # parameter ordering and seeded random draws stay stable.
        self.encoder = nn.Sequential()
        self.decoder = nn.Sequential()
        self.rff = RandomFourierFeatures(encoding_dim, rff_dim, sigma)
        self.svm_weights = nn.Parameter(torch.randn(rff_dim))
        self.svm_bias = nn.Parameter(torch.randn(1))

        # A single activation module instance is shared by all layers.
        act = get_activation_by_name(hidden_activation)

        encoder_widths = [n_features, *hidden_neurons, encoding_dim]
        decoder_widths = encoder_widths[::-1]

        # Encoder: linear -> (batch norm) -> activation -> dropout, per layer.
        encoder_pairs = zip(encoder_widths, encoder_widths[1:])
        for i, (width_in, width_out) in enumerate(encoder_pairs):
            self.encoder.add_module(f"linear{i}",
                                    nn.Linear(width_in, width_out))
            if batch_norm:
                self.encoder.add_module(f"batch_norm{i}",
                                        nn.BatchNorm1d(width_out))
            self.encoder.add_module(f"activation{i}", act)
            self.encoder.add_module(f"dropout{i}", nn.Dropout(dropout_rate))

        # Decoder: same pattern over the mirrored widths, except that the
        # output layer gets neither batch norm nor dropout (it does still
        # get the activation).
        output_idx = len(decoder_widths) - 2
        decoder_pairs = zip(decoder_widths, decoder_widths[1:])
        for i, (width_in, width_out) in enumerate(decoder_pairs):
            is_output_layer = i == output_idx
            self.decoder.add_module(f"linear{i}",
                                    nn.Linear(width_in, width_out))
            if batch_norm and not is_output_layer:
                self.decoder.add_module(f"batch_norm{i}",
                                        nn.BatchNorm1d(width_out))
            self.decoder.add_module(f"activation{i}", act)
            if not is_output_layer:
                self.decoder.add_module(f"dropout{i}",
                                        nn.Dropout(dropout_rate))

    def forward(self, x):
        """Encode the input, then decode it and compute its Fourier features.

        Parameters
        ----------
        x : torch.Tensor
            Input data.

        Returns
        -------
        tuple of torch.Tensor
            ``(reconstruction, rff_features)``: the decoder output and the
            random Fourier features of the latent code.
        """
        latent = self.encoder(x)
        fourier = self.rff(latent)
        reconstruction = self.decoder(latent)
        return reconstruction, fourier

    def svm_decision_function(self, rff_features):
        """Score random Fourier features with the linear SVM head.

        Parameters
        ----------
        rff_features : torch.Tensor
            Random Fourier features, as returned by ``forward``.

        Returns
        -------
        torch.Tensor
            ``rff_features @ svm_weights + svm_bias``.
        """
        projection = rff_features @ self.svm_weights
        return projection + self.svm_bias


class RandomFourierFeatures(nn.Module):
    """Map inputs to ``cos(x @ weights + bias)`` with random initial weights.

    Both ``weights`` and ``bias`` are registered as ``nn.Parameter``, so
    they are part of the module's parameters.

    Parameters
    ----------
    input_dim : int
        Width of the incoming data.

    output_dim : int
        Number of features to produce.

    sigma : float, optional (default=1.0)
        Multiplier applied to the standard-normal initial projection matrix.
    """

    def __init__(self, input_dim, output_dim, sigma=1.0):
        super().__init__()
        # Draw the projection before the phase so seeded runs stay stable.
        projection = torch.randn(input_dim, output_dim) * sigma
        phase = torch.randn(output_dim) * 2 * np.pi
        self.weights = nn.Parameter(projection)
        self.bias = nn.Parameter(phase)

    def forward(self, x):
        """Project the input and take the element-wise cosine.

        Parameters
        ----------
        x : torch.Tensor
            Input data whose last dimension matches ``input_dim``.

        Returns
        -------
        torch.Tensor
            Random Fourier features.
        """
        shifted = x @ self.weights + self.bias
        return shifted.cos()


class AE1SVM(BaseDetector):
    """Auto Encoder with One-class SVM for anomaly detection.

    Note: self.device is needed or all tensors may not be on the same device
    (if device w/ GPU running)

    Parameters
    ----------
    hidden_neurons : list, optional (default=[64, 32])
        Number of neurons in each hidden layer.

    hidden_activation : str, optional (default='relu')
        Activation function for the hidden layers.

    batch_norm : bool, optional (default=True)
        Whether to use batch normalization.

    learning_rate : float, optional (default=1e-3)
        Learning rate for training the model.

    epochs : int, optional (default=50)
        Number of training epochs.

    batch_size : int, optional (default=32)
        Size of each training batch.

    dropout_rate : float, optional (default=0.2)
        Dropout rate for regularization.

    weight_decay : float, optional (default=1e-5)
        Weight decay (L2 penalty) for the optimizer.

    preprocessing : bool, optional (default=True)
        Whether to apply standard scaling to the input data.

    loss_fn : callable, optional (default=torch.nn.MSELoss)
        Loss function to use for reconstruction loss.

    contamination : float, optional (default=0.1)
        Proportion of outliers in the data.

    alpha : float, optional (default=1.0)
        Weight for the reconstruction loss in the final loss computation.

    sigma : float, optional (default=1.0)
        Scaling factor for the random Fourier features.

    nu : float, optional (default=0.1)
        Parameter for the SVM loss.
        NOTE(review): the value is stored but not read by the loss computed
        in ``_train_autoencoder`` below -- confirm whether that is intended.

    kernel_approx_features : int, optional (default=1000)
        Number of random Fourier features to approximate the kernel.

    Attributes
    ----------
    model : InnerAE1SVM or None
        The underlying network; ``None`` until ``fit`` is called.

    decision_scores_ : numpy.ndarray or None
        Outlier scores of the training data, set by ``fit``.

    best_loss : float
        Lowest mean epoch loss seen during training (set while fitting).

    best_model_dict : dict or None
        Snapshot of the model state at the epoch with the lowest loss
        (set while fitting).
    """

    def __init__(self, hidden_neurons=None, hidden_activation='relu',
                 batch_norm=True, learning_rate=1e-3, epochs=50,
                 batch_size=32, dropout_rate=0.2, weight_decay=1e-5,
                 preprocessing=True, loss_fn=None, contamination=0.1,
                 alpha=1.0, sigma=1.0, nu=0.1, kernel_approx_features=1000):
        super(AE1SVM, self).__init__(contamination=contamination)

        self.model = None
        self.decision_scores_ = None
        self.std = None
        self.mean = None

        self.hidden_neurons = hidden_neurons or [64, 32]
        self.hidden_activation = hidden_activation
        self.batch_norm = batch_norm
        self.learning_rate = learning_rate
        self.epochs = epochs
        self.batch_size = batch_size
        self.dropout_rate = dropout_rate
        self.weight_decay = weight_decay
        self.preprocessing = preprocessing
        self.loss_fn = loss_fn or torch.nn.MSELoss()
        self.device = torch.device(
            "cuda" if torch.cuda.is_available() else "cpu")
        self.alpha = alpha
        self.sigma = sigma
        self.nu = nu
        self.kernel_approx_features = kernel_approx_features

    def fit(self, X, y=None):
        """Fit the model to the data.

        Parameters
        ----------
        X : numpy.ndarray
            Input data.

        y : None
            Ignored, present for API consistency by convention.

        Returns
        -------
        self : object
            Fitted estimator.

        Raises
        ------
        ValueError
            If training did not record any model state.
        """
        X = check_array(X)
        self._set_n_classes(y)

        n_samples, n_features = X.shape

        if self.preprocessing:
            self.mean, self.std = np.mean(X, axis=0), np.std(X, axis=0)
            self.std[self.std == 0] = 1e-6  # Avoid division by zero
            train_set = TorchDataset(X=X, mean=self.mean, std=self.std,
                                     return_idx=True)
        else:
            train_set = TorchDataset(X=X, return_idx=True)

        train_loader = torch.utils.data.DataLoader(train_set,
                                                   batch_size=self.batch_size,
                                                   shuffle=True)

        self.model = InnerAE1SVM(n_features=n_features, encoding_dim=32,
                                 rff_dim=self.kernel_approx_features,
                                 sigma=self.sigma,
                                 hidden_neurons=self.hidden_neurons,
                                 dropout_rate=self.dropout_rate,
                                 batch_norm=self.batch_norm,
                                 hidden_activation=self.hidden_activation)
        self.model = self.model.to(self.device)

        self._train_autoencoder(train_loader)

        if self.best_model_dict is not None:
            self.model.load_state_dict(self.best_model_dict)
        else:
            raise ValueError('Training failed, no valid model state found')

        # X is already a numpy array here (returned by check_array above).
        self.decision_scores_ = self.decision_function(X)
        self._process_decision_scores()
        return self

    def _train_autoencoder(self, train_loader):
        """Train the autoencoder.

        Tracks the epoch with the lowest mean loss and stores a copy of the
        model state from that epoch in ``self.best_model_dict``.

        Parameters
        ----------
        train_loader : torch.utils.data.DataLoader
            DataLoader for the training data.
        """
        import copy  # local import: only needed for the state snapshot

        optimizer = torch.optim.Adam(self.model.parameters(),
                                     lr=self.learning_rate,
                                     weight_decay=self.weight_decay)
        self.best_loss = float('inf')
        self.best_model_dict = None

        # Make sure dropout / batch norm run in training mode.
        self.model.train()

        for epoch in range(self.epochs):
            overall_loss = []
            for data, data_idx in train_loader:
                # BatchNorm1d cannot compute batch statistics from a single
                # sample in training mode, so skip a size-1 trailing batch.
                if self.batch_norm and data.shape[0] < 2:
                    continue

                data = data.to(self.device).float()
                reconstructions, rff_features = self.model(data)
                recon_loss = self.loss_fn(data, reconstructions)
                svm_scores = self.model.svm_decision_function(rff_features)
                # Hinge-style penalty on the SVM scores.
                svm_loss = torch.mean(torch.clamp(1 - svm_scores, min=0))

                loss = self.alpha * recon_loss + svm_loss

                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
                overall_loss.append(loss.item())

            if not overall_loss:
                # Every batch was skipped; nothing to record for this epoch.
                continue

            epoch_loss = np.mean(overall_loss)
            if (epoch + 1) % 10 == 0:
                print(
                    f'Epoch {epoch + 1}/{self.epochs}, Loss: {epoch_loss}')

            if epoch_loss < self.best_loss:
                self.best_loss = epoch_loss
                # state_dict() returns references to the live tensors, so
                # take a deep copy; otherwise the "best" snapshot would
                # silently follow later optimizer updates.
                self.best_model_dict = copy.deepcopy(self.model.state_dict())

    def decision_function(self, X):
        """Predict raw anomaly score of X using the fitted detector.

        The score of a sample is the distance between the (optionally
        standardized) input and its reconstruction, as computed by
        ``pairwise_distances_no_broadcast``.

        Parameters
        ----------
        X : numpy.ndarray
            The input samples.

        Returns
        -------
        numpy.ndarray
            The anomaly score of the input samples.
        """
        check_is_fitted(self, ['model', 'best_model_dict'])
        X = check_array(X)

        if self.preprocessing:
            dataset = TorchDataset(X=X, mean=self.mean, std=self.std,
                                   return_idx=True)
        else:
            dataset = TorchDataset(X=X, return_idx=True)

        dataloader = torch.utils.data.DataLoader(dataset,
                                                 batch_size=self.batch_size,
                                                 shuffle=False)

        # Evaluation mode: disables dropout, uses running batch-norm stats.
        self.model.eval()

        outlier_scores = np.zeros([X.shape[0], ])
        with torch.no_grad():
            for data, data_idx in dataloader:
                data = data.to(self.device).float()
                reconstructions, rff_features = self.model(data)
                scores = pairwise_distances_no_broadcast(
                    data.cpu().numpy(), reconstructions.cpu().numpy())
                # data_idx holds each sample's position in X.
                outlier_scores[data_idx] = scores
        return outlier_scores