Source code for pyod.models.loda

# -*- coding: utf-8 -*-
"""Loda: Lightweight on-line detector of anomalies
Adapted from tilitools (https://github.com/nicococo/tilitools).
"""
# Author: Yue Zhao <zhaoy@cmu.edu>
# License: BSD 2 clause

from __future__ import division
from __future__ import print_function

import numbers

import numpy as np
from sklearn.utils import check_array
from sklearn.utils.validation import check_is_fitted

from .base import BaseDetector
from ..utils.utility import get_optimal_n_bins


class LODA(BaseDetector):
    """Loda: Lightweight on-line detector of anomalies.

    See :cite:`pevny2016loda` for more information.

    Two versions of LODA are supported:

    - Static number of bins: uses a static number of bins for all random
      cuts.
    - Automatic number of bins: every random cut uses a number of bins
      deemed to be optimal according to the Birge-Rozenblac method
      (:cite:`birge2006many`).

    Parameters
    ----------
    contamination : float in (0., 0.5), optional (default=0.1)
        The amount of contamination of the data set,
        i.e. the proportion of outliers in the data set. Used when fitting
        to define the threshold on the decision function.

    n_bins : int or string, optional (default = 10)
        The number of bins for the histogram. If set to "auto", the
        Birge-Rozenblac method will be used to automatically determine the
        optimal number of bins.

    n_random_cuts : int, optional (default = 100)
        The number of random cuts.

    Attributes
    ----------
    decision_scores_ : numpy array of shape (n_samples,)
        The outlier scores of the training data.
        The higher, the more abnormal. Outliers tend to have higher
        scores. This value is available once the detector is fitted.

    threshold_ : float
        The threshold is based on ``contamination``. It is the
        ``n_samples * contamination`` most abnormal samples in
        ``decision_scores_``. The threshold is calculated for generating
        binary outlier labels.

    labels_ : int, either 0 or 1
        The binary labels of the training data. 0 stands for inliers
        and 1 for outliers/anomalies. It is generated by applying
        ``threshold_`` on ``decision_scores_``.

    projections_ : numpy array of shape (n_random_cuts, n_features)
        The sparse random projection vectors drawn in ``fit``.

    histograms_ : numpy array of shape (n_random_cuts, n_bins), or list
        Normalised histogram of the projected training data for every
        random cut. A list of 1-D arrays when ``n_bins="auto"``.

    limits_ : numpy array of shape (n_random_cuts, n_bins + 1), or list
        Bin edges matching ``histograms_``. A list of 1-D arrays when
        ``n_bins="auto"``.

    n_bins_ : list of int
        Number of bins chosen for every random cut. Only set when
        ``n_bins="auto"``.
    """

    def __init__(self, contamination=0.1, n_bins=10, n_random_cuts=100):
        super(LODA, self).__init__(contamination=contamination)
        self.n_bins = n_bins
        self.n_random_cuts = n_random_cuts
        # One uniform weight per random cut. ``fit`` rebuilds this vector
        # when ``n_random_cuts`` has been changed after construction.
        self.weights = np.ones(n_random_cuts, dtype=float) / n_random_cuts

    def _validate_n_bins(self):
        """Check ``n_bins`` and report which binning mode is requested.

        Returns
        -------
        auto : bool
            True when ``n_bins`` is the string "auto" (case-insensitive),
            False when it is an integer.

        Raises
        ------
        ValueError
            If ``n_bins`` is neither an integer nor "auto".
        """
        if isinstance(self.n_bins, str) and self.n_bins.lower() == "auto":
            return True
        if isinstance(self.n_bins, numbers.Integral):
            return False
        # %r (not %f) so that non-numeric values produce the intended
        # ValueError instead of a TypeError raised by the formatting.
        raise ValueError("n_bins must be an int or 'auto', "
                         "got: %r" % (self.n_bins,))

    @staticmethod
    def _bin_indices(limits, projected_data):
        """Return, for every projected value, the index of its histogram bin.

        ``limits`` holds the ``n_bins + 1`` edges returned by
        ``np.histogram``. Searching only the interior edges with
        ``side='right'`` yields ``k`` for values in
        ``[limits[k], limits[k + 1])`` and clamps values outside the
        training range to the first or last bin.
        """
        return np.searchsorted(limits[1:-1], projected_data, side='right')

    def fit(self, X, y=None):
        """Fit detector. y is ignored in unsupervised methods.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The input samples.

        y : Ignored
            Not used, present for API consistency by convention.

        Returns
        -------
        self : object
            Fitted estimator.

        Raises
        ------
        ValueError
            If ``n_bins`` is neither an integer nor "auto".
        """
        # validate inputs X and y (optional)
        X = check_array(X)
        self._set_n_classes(y)
        auto_bins = self._validate_n_bins()

        # ``n_random_cuts`` may have been changed through ``set_params``
        # since ``__init__``; keep the weight vector in sync with it.
        if np.shape(self.weights) != (self.n_random_cuts,):
            self.weights = (np.ones(self.n_random_cuts, dtype=float)
                            / self.n_random_cuts)

        n_samples, n_components = X.shape
        # Each projection keeps about sqrt(n_features) non-zero entries.
        n_nonzero_components = int(np.sqrt(n_components))
        n_zero_components = n_components - n_nonzero_components

        self.projections_ = np.random.randn(self.n_random_cuts, n_components)

        if auto_bins:
            # Bin counts differ per cut, so ragged lists are required.
            self.histograms_ = []
            self.limits_ = []
            self.n_bins_ = []
        else:
            self.histograms_ = np.zeros((self.n_random_cuts, self.n_bins))
            self.limits_ = np.zeros((self.n_random_cuts, self.n_bins + 1))

        pred_scores = np.zeros(n_samples)
        for i in range(self.n_random_cuts):
            # Sparsify the projection by zeroing randomly chosen features.
            rands = np.random.permutation(n_components)[:n_zero_components]
            self.projections_[i, rands] = 0.
            projected_data = self.projections_[i, :].dot(X.T)

            if auto_bins:
                n_bins = get_optimal_n_bins(projected_data)
            else:
                n_bins = self.n_bins

            histogram, limits = np.histogram(
                projected_data, bins=n_bins, density=False)
            histogram = histogram.astype(np.float64)
            # Small offset keeps log() finite for empty bins.
            histogram += 1e-12
            histogram /= np.sum(histogram)

            if auto_bins:
                self.n_bins_.append(n_bins)
                self.histograms_.append(histogram)
                self.limits_.append(limits)
            else:
                self.histograms_[i, :] = histogram
                self.limits_[i, :] = limits

            # calculate the scores for the training samples
            inds = self._bin_indices(limits, projected_data)
            pred_scores += -self.weights[i] * np.log(histogram[inds])

        self.decision_scores_ = pred_scores / self.n_random_cuts
        self._process_decision_scores()
        return self

    def decision_function(self, X):
        """Predict raw anomaly score of X using the fitted detector.

        The anomaly score of an input sample is computed based on different
        detector algorithms. For consistency, outliers are assigned with
        larger anomaly scores.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The input samples.

        Returns
        -------
        anomaly_scores : numpy array of shape (n_samples,)
            The anomaly score of the input samples.

        Raises
        ------
        ValueError
            If ``n_bins`` is neither an integer nor "auto".
        """
        check_is_fitted(self, ['projections_', 'decision_scores_',
                               'threshold_', 'labels_'])
        X = check_array(X)
        self._validate_n_bins()

        pred_scores = np.zeros(X.shape[0])
        for i in range(self.n_random_cuts):
            projected_data = self.projections_[i, :].dot(X.T)
            # Indexing by cut works for both the ndarray (fixed bins) and
            # the list (auto bins) storage produced by ``fit``.
            histogram = self.histograms_[i]
            inds = self._bin_indices(self.limits_[i], projected_data)
            pred_scores += -self.weights[i] * np.log(histogram[inds])

        return pred_scores / self.n_random_cuts