Source code for pyod.models.abod

# -*- coding: utf-8 -*-
"""Angle-based Outlier Detector (ABOD)
"""
# Author: Yue Zhao <zhaoy@cmu.edu>
# License: BSD 2 clause

from __future__ import division
from __future__ import print_function

import warnings
from itertools import combinations

import numpy as np
from numba import njit
from sklearn.neighbors import KDTree
from sklearn.neighbors import NearestNeighbors
from sklearn.utils import check_array
from sklearn.utils.validation import check_is_fitted
from .base import BaseDetector
from ..utils.utility import check_parameter


@njit
def _wcos(curr_pt, a, b):  # pragma: no cover
    """Internal function to calculate weighted cosine using optimized
    numba code.

    Parameters
    ----------
    curr_pt : numpy array of shape (n_samples, n_features)
        Current sample to be calculated.

    a : numpy array of shape (n_samples, n_features)
        Training sample a.

    b : numpy array of shape (n_samples, n_features)
        Training sample b.

    Returns
    -------
    wcos : float in range [-1, 1]
        Cosine similarity between a-curr_pt and b-curr_pt.

    """

    a_curr = a - curr_pt
    b_curr = b - curr_pt

    # wcos = (<a_curr, b_curr>/((|a_curr|*|b_curr|)^2)
    wcos = np.dot(a_curr, b_curr) / (
            np.linalg.norm(a_curr, 2) ** 2) / (
                   np.linalg.norm(b_curr, 2) ** 2)
    return wcos


def _calculate_wocs(curr_pt, X, X_ind):
    """Calculated the variance of weighted cosine of a point.
    wcos = (<a_curr, b_curr>/((|a_curr|*|b_curr|)^2)

    Parameters
    ----------
    curr_pt : numpy array, shape (1, n_features)
        The sample to be calculated.

    X : numpy array of shape (n_samples, n_features)
        The training dataset.

    X_ind : list
        The valid index of the training data.

    Returns
    -------
    cos_angle_var : float
        The variance of cosine angle

    """
    wcos_list = []
    curr_pair_inds = list(combinations(X_ind, 2))
    for j, (a_ind, b_ind) in enumerate(curr_pair_inds):
        a = X[a_ind, :]
        b = X[b_ind, :]

        # skip if no angle can be formed
        if np.array_equal(a, curr_pt) or np.array_equal(b, curr_pt):
            continue
        # add the weighted cosine to the list
        wcos_list.append(_wcos(curr_pt, a, b))
    return np.var(wcos_list)


# noinspection PyPep8Naming
[docs]class ABOD(BaseDetector): """ABOD class for Angle-base Outlier Detection. For an observation, the variance of its weighted cosine scores to all neighbors could be viewed as the outlying score. See :cite:`kriegel2008angle` for details. Two version of ABOD are supported: - Fast ABOD: use k nearest neighbors to approximate. - Original ABOD: consider all training points with high time complexity at O(n^3). Parameters ---------- contamination : float in (0., 0.5), optional (default=0.1) The amount of contamination of the data set, i.e. the proportion of outliers in the data set. Used when fitting to define the threshold on the decision function. n_neighbors : int, optional (default=10) Number of neighbors to use by default for k neighbors queries. method: str, optional (default='fast') Valid values for metric are: - 'fast': fast ABOD. Only consider n_neighbors of training points - 'default': original ABOD with all training points, which could be slow Attributes ---------- decision_scores_ : numpy array of shape (n_samples,) The outlier scores of the training data. The higher, the more abnormal. Outliers tend to have higher scores. This value is available once the detector is fitted. threshold_ : float The threshold is based on ``contamination``. It is the ``n_samples * contamination`` most abnormal samples in ``decision_scores_``. The threshold is calculated for generating binary outlier labels. labels_ : int, either 0 or 1 The binary labels of the training data. 0 stands for inliers and 1 for outliers/anomalies. It is generated by applying ``threshold_`` on ``decision_scores_``. """ def __init__(self, contamination=0.1, n_neighbors=5, method='fast'): super(ABOD, self).__init__(contamination=contamination) self.method = method self.n_neighbors = n_neighbors
[docs] def fit(self, X, y=None): """Fit detector. y is ignored in unsupervised methods. Parameters ---------- X : numpy array of shape (n_samples, n_features) The input samples. y : Ignored Not used, present for API consistency by convention. Returns ------- self : object Fitted estimator. """ # validate inputs X and y (optional) X = check_array(X) self._set_n_classes(y) self.X_train_ = X self.n_train_ = X.shape[0] self.decision_scores_ = np.zeros([self.n_train_, 1]) if self.method == 'fast': self._fit_fast() elif self.method == 'default': self._fit_default() else: raise ValueError(self.method, "is not a valid method") # flip the scores self.decision_scores_ = self.decision_scores_.ravel() * -1 self._process_decision_scores() return self
def _fit_default(self): """Default ABOD method. Use all training points with high complexity O(n^3). For internal use only. """ for i in range(self.n_train_): curr_pt = self.X_train_[i, :] # get the index pairs of the neighbors, remove itself from index X_ind = list(range(0, self.n_train_)) X_ind.remove(i) self.decision_scores_[i, 0] = _calculate_wocs(curr_pt, self.X_train_, X_ind) return self def _fit_fast(self): """Fast ABOD method. Only use n_neighbors for angle calculation. Internal use only """ # make sure the n_neighbors is in the range if self.n_neighbors >= self.n_train_: self.n_neighbors = self.n_train_ - 1 warnings.warn("n_neighbors is set to the number of " "training points minus 1: {0}".format(self.n_train_)) check_parameter(self.n_neighbors, 1, self.n_train_, include_left=True, include_right=True) self.tree_ = KDTree(self.X_train_) neigh = NearestNeighbors(n_neighbors=self.n_neighbors) neigh.fit(self.X_train_) ind_arr = neigh.kneighbors(n_neighbors=self.n_neighbors, return_distance=False) for i in range(self.n_train_): curr_pt = self.X_train_[i, :] X_ind = ind_arr[i, :] self.decision_scores_[i, 0] = _calculate_wocs(curr_pt, self.X_train_, X_ind) return self # noinspection PyPep8Naming
[docs] def decision_function(self, X): """Predict raw anomaly score of X using the fitted detector. The anomaly score of an input sample is computed based on different detector algorithms. For consistency, outliers are assigned with larger anomaly scores. Parameters ---------- X : numpy array of shape (n_samples, n_features) The training input samples. Sparse matrices are accepted only if they are supported by the base estimator. Returns ------- anomaly_scores : numpy array of shape (n_samples,) The anomaly score of the input samples. """ check_is_fitted(self, ['X_train_', 'n_train_', 'decision_scores_', 'threshold_', 'labels_']) X = check_array(X) if self.method == 'fast': # fast ABOD # outliers have higher outlier scores return self._decision_function_fast(X) * -1 else: # default ABOD return self._decision_function_default(X) * -1
def _decision_function_default(self, X): """Internal method for predicting outlier scores using default ABOD. Parameters ---------- X : numpy array of shape (n_samples, n_features) The training input samples. Returns ------- pred_score : array, shape (n_samples,) The anomaly score of the input samples. """ # initialize the output score pred_score = np.zeros([X.shape[0], 1]) for i in range(X.shape[0]): curr_pt = X[i, :] # get the index pairs of the neighbors X_ind = list(range(0, self.n_train_)) pred_score[i, :] = _calculate_wocs(curr_pt, self.X_train_, X_ind) return pred_score.ravel() def _decision_function_fast(self, X): """Internal method for predicting outlier scores using Fast ABOD. Parameters ---------- X : numpy array of shape (n_samples, n_features) The training input samples. Returns ------- pred_score : array, shape (n_samples,) The anomaly score of the input samples. """ check_is_fitted(self, ['tree_']) # initialize the output score pred_score = np.zeros([X.shape[0], 1]) # get the indexes of the X's k nearest training points _, ind_arr = self.tree_.query(X, k=self.n_neighbors) for i in range(X.shape[0]): curr_pt = X[i, :] X_ind = ind_arr[i, :] pred_score[i, :] = _calculate_wocs(curr_pt, self.X_train_, X_ind) return pred_score.ravel()