Source code for pyod.models.lmdd

# -*- coding: utf-8 -*-
"""Linear Method for Deviation-based Outlier Detection (LMDD).
"""
# Author: Yahya Almardeny <almardeny@gmail.com>
# License: BSD 2 clause

from __future__ import division
from __future__ import print_function

import numpy as np
from numba import njit
from scipy import stats
from sklearn.utils import check_array, check_random_state

from pyod.utils import check_parameter
from .base import BaseDetector


@njit
def _aad(X):
    """Internal helper: Average Absolute Deviation of ``X``
    (a.k.a. Mean Absolute Deviation).

    The mean is taken over every element of ``X`` (no axis is given), the
    absolute distance of each element from that mean is computed, and the
    mean of those distances is returned as a single scalar.
    """
    centre = np.mean(X)
    deviations = np.abs(X - centre)
    return np.mean(deviations)


def _check_params(n_iter, dis_measure, random_state):
    """Internal function to check for and validate class parameters.

    Parameters
    ----------
    n_iter : int
        Number of iterations. Must be a Python or numpy integer; its range
        is then validated by ``check_parameter(n_iter, low=1, ...)``.

    dis_measure : str
        Name of the dissimilarity measure, one of 'aad', 'var', 'iqr'.

    random_state : int, RandomState instance or None
        Forwarded unchanged to ``check_random_state``.

    Returns
    -------
    random_state : object
        Whatever ``check_random_state(random_state)`` returns.

    dis_measure : callable
        ``_aad`` for 'aad', ``np.var`` for 'var', ``stats.iqr`` for 'iqr'.

    Raises
    ------
    TypeError
        If ``n_iter`` is not an integer or ``dis_measure`` is not a str.

    ValueError
        If ``dis_measure`` is a str that names no known measure.
    """
    # NOTE: bool is a subclass of int, so it passes this check (as it always
    # did); its value is still subject to check_parameter below.
    if not isinstance(n_iter, (int, np.integer)):
        # Wrap the value in a 1-tuple: a bare ``"%s" % value`` blows up with
        # an unrelated formatting error when ``value`` is itself a tuple.
        raise TypeError("n_iter should be int, got %s" % (n_iter,))
    check_parameter(n_iter, low=1, param_name='n_iter')

    if not isinstance(dis_measure, str):
        raise TypeError("dis_measure should be str, got %s" % (dis_measure,))
    if dis_measure not in ('aad', 'var', 'iqr'):
        raise ValueError("Unknown dissimilarity measure type, "
                         "dis_measure should be in "
                         "(\'aad\', \'var\', \'iqr\'), "
                         "got %s" % dis_measure)
    # TO-DO: 'mad': Median Absolute Deviation to be added
    # once Scipy stats version 1.3.0 is released

    # Built only after validation so that invalid input is reported before
    # any measure is looked up.
    measures = {'aad': _aad, 'var': np.var, 'iqr': stats.iqr}
    return check_random_state(random_state), measures[dis_measure]


class LMDD(BaseDetector):
    """Linear Method for Deviation-based Outlier Detection.

    LMDD employs the concept of the smoothing factor which
    indicates how much the dissimilarity can be reduced by
    removing a subset of elements from the data-set.
    Read more in the :cite:`arning1996linear`.

    Note: this implementation has minor modification to make it output scores
    instead of labels.

    Parameters
    ----------
    contamination : float in (0., 0.5), optional (default=0.1)
        The amount of contamination of the data set, i.e.
        the proportion of outliers in the data set. Used when fitting to
        define the threshold on the decision function.

    n_iter : int, optional (default=50)
        Number of iterations where in each iteration, the process is repeated
        after randomizing the order of the input.
        Note that n_iter is a very important factor that affects the accuracy.
        The higher the better the accuracy and the longer the execution.

    dis_measure: str, optional (default='aad')
        Dissimilarity measure to be used in calculating the smoothing factor
        for points, options available:

        - 'aad': Average Absolute Deviation
        - 'var': Variance
        - 'iqr': Interquartile Range

    random_state : int, RandomState instance or None, optional (default=None)
        If int, random_state is the seed used by the random number generator;
        If RandomState instance, random_state is the random number generator;
        If None, the random number generator is the RandomState instance used
        by `np.random`.

    Attributes
    ----------
    decision_scores_ : numpy array of shape (n_samples,)
        The outlier scores of the training data.
        The higher, the more abnormal. Outliers tend to have higher
        scores. This value is available once the detector is
        fitted.

    threshold_ : float
        The threshold is based on ``contamination``. It is the
        ``n_samples * contamination`` most abnormal samples in
        ``decision_scores_``. The threshold is calculated for generating
        binary outlier labels.

    labels_ : int, either 0 or 1
        The binary labels of the training data. 0 stands for inliers
        and 1 for outliers/anomalies. It is generated by applying
        ``threshold_`` on ``decision_scores_``.
    """

    def __init__(self, contamination=0.1, n_iter=50, dis_measure='aad',
                 random_state=None):
        super(LMDD, self).__init__(contamination=contamination)
        # Constructor arguments are stored untouched under their own names;
        # random_state in particular is kept only to prevent a clone error
        # and is not used by the algorithm itself.
        self.n_iter = n_iter
        self.dis_measure = dis_measure
        self.random_state = random_state
        # Validated / resolved counterparts that the algorithm actually
        # reads: the iteration count, the random state instance and the
        # dissimilarity callable.
        self.n_iter_ = n_iter
        self.random_state_, self.dis_measure_ = _check_params(
            n_iter, dis_measure, random_state)

    def fit(self, X, y=None):
        """Fit detector. y is ignored in unsupervised methods.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The input samples.

        y : Ignored
            Not used, present for API consistency by convention.

        Returns
        -------
        self : object
            Fitted estimator.
        """
        X = check_array(X)
        self._set_n_classes(y)
        self.decision_scores_ = self.decision_function(X)
        self._process_decision_scores()
        return self

    def decision_function(self, X):
        """Predict raw anomaly score of X using the fitted detector.

        The anomaly score of an input sample is computed based on different
        detector algorithms. For consistency, outliers are assigned with
        larger anomaly scores.

        The scores are the smoothing factors computed from ``X`` itself,
        using the detector's ``dis_measure_``, ``n_iter_`` and
        ``random_state_``.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The training input samples. Sparse matrices are accepted only
            if they are supported by the base estimator.

        Returns
        -------
        anomaly_scores : numpy array of shape (n_samples,)
            The anomaly score of the input samples.
        """
        # Validate here as well (fit already does): the computation below
        # relies on ``X.shape`` and row slicing, so array-likes such as
        # nested lists must be converted first.
        X = check_array(X)
        return self.__sf(X)

    def __dis(self, X):
        """Internal function to calculate for dissimilarity in a sequence
        of sets.

        Walks the growing prefixes ``X[:1], X[:2], ...`` and finds the row
        ``j`` whose inclusion produces the largest jump in dissimilarity.
        Every row after ``j`` is then scored by how much less it adds to
        the dissimilarity when row ``j`` is already in the set.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The samples, in the order in which they are to be scanned.

        Returns
        -------
        res_ : numpy array of shape (n_samples,)
            Dissimilarity value per row, in the same order as ``X``; rows
            that are never assigned keep the initial value 0.
        """
        n_samples = X.shape[0]
        measure = self.dis_measure_
        res_ = np.zeros(shape=(n_samples,))
        var_max, j = -np.inf, 0

        # this can be vectorized but just for comforting memory
        # The dissimilarity of prefix X[:i] is exactly the value computed
        # for X[:i + 1] one step earlier, so it is carried over instead of
        # being evaluated twice.
        prefix_dis = measure(X[:1]) if n_samples > 1 else None
        for i in range(1, n_samples):
            grown_dis = measure(X[:i + 1])
            _var = grown_dis - prefix_dis
            prefix_dis = grown_dis
            if _var > var_max:
                var_max = _var
                j = i

        # res_[j] is still 0 here, so only a positive jump is recorded.
        if var_max > res_[j]:
            res_[j] = var_max

        before_j = X[:j]
        through_j = X[:j + 1]
        for k in range(j + 1, n_samples):
            dk_diff = measure(np.vstack((before_j, X[k]))) \
                - measure(np.vstack((through_j, X[k])))
            if dk_diff >= 0:
                res_[k] = dk_diff + var_max

        return res_

    def __sf(self, X):
        """Internal function to calculate for Smoothing Factors of data
        points. Repeated n_iter_ of times in randomized mode.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The input samples.

        Returns
        -------
        scores : numpy array of shape (n_samples,)
            Per sample, the largest dissimilarity seen over the randomized
            iterations multiplied by the cardinality recorded for the
            iteration that produced it.
        """
        n_samples = X.shape[0]
        dis_ = np.zeros(shape=(n_samples,))
        card_ = np.zeros(shape=(n_samples,))

        # perform one process with the original input order
        itr_res = self.__dis(X)
        positive = itr_res > 0.
        # The original call was
        #   np.put(card_, n_samples - count, np.where(itr_res > 0.))
        # which passes the cardinality as the *index* and the positions as
        # the *values*. Assign the cardinality to the positive positions.
        card_[positive] = n_samples - np.count_nonzero(positive)
        # NOTE(review): the dissimilarities of this first pass are not
        # merged into dis_, and card_ is overwritten whenever dis_ becomes
        # positive below, so this pass does not influence the returned
        # scores. Left that way on purpose to keep scores reproducible;
        # confirm whether seeding dis_ from this pass was intended.

        # create a copy of random state to preserve original state for
        # future fits (if any)
        random_state = np.random.RandomState(
            seed=int(self.random_state_.get_state()[1][0]))

        # Shuffled in place every iteration, i.e. each permutation is a
        # reshuffle of the previous one.
        indices = np.arange(n_samples)
        for _ in range(self.n_iter_):
            random_state.shuffle(indices)
            # get dissimilarity of this iteration and restore original order
            itr_res = self.__dis(X[indices])[np.argsort(indices)]
            current_card = n_samples - np.count_nonzero(itr_res > 0.)

            # compare with previous iterations to get the maximal
            # dissimilarity
            improved = itr_res > dis_
            dis_[improved] = itr_res[improved]
            card_[improved] = current_card

            # Increase random state seed by one to reorder input next
            # iteration; wrapped so that it always stays inside the valid
            # seed range [0, 2**32 - 1].
            next_seed = (int(random_state.get_state()[1][0]) + 1) % (2 ** 32)
            random_state.seed(next_seed)

        return dis_ * card_