# -*- coding: utf-8 -*-
"""Quasi-Monte Carlo Discrepancy outlier detection (QMCD)
"""
# Author: D Kulik
# License: BSD 2 clause
from __future__ import division
from __future__ import print_function
import numpy as np
from numba import njit, prange
from sklearn.preprocessing import MinMaxScaler
from sklearn.utils import check_array
from sklearn.utils.validation import check_is_fitted
from .base import BaseDetector
@njit(fastmath=True, parallel=True)
def _wrap_around_discrepancy(data):
    """Compute the per-sample wrap-around (WD) Quasi-Monte Carlo discrepancy.

    Parameters
    ----------
    data : numpy array of shape (n_samples, n_features)
        Samples scaled to the unit hypercube [0, 1].

    Returns
    -------
    disc : numpy array of shape (n_samples,)
        Wrap-around discrepancy contribution of each sample; higher values
        indicate a sample lying in a more densely covered region.
    """
    n = data.shape[0]
    d = data.shape[1]

    disc = np.zeros(n)

    # Parallelize only the outer loop. The inner loops must be plain
    # sequential ``range``: they perform reductions into thread-local
    # accumulators (``dc``, ``prod``) and nesting ``prange`` there is
    # unsupported/meaningless — numba only parallelizes the outermost
    # prange, and sequential inner loops make the reduction explicit.
    for i in prange(n):
        dc = 0.0
        for j in range(n):
            prod = 1.0
            for k in range(d):
                x_kikj = abs(data[i, k] - data[j, k])
                # Per-dimension wrap-around kernel: 3/2 - |x| * (1 - |x|)
                prod *= 3.0 / 2.0 - x_kikj + x_kikj ** 2
            dc += prod
        disc[i] = dc

    return - (4.0 / 3.0) ** d + 1.0 / (n ** 2) * disc
class QMCD(BaseDetector):
    """The Wrap-around Quasi-Monte Carlo discrepancy is a uniformity criterion
    which is used to assess the space filling of a number of samples in a hypercube.
    It quantifies the distance between the continuous uniform distribution on a
    hypercube and the discrete uniform distribution on distinct sample points.
    Therefore, lower discrepancy values for a sample point indicates that it provides
    better coverage of the parameter space with regard to the rest of the samples. This
    method is kernel based and a higher discrepancy score is relative to the
    rest of the samples, the higher the likelihood of it being an outlier.

    Read more in the :cite:`fang2001wrap`.

    Parameters
    ----------
    contamination : float in (0., 0.5), optional (default=0.1)
        The amount of contamination of the data set, i.e. the proportion
        of outliers in the data set. Used when fitting to define the
        threshold on the decision function.

    Attributes
    ----------
    decision_scores_ : numpy array of shape (n_samples,)
        The outlier scores of the training data.
        The higher, the more abnormal. Outliers tend to have higher
        scores. This value is available once the detector is
        fitted.

    threshold_ : float
        The threshold is based on ``contamination``. It is the
        ``n_samples * contamination`` most abnormal samples in
        ``decision_scores_``. The threshold is calculated for generating
        binary outlier labels.

    labels_ : int, either 0 or 1
        The binary labels of the training data. 0 stands for inliers
        and 1 for outliers/anomalies. It is generated by applying
        ``threshold_`` on ``decision_scores_``.
    """

    def __init__(self, contamination=0.1):
        super(QMCD, self).__init__(contamination=contamination)

    @staticmethod
    def _normalized_scores(X):
        """Compute WD QMCD outlier scores for ``X``, normalized to [0, 1].

        Shared by :meth:`fit` and :meth:`decision_function` so the scoring
        pipeline is defined in exactly one place.
        """
        # Normalize data between 0 and 1
        scaler = MinMaxScaler()
        X_norm = scaler.fit_transform(X)
        # Re-divide by the column maxima; np.spacing(0) avoids 0/0 on
        # constant (all-zero after scaling) columns.
        X_norm = (X_norm / (X_norm.max(axis=0, keepdims=True)
                            + np.spacing(0)))

        # Calculate WD QMCD scores
        scores = _wrap_around_discrepancy(X_norm)

        # Normalize scores between 0 and 1; np.spacing(0) guards against
        # division by zero when all raw scores are identical.
        scores = (scores - scores.min()) / (scores.max() - scores.min()
                                            + np.spacing(0))

        # Invert score order if the majority sits beyond 0.5, so that
        # high scores remain the minority (i.e. the outliers).
        if len(scores[scores > 0.5]) > 0.5 * len(scores):
            scores = 1 - scores

        return scores

    def fit(self, X, y=None):
        """Fit detector.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The input samples.

        y : Ignored
            Not used, present for API consistency by convention.

        Returns
        -------
        self : object
            Fitted estimator.
        """
        # validate inputs X and y (optional)
        X = check_array(X)
        self._set_n_classes(y)

        self.decision_scores_ = self._normalized_scores(X)
        self._process_decision_scores()

        return self

    def decision_function(self, X):
        """Predict raw anomaly score of X using the fitted detector.

        The anomaly score of an input sample is computed based on different
        detector algorithms. For consistency, outliers are assigned with
        larger anomaly scores.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The input samples.

        Returns
        -------
        anomaly_scores : numpy array of shape (n_samples,)
            The anomaly score of the input samples.
        """
        check_is_fitted(self, ['decision_scores_', 'threshold_', 'labels_'])
        X = check_array(X)

        return self._normalized_scores(X)