Source code for pyod.models.devnet

# -*- coding: utf-8 -*-

"""Deep anomaly detection with deviation networks
Part of the codes are adapted from
https://github.com/GuansongPang/deviation-network
"""
# Author: Sihan Chen <schen976@usc.edu>
# License: BSD 2 clause


# Import necessary libraries
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.utils import check_array
from torch.utils.data import Dataset, DataLoader

from .base import BaseDetector
from ..utils.torch_utility import TorchDataset

# Largest value representable by a signed 32-bit integer.
MAX_INT = np.iinfo(np.int32).max
# Module-level data-format flag. NOTE(review): no function visible in this
# module reads this global; load_model_weight_predict and DevNet each use
# their own `data_format` value.
data_format = 0

# Seed the global NumPy and PyTorch random number generators for
# reproducibility. NOTE: these calls run at import time, so importing this
# module reseeds both global generators for the whole process.
np.random.seed(42)
torch.manual_seed(42)


# Define the network architectures
class DevNetD(nn.Module):
    """Deep scoring network: three ReLU hidden layers and a scalar output.

    The hidden layers have 1000, 250 and 20 units; the final linear layer
    maps the 20-dimensional representation to a single score per sample.

    Parameters
    ----------
    input_shape : int
        Number of input features (``in_features`` of the first layer).
    """

    def __init__(self, input_shape):
        super().__init__()
        # Attribute names and creation order are kept stable so that
        # state_dict keys and seeded weight initialisation do not change.
        self.fc1 = nn.Linear(input_shape, 1000)
        self.fc2 = nn.Linear(1000, 250)
        self.fc3 = nn.Linear(250, 20)
        self.score = nn.Linear(20, 1)

    def forward(self, x):
        """Return one raw score per input row, shaped ``(..., 1)``."""
        hidden = x
        for layer in (self.fc1, self.fc2, self.fc3):
            hidden = torch.relu(layer(hidden))
        return self.score(hidden)


class DevNetS(nn.Module):
    """Shallow scoring network: one ReLU hidden layer and a scalar output.

    Parameters
    ----------
    input_shape : int
        Number of input features (``in_features`` of the hidden layer).
    """

    def __init__(self, input_shape):
        super().__init__()
        # 1000-unit hidden representation followed by the scoring layer.
        self.fc1 = nn.Linear(input_shape, 1000)
        self.score = nn.Linear(1000, 1)

    def forward(self, x):
        """Return one raw score per input row, shaped ``(..., 1)``."""
        hidden = torch.relu(self.fc1(x))
        return self.score(hidden)


class DevNetLinear(nn.Module):
    """Linear scoring network: a single linear layer with a scalar output.

    Parameters
    ----------
    input_shape : int
        Number of input features (``in_features`` of the scoring layer).
    """

    def __init__(self, input_shape):
        super().__init__()
        self.score = nn.Linear(input_shape, 1)

    def forward(self, x):
        """Return one raw score per input row, shaped ``(..., 1)``."""
        return self.score(x)


def deviation_loss(y_true, y_pred, confidence_margin=5.0, n_ref=5000):
    """Z-score-based deviation loss.

    Scores are standardised against a freshly drawn standard-normal
    reference sample. Inliers (label 0) are penalised by the absolute
    deviation; outliers (label 1) are penalised when their deviation falls
    short of ``confidence_margin``.

    Parameters
    ----------
    y_true : torch.Tensor
        Labels, 0 for inliers and 1 for outliers.
    y_pred : torch.Tensor
        Anomaly scores produced by the network.
    confidence_margin : float, optional (default=5.0)
        Deviation that outlier scores are pushed to reach.
    n_ref : int, optional (default=5000)
        Size of the reference sample (``l`` in algorithm 1 of the paper).

    Returns
    -------
    loss : torch.Tensor
        Scalar mean loss over all elements.
    """
    # A (batch,) label vector against (batch, 1) scores would silently
    # broadcast to (batch, batch), pairing every label with every score.
    # Align the shapes whenever the element counts agree; any other shape
    # combination is left to regular broadcasting as before.
    if y_true.shape != y_pred.shape and y_true.numel() == y_pred.numel():
        y_true = y_true.reshape(y_pred.shape)

    # Reference scores drawn from a standard normal distribution.
    ref = torch.randn(n_ref, device=y_pred.device, dtype=torch.float32)
    dev = (y_pred - ref.mean()) / ref.std()

    inlier_loss = torch.abs(dev)
    # The clamp already guarantees a non-negative value.
    outlier_loss = torch.clamp(confidence_margin - dev, min=0)

    # Mean of the label-weighted sum of inlier and outlier losses.
    return torch.mean((1 - y_true) * inlier_loss + y_true * outlier_loss)


# Define the training and testing process
def train_and_test(model, train_loader, test_loader, epochs, device):
    """Train ``model`` with MSE loss and Adam, then print the mean test loss.

    Note that this helper optimises a plain mean-squared error, not the
    deviation loss defined in this module.

    Parameters
    ----------
    model : torch.nn.Module
        Network to train in place. It is moved to ``device`` and left in
        evaluation mode when the function returns.
    train_loader : iterable
        Yields ``(inputs, labels)`` tensor batches used for training.
    test_loader : iterable
        Yields ``(inputs, labels)`` tensor batches used for evaluation.
    epochs : int
        Number of passes over ``train_loader``.
    device : torch.device or str
        Device on which the model and every batch are placed.

    Returns
    -------
    None
        The mean per-batch test loss is printed, not returned.
    """
    criterion = nn.MSELoss()
    # Batches are moved to `device` below, so the model has to live there
    # as well; otherwise the forward pass fails on a device mismatch.
    model.to(device)
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    model.train()
    for _ in range(epochs):
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            loss = criterion(model(inputs), labels)
            loss.backward()
            optimizer.step()

    model.eval()
    total_loss = 0.0
    n_batches = 0
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            total_loss += criterion(model(inputs), labels).item()
            n_batches += 1

    # Count batches while iterating so an empty test loader reports NaN
    # instead of raising ZeroDivisionError.
    mean_loss = total_loss / n_batches if n_batches else float('nan')
    print('Test Loss:', mean_loss)


# Main function to run the model
def deviation_network(input_shape, network_depth):
    """Build a deviation-network scorer together with its optimizer.

    Parameters
    ----------
    input_shape : int
        Number of input features handed to the network constructor.
    network_depth : int
        Architecture selector: 4 builds ``DevNetD``, 2 builds ``DevNetS``
        and 1 builds ``DevNetLinear``.

    Returns
    -------
    model : torch.nn.Module
        The freshly constructed network.
    optimizer : torch.optim.RMSprop
        RMSprop over the model parameters (lr=0.001, weight_decay=1e-6).
        No gradient clipping is configured here.

    Raises
    ------
    ValueError
        If ``network_depth`` is not one of 4, 2 or 1.
    """
    # Reject unsupported depths before touching any network class.
    if network_depth not in (4, 2, 1):
        raise ValueError("The network depth is not set properly")

    architectures = ((4, DevNetD), (2, DevNetS), (1, DevNetLinear))
    for depth, architecture in architectures:
        if network_depth == depth:
            model = architecture(input_shape)
            break

    optimizer = optim.RMSprop(model.parameters(), lr=0.001,
                              weight_decay=1e-6)
    return model, optimizer


class SupDataset(Dataset):
    """Labelled dataset that exposes all inliers first, then all outliers.

    Item ``i`` maps to ``inlier_indices[i]`` with label 0 while ``i`` is
    smaller than the number of inliers, and to the corresponding entry of
    ``outlier_indices`` with label 1 afterwards.

    Parameters
    ----------
    x : indexable
        Sample container; ``x[k]`` must return the k-th sample.
    outlier_indices : sequence of int
        Positions in ``x`` of the labelled outliers.
    inlier_indices : sequence of int
        Positions in ``x`` of the inliers.
    rng : numpy.random.RandomState or None
        Random state stored on the dataset as ``self.rng``. When ``None``,
        a generator seeded with 42 is created.
    """

    def __init__(self, x, outlier_indices, inlier_indices, rng):
        self.x = x
        self.outlier_indices = outlier_indices
        self.inlier_indices = inlier_indices
        # Honour the caller-supplied generator instead of discarding it;
        # fall back to the fixed seed only when none was given.
        self.rng = rng if rng is not None else np.random.RandomState(42)

    def __len__(self):
        return len(self.outlier_indices) + len(self.inlier_indices)

    def __getitem__(self, idx):
        n_inliers = len(self.inlier_indices)
        n_total = n_inliers + len(self.outlier_indices)

        # Support negative indices with regular sequence semantics, so
        # that e.g. -1 is the last outlier rather than the last inlier.
        if idx < 0:
            idx += n_total
        if not 0 <= idx < n_total:
            raise IndexError("SupDataset index out of range")

        if idx < n_inliers:
            return self.x[self.inlier_indices[idx]], 0
        return self.x[self.outlier_indices[idx - n_inliers]], 1


def input_batch_generation_sup_sparse(x_train, outlier_indices, inlier_indices,
                                      batch_size, rng):
    """Draw a batch that alternates inlier and outlier samples.

    Even batch positions receive a randomly chosen inlier (label 0) and odd
    positions a randomly chosen outlier (label 1).

    Parameters
    ----------
    x_train : indexable of torch.Tensor
        Sample container; the selected rows are combined with
        ``torch.stack``.
    outlier_indices : sequence of int
        Positions in ``x_train`` of the outliers.
    inlier_indices : sequence of int
        Positions in ``x_train`` of the inliers.
    batch_size : int
        Number of samples to draw.
    rng : numpy.random.RandomState
        Generator used for sampling; one ``choice`` call per sample.

    Returns
    -------
    training_data : torch.Tensor
        The stacked samples.
    training_labels : torch.Tensor
        ``torch.long`` labels, alternating 0, 1, 0, 1, ...
    """
    # Index 0 is the inlier pool and index 1 the outlier pool, so the
    # parity of the batch position is both the pool selector and the label.
    pools = (inlier_indices, outlier_indices)
    pool_sizes = (len(inlier_indices), len(outlier_indices))

    rows = []
    targets = []
    for position in range(batch_size):
        target = position % 2
        pick = rng.choice(pool_sizes[target], 1)
        rows.append(x_train[pools[target][pick.item()]])
        targets.append(target)

    return torch.stack(rows), torch.tensor(targets, dtype=torch.long)


def load_model_weight_predict(model, x_test, data_format=0):
    """Score ``x_test`` with ``model`` and return a flat NumPy array.

    Parameters
    ----------
    model : torch.nn.Module or callable
        Scoring network; called on a float32 tensor.
    x_test : array-like or torch.Tensor
        Samples to score, one per row.
    data_format : int, optional (default=0)
        0 scores all rows in a single forward pass; any other value scores
        them in chunks of 512 rows.

    Returns
    -------
    scores : numpy.ndarray
        One-dimensional array of scores.
    """
    # torch.as_tensor converts arrays and reuses tensors, which avoids the
    # copy-construct warning torch.tensor() emits for tensor inputs.
    x_test = torch.as_tensor(x_test, dtype=torch.float32)

    # Place the input on the same device as the model's parameters.
    parameters = getattr(model, "parameters", None)
    first_param = next(parameters(), None) if callable(parameters) else None
    if first_param is not None:
        x_test = x_test.to(first_param.device)

    # Inference only: no autograd graph is needed.
    with torch.no_grad():
        if data_format == 0:
            scores = model(x_test)
        else:
            batch_size = 512
            # Flatten each chunk so (n, 1) outputs concatenate into (n,).
            chunks = [
                model(x_test[start:start + batch_size]).reshape(-1)
                for start in range(0, x_test.shape[0], batch_size)
            ]
            scores = torch.cat(chunks) if chunks else x_test.new_zeros(0)

    return scores.reshape(-1).detach().cpu().numpy()


class DevNet(BaseDetector):
    """Deviation network (DevNet) detector trained with labelled outliers.

    Parameters
    ----------
    network_depth : int, optional (default=2)
        Architecture selector passed to ``deviation_network``: 4 (deep),
        2 (one hidden layer) or 1 (linear).
    batch_size : int, optional (default=512)
        Mini-batch size used for training and for scoring.
    epochs : int, optional (default=50)
        Number of passes over the training data.
    nb_batch : int, optional (default=20)
        Stored on the instance; not used by the code in this class.
    known_outliers : int, optional (default=30)
        Stored on the instance; not used by the code in this class.
    cont_rate : float, optional (default=0.02)
        Stored on the instance; not used by the code in this class.
    data_format : int, optional (default=0)
        Stored on the instance; not used by the code in this class.
    random_seed : int, optional (default=42)
        Seed of the ``RandomState`` handed to the training dataset.
    device : torch.device or str or None, optional (default=None)
        Device for training and scoring. When ``None``, ``cuda:0`` is used
        if CUDA is available and ``cpu`` otherwise.
    contamination : float, optional (default=0.1)
        Passed to ``BaseDetector``; also used in ``fit`` to compute the
        noise count that is printed.
    """

    def __init__(self, network_depth=2, batch_size=512, epochs=50,
                 nb_batch=20, known_outliers=30, cont_rate=0.02,
                 data_format=0,  # Assuming '0' for CSV
                 random_seed=42, device=None, contamination=0.1):
        super(DevNet, self).__init__(contamination=contamination)
        self._classes = 2
        self.network_depth = network_depth
        self.batch_size = batch_size
        self.epochs = epochs
        self.nb_batch = nb_batch
        self.known_outliers = known_outliers
        self.cont_rate = cont_rate
        self.data_format = data_format
        self.random_seed = random_seed
        self.device = device
        if self.device is None:
            self.device = torch.device(
                "cuda:0" if torch.cuda.is_available() else "cpu")

    def fit(self, X, y):
        """Fit the detector on labelled data.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The input samples.
        y : numpy array of shape (n_samples,)
            Labels: 1 marks an outlier, 0 an inlier. Samples with any
            other label are not used for training.

        Returns
        -------
        self : object
            Fitted estimator.
        """
        # Validate X so that row indexing in the dataset behaves the same
        # for lists, arrays and data frames.
        X = check_array(X)
        y = np.asarray(y).ravel()
        if y.shape[0] != X.shape[0]:
            raise ValueError(
                "X and y have inconsistent numbers of samples: "
                "%d != %d" % (X.shape[0], y.shape[0]))

        outlier_indices = np.where(y == 1)[0]
        inlier_indices = np.where(y == 0)[0]
        print("Original training size: %d, No. outliers: %d" % (
            X.shape[0], len(outlier_indices)))

        # Only reported below; no noise is injected into the training set.
        n_noise = int(len(inlier_indices) * self.contamination / (
            1. - self.contamination))
        print(y.shape[0], outlier_indices.shape[0], inlier_indices.shape[0],
              n_noise)

        self.model, optimizer = deviation_network(X.shape[1],
                                                  self.network_depth)
        # Module.to() moves the parameters in place, so the optimizer
        # created above keeps referring to the same parameter objects.
        self.model.to(self.device)

        rng = np.random.RandomState(self.random_seed)
        train_dataset = SupDataset(X, outlier_indices, inlier_indices, rng)
        train_loader = DataLoader(train_dataset, batch_size=self.batch_size,
                                  shuffle=True)

        self._train(optimizer, train_loader)

        self.decision_scores_ = self.decision_function(X)
        self._process_decision_scores()
        return self

    def _train(self, optimizer, data_loader):
        """Run ``self.epochs`` epochs of deviation-loss training."""
        self.model.train()
        for epoch in range(self.epochs):
            loss = None
            for data, labels in data_loader:
                data = data.to(self.device, dtype=torch.float32)
                labels = labels.to(self.device, dtype=torch.float32)

                optimizer.zero_grad()
                outputs = self.model(data)
                # deviation_loss expects (y_true, y_pred). The labels are
                # reshaped to the output shape so that the loss is
                # computed element-wise rather than broadcast to
                # (batch, batch).
                loss = deviation_loss(labels.reshape(outputs.shape), outputs)
                loss.backward()
                optimizer.step()
            if loss is not None:
                # Loss of the last mini-batch of the epoch.
                print(f'Epoch {epoch + 1}, Loss: {loss.item()}')

    def decision_function(self, X):
        """Compute raw anomaly scores for ``X`` with the fitted model.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The input samples.

        Returns
        -------
        anomaly_scores : numpy array of shape (n_samples,)
            The score produced by the network for each sample.
        """
        X = check_array(X)
        dataset = TorchDataset(X=X, return_idx=True)
        dataloader = DataLoader(dataset, batch_size=self.batch_size,
                                shuffle=False)

        # Enable the evaluation mode.
        self.model.eval()

        outlier_scores = np.zeros([X.shape[0], ])
        with torch.no_grad():
            for data, data_idx in dataloader:
                # Score on the same device the model was trained on.
                data = data.to(self.device).float()
                outlier_scores[data_idx] = load_model_weight_predict(
                    self.model, data)
        return outlier_scores

    def fit_predict_score(self, X, y, scoring='roc_auc_score'):
        """Fit the detector with labels, predict on samples, and
        evaluate the model by predefined metrics.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The input samples.

        y : numpy array of shape (n_samples,)
            The labels or target values corresponding to X.

        scoring : str, optional (default='roc_auc_score')
            Evaluation metric:

            - 'roc_auc_score': ROC score
            - 'prc_n_score': Precision @ rank n score

        Returns
        -------
        score : float

        Raises
        ------
        NotImplementedError
            If ``scoring`` is not one of the supported metric names.
        """
        # Fit the model with both X and y.
        self.fit(X, y)

        if scoring == 'roc_auc_score':
            from sklearn.metrics import roc_auc_score
            score = roc_auc_score(y, self.decision_scores_)
        elif scoring == 'prc_n_score':
            score = self._precision_at_rank_n(y, self.decision_scores_)
        else:
            raise NotImplementedError('PyOD built-in scoring only supports '
                                      'ROC and Precision @ rank n')

        print("{metric}: {score}".format(metric=scoring, score=score))
        return score

    @staticmethod
    def _precision_at_rank_n(y, scores):
        """Return the fraction of true outliers among the top-n scores.

        ``n`` is the number of samples labelled 1 in ``y``.
        """
        y = np.asarray(y).ravel()
        scores = np.asarray(scores).ravel()
        n_outliers = int(np.sum(y == 1))
        if n_outliers == 0:
            return 0.0
        # Stable sort keeps the result deterministic when scores tie.
        top_n = np.argsort(-scores, kind='stable')[:n_outliers]
        return float(np.mean(y[top_n] == 1))