Source code for pyod.models.auto_encoder

# -*- coding: utf-8 -*-
"""Using AutoEncoder with Outlier Detection
"""
# Author: Tiankai Yang <tiankaiy@usc.edu>
# License: BSD 2 clause


try:
    import torch
except ImportError:
    print('please install torch first')

from torch import nn

from pyod.models.base_dl import BaseDeepLearningDetector
from pyod.utils.stat_models import pairwise_distances_no_broadcast
from pyod.utils.torch_utility import LinearBlock


[docs] class AutoEncoder(BaseDeepLearningDetector): """ Auto Encoder (AE) is a type of neural networks for learning useful data representations in an unsupervised manner. Similar to PCA, AE could be used to detect outlying objects in the data by calculating the reconstruction errors. See :cite:`aggarwal2015outlier` Chapter 3 for details. Parameters ---------- contamination : float in (0., 0.5), optional (default=0.1) The amount of contamination of the data set, i.e. the proportion of outliers in the data set. Used when fitting to define the threshold on the decision function. preprocessing : bool, optional (default=True) If True, apply the preprocessing procedure before training models. lr : float, optional (default=1e-3) The initial learning rate for the optimizer. epoch_num : int, optional (default=10) The number of epochs for training. batch_size : int, optional (default=32) The batch size for training. optimizer_name : str, optional (default='adam') The name of theoptimizer used to train the model. device : str, optional (default=None) The device to use for the model. If None, it will be decided automatically. If you want to use MPS, set it to 'mps'. random_state : int, optional (default=42) The random seed for reproducibility. use_compile : bool, optional (default=False) Whether to compile the model. If True, the model will be compiled before training. This is only available for PyTorch version >= 2.0.0. and Python < 3.12. compile_mode : str, optional (default='default') The mode to compile the model. Can be either “default”, “reduce-overhead”, “max-autotune” or “max-autotune-no-cudagraphs”. See https://pytorch.org/docs/stable/generated/torch.compile.html#torch-compile for details. verbose : int, optional (default=1) Verbosity mode. - 0 = silent - 1 = progress bar - 2 = one line per epoch. optimizer_params : dict, optional (default={'weight_decay': 1e-5}) Additional parameters for the optimizer. For example, `optimizer_params={'weight_decay': 1e-5}`. hidden_neuron_list : list, optional (default=[64, 32]) The number of neurons per hidden layers. So the network has the structure as [feature_size, 64, 32, 32, 64, feature_size]. hidden_activation_name : str, optional (default='relu') The activation function used in hidden layers. batch_norm : boolean, optional (default=True) Whether to apply Batch Normalization, See https://pytorch.org/docs/stable/generated/torch.nn.BatchNorm1d.html dropout_rate : float in (0., 1), optional (default=0.2) The dropout to be used across all layers. Attributes ---------- model : torch.nn.Module The underlying AutoEncoder model. optimizer : torch.optim The optimizer used to train the model. criterion : torch.nn.modules The loss function used to train the model. decision_scores_ : numpy array of shape (n_samples,) The outlier scores of the training data. The higher, the more abnormal. Outliers tend to have higher scores. This value is available once the detector is fitted. threshold_ : float The threshold is based on ``contamination``. It is the ``n_samples * contamination`` most abnormal samples in ``decision_scores_``. The threshold is calculated for generating binary outlier labels. labels_ : int, either 0 or 1 The binary labels of the training data. 0 stands for inliers and 1 for outliers/anomalies. It is generated by applying ``threshold_`` on ``decision_scores_``. """ def __init__(self, contamination=0.1, preprocessing=True, lr=1e-3, epoch_num=10, batch_size=32, optimizer_name='adam', device=None, random_state=42, use_compile=False, compile_mode='default', verbose=1, optimizer_params: dict = {'weight_decay': 1e-5}, hidden_neuron_list=[64, 32], hidden_activation_name='relu', batch_norm=True, dropout_rate=0.2): super(AutoEncoder, self).__init__(contamination=contamination, preprocessing=preprocessing, lr=lr, epoch_num=epoch_num, batch_size=batch_size, optimizer_name=optimizer_name, criterion_name='mse', device=device, random_state=random_state, use_compile=use_compile, compile_mode=compile_mode, verbose=verbose, optimizer_params=optimizer_params) self.hidden_neuron_list = hidden_neuron_list self.hidden_activation_name = hidden_activation_name self.batch_norm = batch_norm self.dropout_rate = dropout_rate
[docs] def build_model(self): self.model = AutoEncoderModel( self.feature_size, hidden_neuron_list=self.hidden_neuron_list, hidden_activation_name=self.hidden_activation_name, batch_norm=self.batch_norm, dropout_rate=self.dropout_rate)
[docs] def training_forward(self, batch_data): x = batch_data x = x.to(self.device) self.optimizer.zero_grad() x_recon = self.model(x) loss = self.criterion(x_recon, x) loss.backward() self.optimizer.step() return loss.item()
def evaluating_forward(self, batch_data): x = batch_data x_gpu = x.to(self.device) x_recon = self.model(x_gpu) score = pairwise_distances_no_broadcast(x.numpy(), x_recon.cpu().numpy()) return score
class AutoEncoderModel(nn.Module): def __init__(self, feature_size, hidden_neuron_list=[64, 32], hidden_activation_name='relu', batch_norm=True, dropout_rate=0.2): super(AutoEncoderModel, self).__init__() self.feature_size = feature_size self.hidden_neuron_list = hidden_neuron_list self.hidden_activation_name = hidden_activation_name self.batch_norm = batch_norm self.dropout_rate = dropout_rate self.encoder = self._build_encoder() self.decoder = self._build_decoder() def _build_encoder(self): encoder_layers = [] last_neuron_size = self.feature_size for neuron_size in self.hidden_neuron_list: encoder_layers.append(LinearBlock( last_neuron_size, neuron_size, activation_name=self.hidden_activation_name, batch_norm=self.batch_norm, dropout_rate=self.dropout_rate)) last_neuron_size = neuron_size return nn.Sequential(*encoder_layers) def _build_decoder(self): decoder_layers = [] last_neuron_size = self.hidden_neuron_list[-1] for neuron_size in reversed(self.hidden_neuron_list[:-1]): decoder_layers.append(LinearBlock( last_neuron_size, neuron_size, activation_name=self.hidden_activation_name, batch_norm=self.batch_norm, dropout_rate=self.dropout_rate)) last_neuron_size = neuron_size decoder_layers.append(nn.Linear(last_neuron_size, self.feature_size)) return nn.Sequential(*decoder_layers) def forward(self, x): x = self.encoder(x) x = self.decoder(x) return x