Source code for pyod.models.mo_gaal

# -*- coding: utf-8 -*-
"""Multiple-Objective Generative Adversarial Active Learning.
Parts of the code are adapted from
https://github.com/leibinghe/GAAL-based-outlier-detection
"""
# Author: Zhuo Xiao <zhuoxiao@usc.edu>

from collections import defaultdict

import numpy as np

try:
    import torch
    import torch.nn as nn
    import torch.optim as optim
    from torch.utils.data import DataLoader, TensorDataset
except ImportError as e:
    raise ImportError(
        "PyTorch is required for GAAL models. Please install it with "
        "`pip install pyod[torch]` or `pip install torch`."
    ) from e

from sklearn.utils import check_array
from sklearn.utils.validation import check_is_fitted

from .base import BaseDetector
from .gaal_base import create_discriminator, create_generator


class MO_GAAL(BaseDetector):
    """Multi-Objective Generative Adversarial Active Learning.

    MO_GAAL directly generates informative potential outliers to assist the
    classifier in describing a boundary that can separate outliers from normal
    data effectively. Moreover, to prevent the generator from falling into the
    mode collapsing problem, the network structure of SO-GAAL is expanded from
    a single generator (SO-GAAL) to multiple generators with different
    objectives (MO-GAAL) to generate a reasonable reference distribution for
    the whole dataset.
    Read more in the :cite:`liu2019generative`.

    Parameters
    ----------
    contamination : float in (0., 0.5), optional (default=0.1)
        The amount of contamination of the data set, i.e.
        the proportion of outliers in the data set. Used when fitting to
        define the threshold on the decision function.

    k : int, optional (default=10)
        The number of sub generators. Must be at least 1.

    stop_epochs : int, optional (default=20)
        The number of epochs of training. The total number of epochs equals
        three times ``stop_epochs``; the sub-generators stop being updated
        once the epoch counter exceeds ``stop_epochs``, after which only the
        discriminator keeps training.

    lr_d : float, optional (default=0.01)
        The learning rate of the discriminator.

    lr_g : float, optional (default=0.0001)
        The learning rate of the generator.

    momentum : float, optional (default=0.9)
        The momentum parameter for SGD.

    Attributes
    ----------
    decision_scores_ : numpy array of shape (n_samples,)
        The outlier scores of the training data.
        The higher, the more abnormal. Outliers tend to have higher
        scores. This value is available once the detector is fitted.

    threshold_ : float
        The threshold is based on ``contamination``. It is the
        ``n_samples * contamination`` most abnormal samples in
        ``decision_scores_``. The threshold is calculated for generating
        binary outlier labels.

    labels_ : int, either 0 or 1
        The binary labels of the training data. 0 stands for inliers
        and 1 for outliers/anomalies. It is generated by applying
        ``threshold_`` on ``decision_scores_``.
    """

    def __init__(self, k=10, stop_epochs=20, lr_d=0.01, lr_g=0.0001,
                 momentum=0.9, contamination=0.1):
        super().__init__(contamination=contamination)
        self.k = k
        self.stop_epochs = stop_epochs
        self.lr_d = lr_d
        self.lr_g = lr_g
        self.momentum = momentum
        # Use the GPU when one is visible to torch, otherwise the CPU.
        self.device = torch.device(
            "cuda" if torch.cuda.is_available() else "cpu")
        # Per-batch loss traces, keyed by loss name; reset on every fit().
        self.train_history = defaultdict(list)

    def _split_noise(self, noise):
        """Partition a noise batch into ``k`` consecutive row chunks.

        The batch is measured in units of
        ``batch_size // (k * (k + 1) // 2)`` rows. Sub-generator ``i``
        receives ``k - i`` units; the last sub-generator additionally
        receives whatever rows are left over, so the chunks always cover the
        whole batch exactly once.

        When the batch holds fewer than ``k * (k + 1) // 2`` rows the unit
        is 0, so every chunk except the last one is empty.
        NOTE(review): confirm that the losses computed on such empty chunks
        are acceptable for small trailing batches.

        Parameters
        ----------
        noise : torch.Tensor of shape (batch_size, n_features)
            The noise rows to distribute.

        Returns
        -------
        chunks : list of torch.Tensor
            ``k`` row slices (views) of ``noise``, in sub-generator order.
        """
        k = self.k
        batch_size = noise.size(0)
        unit = batch_size // ((1 + k) * k // 2)

        chunks = []
        for i in range(k):
            # i * (2k - i + 1) / 2 == k + (k - 1) + ... + (k - i + 1); the
            # product is always even, so integer division is exact.
            start = (i * (2 * k - i + 1) // 2) * unit
            if i != k - 1:
                end = ((i + 1) * (2 * k - i) // 2) * unit
            else:
                end = batch_size
            chunks.append(noise[start:end])
        return chunks

    def _score(self, X_tensor):
        """Run the fitted discriminator on ``X_tensor`` without gradients.

        The network is switched to evaluation mode first so that any
        mode-dependent layers behave deterministically (this is a no-op for
        networks without such layers).

        NOTE(review): ``fit`` trains the discriminator with label 1 for real
        rows and 0 for generated rows, and the raw output is returned here
        unchanged - confirm the orientation matches the "higher is more
        abnormal" convention.

        Returns
        -------
        scores : numpy array of shape (n_samples,)
            The flattened discriminator outputs.
        """
        self.discriminator.eval()
        with torch.no_grad():
            return self.discriminator(X_tensor).cpu().numpy().ravel()

    def fit(self, X, y=None):
        """Fit detector. y is ignored in unsupervised methods.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The input samples.

        y : Ignored
            Not used, present for API consistency by convention.

        Returns
        -------
        self : object
            Fitted estimator.

        Raises
        ------
        ValueError
            If ``k`` is smaller than 1.
        """
        X = check_array(X)
        self._set_n_classes(y)
        if self.k < 1:
            raise ValueError(
                f"k must be a positive integer, got {self.k!r}")
        self.train_history = defaultdict(list)

        epochs = self.stop_epochs * 3
        latent_size = X.shape[1]
        data_size = X.shape[0]

        # Create discriminator
        self.discriminator = create_discriminator(
            latent_size, data_size).to(self.device)
        optimizer_d = optim.SGD(self.discriminator.parameters(),
                                lr=self.lr_d, momentum=self.momentum)
        criterion = nn.BCELoss()

        # Create k sub-generators, each with its own optimizer
        generators = []
        optimizers_g = []
        for _ in range(self.k):
            generator = create_generator(latent_size).to(self.device)
            generators.append(generator)
            optimizers_g.append(
                optim.SGD(generator.parameters(), lr=self.lr_g,
                          momentum=self.momentum))

        # Built once: feeds the data loader, the per-batch full-dataset
        # scoring pass and the final scoring pass.
        X_tensor = torch.tensor(X, dtype=torch.float32, device=self.device)
        dataloader = DataLoader(TensorDataset(X_tensor),
                                batch_size=min(500, data_size),
                                shuffle=True)

        # Percentile levels (0, 100/k, ..., 100*(k-1)/k) that define the
        # discriminator-output target of each sub-generator.
        quantiles = [i / self.k * 100 for i in range(self.k)]

        train_generators = True

        # Start iteration
        for epoch in range(epochs):
            print('Epoch {} of {}'.format(epoch + 1, epochs))

            for data_batch in dataloader:
                data_batch = data_batch[0].to(self.device)
                batch_size = data_batch.size(0)

                # Generate noise and hand each sub-generator its share
                noise = torch.rand(batch_size, latent_size,
                                   device=self.device)
                noise_chunks = self._split_noise(noise)

                # Generate potential outliers. No autograd graph is needed:
                # this copy only serves as discriminator input, and the
                # generators are updated from a separate forward pass below.
                with torch.no_grad():
                    generated = [generator(chunk) for generator, chunk
                                 in zip(generators, noise_chunks)]

                # Concatenate real data (label 1) and generated data (label 0)
                all_data = torch.cat([data_batch] + generated, dim=0)
                n_generated = sum(d.size(0) for d in generated)
                labels = torch.cat(
                    [torch.ones(batch_size, 1, device=self.device),
                     torch.zeros(n_generated, 1, device=self.device)],
                    dim=0)

                # Train discriminator
                self.discriminator.train()
                self.discriminator.zero_grad()
                # Reshape so the outputs line up with the (n, 1) labels
                outputs = self.discriminator(all_data).view(-1, 1)
                discriminator_loss = criterion(outputs, labels)
                discriminator_loss.backward()
                optimizer_d.step()
                self.train_history['discriminator_loss'].append(
                    discriminator_loss.item())

                # Get the target value of sub-generators: the i-th one aims
                # at the (i / k)-quantile of the discriminator output over
                # the whole training set.
                with torch.no_grad():
                    pred_scores = self.discriminator(
                        X_tensor).cpu().numpy().ravel()
                thresholds = np.percentile(pred_scores, quantiles)
                targets = [
                    torch.full((chunk.size(0), 1), float(threshold),
                               device=self.device)
                    for chunk, threshold in zip(noise_chunks, thresholds)]

                # Train the sub-generators, or only evaluate their loss once
                # generator training has been stopped.
                sub_losses = []
                for i, (generator, optimizer_g) in enumerate(
                        zip(generators, optimizers_g)):
                    if train_generators:
                        optimizer_g.zero_grad()
                        fake_outputs = self.discriminator(
                            generator(noise_chunks[i]))
                        generator_loss = criterion(fake_outputs, targets[i])
                        generator_loss.backward()
                        optimizer_g.step()
                    else:
                        with torch.no_grad():
                            fake_outputs = self.discriminator(
                                generator(noise_chunks[i]))
                            generator_loss = criterion(fake_outputs,
                                                       targets[i])
                    loss_value = generator_loss.item()
                    sub_losses.append(loss_value)
                    self.train_history[f'sub_generator{i}_loss'].append(
                        loss_value)

                self.train_history['generator_loss'].append(
                    np.mean(sub_losses))

                # Checked after the generator step, so the first batch of
                # epoch ``stop_epochs + 1`` is the last one that still
                # updates the generators.
                if epoch + 1 > self.stop_epochs:
                    train_generators = False

        # Detection result
        self.decision_scores_ = self._score(X_tensor)
        self._process_decision_scores()
        return self

    def decision_function(self, X):
        """Predict raw anomaly score of X using the fitted detector.

        The score of each sample is the output of the fitted discriminator
        for that sample.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The input samples.

        Returns
        -------
        anomaly_scores : numpy array of shape (n_samples,)
            The anomaly score of the input samples.
        """
        check_is_fitted(self, ['discriminator'])
        X = check_array(X)
        X_tensor = torch.tensor(X, dtype=torch.float32).to(self.device)
        return self._score(X_tensor)