# Source code for pyod.models.abod

```python
# -*- coding: utf-8 -*-
"""Angle-based Outlier Detector (ABOD)
"""
# Author: Yue Zhao <zhaoy@cmu.edu>

from __future__ import division
from __future__ import print_function

import warnings
from itertools import combinations

import numpy as np
from numba import njit
from sklearn.neighbors import KDTree
from sklearn.neighbors import NearestNeighbors
from sklearn.utils import check_array
from sklearn.utils.validation import check_is_fitted

from .base import BaseDetector
from ..utils.utility import check_parameter

@njit
def _wcos(curr_pt, a, b):  # pragma: no cover
    """Compute the distance-weighted cosine of the angle seen from a point.

    The difference vectors ``a - curr_pt`` and ``b - curr_pt`` are formed and
    their dot product is divided by the squared Euclidean norm of each of
    them. The function is compiled with numba's ``njit``.

    Parameters
    ----------
    curr_pt : numpy array
        The sample at which the angle is measured. ``_calculate_wocs``
        passes a single sample (one row of the data).

    a : numpy array
        First training sample, same layout as ``curr_pt``.

    b : numpy array
        Second training sample, same layout as ``curr_pt``.

    Returns
    -------
    wcos : float
        ``<a - curr_pt, b - curr_pt> /
        (|a - curr_pt|^2 * |b - curr_pt|^2)``.

    Notes
    -----
    Nothing here guards against a zero-length difference vector (``a`` or
    ``b`` equal to ``curr_pt``); ``_calculate_wocs`` filters such pairs out
    before calling.

    """
    diff_a = a - curr_pt
    diff_b = b - curr_pt

    sq_norm_a = np.linalg.norm(diff_a, 2) ** 2
    sq_norm_b = np.linalg.norm(diff_b, 2) ** 2

    # divide by each squared norm in turn rather than by their product
    return np.dot(diff_a, diff_b) / sq_norm_a / sq_norm_b

def _calculate_wocs(curr_pt, X, X_ind):
"""Calculated the variance of weighted cosine of a point.
wcos = (<a_curr, b_curr>/((|a_curr|*|b_curr|)^2)

Parameters
----------
curr_pt : numpy array, shape (1, n_features)
The sample to be calculated.

X : numpy array of shape (n_samples, n_features)
The training dataset.

X_ind : list
The valid index of the training data.

Returns
-------
cos_angle_var : float
The variance of cosine angle

"""
wcos_list = []
curr_pair_inds = list(combinations(X_ind, 2))
for j, (a_ind, b_ind) in enumerate(curr_pair_inds):
a = X[a_ind, :]
b = X[b_ind, :]

# skip if no angle can be formed
if np.array_equal(a, curr_pt) or np.array_equal(b, curr_pt):
continue
# add the weighted cosine to the list
wcos_list.append(_wcos(curr_pt, a, b))
return np.var(wcos_list)

# noinspection PyPep8Naming
class ABOD(BaseDetector):
    """ABOD class for Angle-based Outlier Detection.
    For an observation, the variance of its weighted cosine scores to all
    neighbors could be viewed as the outlying score.
    See :cite:`kriegel2008angle` for details.

    Two versions of ABOD are supported:

    - Fast ABOD: use k nearest neighbors to approximate.
    - Original ABOD: consider all training points with high time complexity
      at O(n^3).

    Parameters
    ----------
    contamination : float in (0., 0.5), optional (default=0.1)
        The amount of contamination of the data set, i.e.
        the proportion of outliers in the data set. Used when fitting to
        define the threshold on the decision function.

    n_neighbors : int, optional (default=5)
        Number of neighbors to use by default for k neighbors queries.
        Only used by the 'fast' method. When it is not smaller than the
        number of training samples, ``fit`` lowers it to that number minus
        one and issues a warning.

    method : str, optional (default='fast')
        Valid values for method are:

        - 'fast': fast ABOD. Only consider n_neighbors of training points
        - 'default': original ABOD with all training points, which could be
          slow

    Attributes
    ----------
    decision_scores_ : numpy array of shape (n_samples,)
        The outlier scores of the training data.
        The higher, the more abnormal. Outliers tend to have higher
        scores. This value is available once the detector is
        fitted.

    threshold_ : float
        The threshold is based on ``contamination``. It is the
        ``n_samples * contamination`` most abnormal samples in
        ``decision_scores_``. The threshold is calculated for generating
        binary outlier labels.

    labels_ : int, either 0 or 1
        The binary labels of the training data. 0 stands for inliers
        and 1 for outliers/anomalies. It is generated by applying
        ``threshold_`` on ``decision_scores_``.
    """

    def __init__(self, contamination=0.1, n_neighbors=5, method='fast'):
        super(ABOD, self).__init__(contamination=contamination)
        self.method = method
        self.n_neighbors = n_neighbors

    def fit(self, X, y=None):
        """Fit detector. y is ignored in unsupervised methods.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The input samples.

        y : Ignored
            Not used, present for API consistency by convention.

        Returns
        -------
        self : object
            Fitted estimator.

        Raises
        ------
        ValueError
            If ``method`` is neither 'fast' nor 'default'.
        """
        # validate inputs X and y (optional)
        X = check_array(X)
        self._set_n_classes(y)

        self.X_train_ = X
        # number of training samples (rows), not the full shape tuple
        self.n_train_ = X.shape[0]
        self.decision_scores_ = np.zeros([self.n_train_, 1])

        if self.method == 'fast':
            self._fit_fast()
        elif self.method == 'default':
            self._fit_default()
        else:
            raise ValueError(
                "{0} is not a valid method".format(self.method))

        # flip the scores: a low variance of weighted cosines should map to
        # a high outlier score
        self.decision_scores_ = self.decision_scores_.ravel() * -1
        # NOTE(review): inherited from BaseDetector; presumably derives
        # threshold_ and labels_ from decision_scores_ -- confirm there.
        self._process_decision_scores()
        return self

    def _fit_default(self):
        """Default ABOD method. Use all training points with high complexity
        O(n^3). For internal use only.

        Fills ``decision_scores_`` with the (not yet flipped) variance of
        weighted cosines of every training sample and returns ``self``.
        """
        for i in range(self.n_train_):
            curr_pt = self.X_train_[i, :]

            # every training index except the point itself
            X_ind = [j for j in range(self.n_train_) if j != i]

            self.decision_scores_[i, 0] = _calculate_wocs(curr_pt,
                                                          self.X_train_,
                                                          X_ind)
        return self

    def _fit_fast(self):
        """Fast ABOD method. Only use n_neighbors for angle calculation.
        Internal use only.

        Builds ``tree_`` (used later for scoring new samples), fills
        ``decision_scores_`` with the (not yet flipped) variance of weighted
        cosines of every training sample and returns ``self``.
        """

        # make sure the n_neighbors is in the range
        if self.n_neighbors >= self.n_train_:
            self.n_neighbors = self.n_train_ - 1
            # report the value n_neighbors was actually lowered to
            warnings.warn("n_neighbors is set to the number of "
                          "training points minus 1: {0}".format(
                              self.n_neighbors))

        check_parameter(self.n_neighbors, 1, self.n_train_,
                        include_left=True, include_right=True)

        self.tree_ = KDTree(self.X_train_)

        neigh = NearestNeighbors(n_neighbors=self.n_neighbors)
        neigh.fit(self.X_train_)
        # called without X, so the neighbors of each training point are
        # returned (per scikit-learn's kneighbors documentation, the point
        # itself is then not counted as its own neighbor)
        ind_arr = neigh.kneighbors(n_neighbors=self.n_neighbors,
                                   return_distance=False)

        for i in range(self.n_train_):
            curr_pt = self.X_train_[i, :]
            X_ind = ind_arr[i, :]
            self.decision_scores_[i, 0] = _calculate_wocs(curr_pt,
                                                          self.X_train_,
                                                          X_ind)
        return self

    # noinspection PyPep8Naming
    def decision_function(self, X):
        """Predict raw anomaly score of X using the fitted detector.

        The anomaly score of an input sample is computed based on different
        detector algorithms. For consistency, outliers are assigned with
        larger anomaly scores.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The input samples to score.

        Returns
        -------
        anomaly_scores : numpy array of shape (n_samples,)
            The anomaly score of the input samples.
        """

        check_is_fitted(self, ['X_train_', 'n_train_', 'decision_scores_',
                               'threshold_', 'labels_'])
        X = check_array(X)

        if self.method == 'fast':  # fast ABOD
            # outliers have higher outlier scores
            return self._decision_function_fast(X) * -1
        else:  # default ABOD
            return self._decision_function_default(X) * -1

    def _decision_function_default(self, X):
        """Internal method for predicting outlier scores using default ABOD.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The input samples to score.

        Returns
        -------
        pred_score : array, shape (n_samples,)
            The variance of weighted cosines of each input sample against
            all training points (not yet flipped).

        """
        n_samples = X.shape[0]
        # initialize the output score
        pred_score = np.zeros([n_samples, 1])

        # every training point takes part; the index list is the same for
        # all query samples, so build it once
        X_ind = list(range(0, self.n_train_))

        for i in range(n_samples):
            curr_pt = X[i, :]
            pred_score[i, :] = _calculate_wocs(curr_pt, self.X_train_, X_ind)

        return pred_score.ravel()

    def _decision_function_fast(self, X):
        """Internal method for predicting outlier scores using Fast ABOD.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The input samples to score.

        Returns
        -------
        pred_score : array, shape (n_samples,)
            The variance of weighted cosines of each input sample against
            its ``n_neighbors`` nearest training points (not yet flipped).

        """

        check_is_fitted(self, ['tree_'])
        n_samples = X.shape[0]
        # initialize the output score
        pred_score = np.zeros([n_samples, 1])

        # get the indexes of the X's k nearest training points
        _, ind_arr = self.tree_.query(X, k=self.n_neighbors)

        for i in range(n_samples):
            curr_pt = X[i, :]
            X_ind = ind_arr[i, :]
            pred_score[i, :] = _calculate_wocs(curr_pt, self.X_train_, X_ind)

        return pred_score.ravel()

```