# -*- coding: utf-8 -*-
"""HDBSCAN-based outlier detection.
"""
# Author: Chao Gao <gaoc96@qq.com>
# License: BSD 2 clause
import numpy as np
from sklearn.neighbors import NearestNeighbors
from sklearn.utils.validation import check_array
from sklearn.utils.validation import check_is_fitted
from .base import BaseDetector
[docs]
class HDBSCAN(BaseDetector):
    """Wrapper of scikit-learn HDBSCAN for outlier detection.

    HDBSCAN (Hierarchical Density-Based Spatial Clustering of Applications
    with Noise) performs density-based clustering and identifies outliers as
    points that do not belong to any cluster (noise) or have weak membership
    in their assigned cluster. The anomaly score is computed as
    ``1 - probabilities_``, where ``probabilities_`` represents each point's
    cluster membership strength. See :cite:`campello2013density` for details.

    For new data prediction, the outlier scores are approximated using
    k-nearest neighbor interpolation from the training data scores.

    Parameters
    ----------
    min_cluster_size : int, optional (default=5)
        The minimum number of samples in a group for that group to be
        considered a cluster. It is also used as the number of neighbors
        (capped at the number of training samples) when scoring new data
        in :meth:`decision_function`.

    min_samples : int or None, optional (default=None)
        The number of samples in a neighborhood for a point to be
        considered a core point. If None, defaults to min_cluster_size.

    metric : str or callable, optional (default='euclidean')
        The metric to use when calculating distance between instances in a
        feature array. The same metric is used by the nearest-neighbor
        model that scores new samples.

    alpha : float, optional (default=1.0)
        A distance scaling parameter as used in robust single linkage.

    algorithm : str, optional (default='auto')
        Which algorithm to use for computing core distances. The value is
        passed unchanged to ``sklearn.cluster.HDBSCAN``. ``'auto'`` lets
        scikit-learn pick a tree-based method when possible. The accepted
        names are defined by the installed scikit-learn release
        (``'auto'``, ``'brute'``, ``'kd_tree'`` and ``'ball_tree'`` in
        current releases).

    leaf_size : int, optional (default=40)
        Leaf size for trees responsible for fast nearest neighbor queries
        during clustering.

    n_jobs : int or None, optional (default=1)
        Number of parallel jobs to run for nearest-neighbor search in
        :meth:`decision_function` (KNN interpolation on training scores).
        ``-1`` means using all processors.

    contamination : float in (0., 0.5), optional (default=0.1)
        The amount of contamination of the data set, i.e. the proportion
        of outliers in the data set. Used when fitting to define the
        threshold on the decision function.

    Attributes
    ----------
    decision_scores_ : numpy array of shape (n_samples,)
        The outlier scores of the training data (1 - membership probability).
        The higher, the more abnormal. Outliers tend to have higher
        scores. This value is available once the detector is fitted.

    threshold_ : float
        The threshold is based on ``contamination``. It is the
        ``n_samples * contamination`` most abnormal samples in
        ``decision_scores_``. The threshold is calculated for generating
        binary outlier labels.

    labels_ : int, either 0 or 1
        The binary labels of the training data. 0 stands for inliers
        and 1 for outliers/anomalies. It is generated by applying
        ``threshold_`` on ``decision_scores_``.

    cluster_labels_ : numpy array of shape (n_samples,)
        Cluster labels for each point in the training data. Noisy samples
        are given the label -1.

    detector_ : sklearn.cluster.HDBSCAN
        The underlying fitted scikit-learn clusterer.

    tree_ : sklearn.neighbors.NearestNeighbors
        Nearest-neighbor model fitted on the training data; used by
        :meth:`decision_function` to interpolate scores for new samples.

    X_train_ : numpy array of shape (n_samples, n_features)
        The validated training data passed to :meth:`fit`.
    """

    def __init__(self, min_cluster_size=5, min_samples=None,
                 metric='euclidean', alpha=1.0, algorithm='auto',
                 leaf_size=40, n_jobs=1, contamination=0.1):
        super(HDBSCAN, self).__init__(contamination=contamination)
        self.min_cluster_size = min_cluster_size
        self.min_samples = min_samples
        self.metric = metric
        self.alpha = alpha
        self.algorithm = algorithm
        self.leaf_size = leaf_size
        self.n_jobs = n_jobs

    def fit(self, X, y=None):
        """Fit detector. y is ignored in unsupervised methods.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The input samples.

        y : Ignored
            Not used, present for API consistency by convention.

        Returns
        -------
        self : object
            Fitted estimator.

        Raises
        ------
        ImportError
            If the installed scikit-learn does not provide
            ``sklearn.cluster.HDBSCAN``.
        """
        X = check_array(X)
        self._set_n_classes(y)

        # Imported lazily so that the module can be imported with a
        # scikit-learn release that does not ship HDBSCAN. Only import
        # failures are translated; any other error keeps its original type
        # instead of being reported as "please upgrade".
        try:
            from sklearn.cluster import HDBSCAN as sklearn_HDBSCAN
        except ImportError as e:
            raise ImportError(
                "HDBSCAN requires scikit-learn with sklearn.cluster.HDBSCAN. "
                "Please upgrade scikit-learn."
            ) from e

        self.detector_ = sklearn_HDBSCAN(
            min_cluster_size=self.min_cluster_size,
            min_samples=self.min_samples,
            metric=self.metric,
            alpha=self.alpha,
            algorithm=self.algorithm,
            leaf_size=self.leaf_size,
            store_centers='centroid',
        )
        self.detector_.fit(X)
        self.cluster_labels_ = self.detector_.labels_

        # Use 1 - membership probability as outlier scores
        # Noise points (label=-1) have probability 0, so score 1.0
        self.decision_scores_ = 1.0 - self.detector_.probabilities_
        self._process_decision_scores()

        # Build a KNN model on training data for scoring new samples.
        # The neighbor count is capped so it never exceeds the number of
        # training samples.
        self.X_train_ = X
        self.tree_ = NearestNeighbors(
            n_neighbors=min(self.min_cluster_size, X.shape[0]),
            metric=self.metric,
            n_jobs=self.n_jobs,
        )
        self.tree_.fit(X)
        return self

    def decision_function(self, X):
        """Predict raw anomaly score of X using the fitted detector.

        For new data, anomaly scores are approximated by the weighted
        average of the k nearest neighbors' outlier scores in the training
        data.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The input samples.

        Returns
        -------
        anomaly_scores : numpy array of shape (n_samples,)
            The anomaly score of the input samples.
        """
        # ``tree_`` is required below, so it is part of the fitted check;
        # a partially fitted detector is reported as not fitted.
        check_is_fitted(
            self, ['decision_scores_', 'threshold_', 'labels_', 'tree_'])
        X = check_array(X)

        # Find k nearest neighbors in training data
        dist, ind = self.tree_.kneighbors(X)

        # Weight by inverse distance; closer neighbors have more influence.
        # The small epsilon avoids division by zero when a query sample
        # coincides with a training sample.
        weights = 1.0 / (dist + 1e-10)
        weights = weights / weights.sum(axis=1, keepdims=True)

        # Weighted average of training outlier scores
        neighbor_scores = self.decision_scores_[ind]
        scores = np.sum(weights * neighbor_scores, axis=1)
        return scores.ravel()