Source code for pyod.models.qmcd

# -*- coding: utf-8 -*-
"""Quasi-Monte Carlo Discrepancy outlier detection (QMCD)
"""

# Author: D Kulik
# License: BSD 2 clause

from __future__ import division
from __future__ import print_function

import numpy as np
from numba import njit, prange
from sklearn.preprocessing import MinMaxScaler
from sklearn.utils import check_array
from sklearn.utils.validation import check_is_fitted

from .base import BaseDetector


@njit(fastmath=True, parallel=True)
def _wrap_around_discrepancy(data):
    """Wrap-around Quasi-Monte Carlo discrepancy method"""

    n = data.shape[0]
    d = data.shape[1]

    disc = np.zeros(n)

    for i in prange(n):
        dc = 0.0
        for j in prange(n):
            prod = 1.0
            for k in prange(d):
                x_kikj = abs(data[i, k] - data[j, k])
                prod *= 3.0 / 2.0 - x_kikj + x_kikj ** 2

            dc += prod
        disc[i] = dc

    return - (4.0 / 3.0) ** d + 1.0 / (n ** 2) * disc


[docs] class QMCD(BaseDetector): """The Wrap-around Quasi-Monte Carlo discrepancy is a uniformity criterion which is used to assess the space filling of a number of samples in a hypercube. It quantifies the distance between the continuous uniform distribution on a hypercube and the discrete uniform distribution on distinct sample points. Therefore, lower discrepancy values for a sample point indicates that it provides better coverage of the parameter space with regard to the rest of the samples. This method is kernel based and a higher discrepancy score is relative to the rest of the samples, the higher the likelihood of it being an outlier. Read more in the :cite:`fang2001wrap`. Parameters ---------- Attributes ---------- decision_scores_ : numpy array of shape (n_samples,) The outlier scores of the training data. The higher, the more abnormal. Outliers tend to have higher scores. This value is available once the detector is fitted. threshold_ : float The modified z-score to use as a threshold. Observations with a modified z-score (based on the median absolute deviation) greater than this value will be classified as outliers. labels_ : int, either 0 or 1 The binary labels of the training data. 0 stands for inliers and 1 for outliers/anomalies. It is generated by applying ``threshold_`` on ``decision_scores_``. """ def __init__(self, contamination=0.1): super(QMCD, self).__init__(contamination=contamination)
[docs] def fit(self, X, y=None): """Fit detector Parameters ---------- X : numpy array of shape (n_samples, n_features) The input samples. y : Ignored Not used, present for API consistency by convention. """ # validate inputs X and y (optional) X = check_array(X) self._set_n_classes(y) # Normalize data between 0 and 1 scaler = MinMaxScaler() X_norm = scaler.fit_transform(X) X_norm = (X_norm / (X_norm.max(axis=0, keepdims=True) + np.spacing(0))) # Calculate WD QMCD scores scores = _wrap_around_discrepancy(X_norm) # Normalize scores between 0 and 1 scores = (scores - scores.min()) / (scores.max() - scores.min()) # Invert score order if majority is beyond 0.5 if len(scores[scores > 0.5]) > 0.5 * len(scores): scores = 1 - scores self.decision_scores_ = scores self._process_decision_scores() return self
[docs] def decision_function(self, X): """Predict raw anomaly score of X using the fitted detector. The anomaly score of an input sample is computed based on different detector algorithms. For consistency, outliers are assigned with larger anomaly scores. Parameters ---------- X : numpy array of shape (n_samples, n_features) The independent and dependent/target samples with the target samples being the last column of the numpy array such that eg: X = np.append(x, y.reshape(-1,1), axis=1). Sparse matrices are accepted only if they are supported by the base estimator. Returns ------- anomaly_scores : numpy array of shape (n_samples,) The anomaly score of the input samples. """ check_is_fitted(self, ['decision_scores_', 'threshold_', 'labels_']) X = check_array(X) # Normalize data between 0 and 1 scaler = MinMaxScaler() X_norm = scaler.fit_transform(X) X_norm = (X_norm / (X_norm.max(axis=0, keepdims=True) + np.spacing(0))) # Calculate WD QMCD scores scores = _wrap_around_discrepancy(X_norm) # Normalize scores between 0 and 1 scores = (scores - scores.min()) / (scores.max() - scores.min()) # Invert score order if majority is beyond 0.5 if len(scores[scores > 0.5]) > 0.5 * len(scores): scores = 1 - scores return scores