Source code for pyod.models.hdbscan

# -*- coding: utf-8 -*-
"""HDBSCAN-based outlier detection.
"""
# Author: Chao Gao <gaoc96@qq.com>
# License: BSD 2 clause

import numpy as np
from sklearn.neighbors import NearestNeighbors
from sklearn.utils.validation import check_array
from sklearn.utils.validation import check_is_fitted

from .base import BaseDetector


[docs] class HDBSCAN(BaseDetector): """Wrapper of scikit-learn HDBSCAN for outlier detection. HDBSCAN (Hierarchical Density-Based Spatial Clustering of Applications with Noise) performs density-based clustering and identifies outliers as points that do not belong to any cluster (noise) or have weak membership in their assigned cluster. The anomaly score is computed as ``1 - probabilities_``, where ``probabilities_`` represents each point's cluster membership strength. See :cite:`campello2013density` for details. For new data prediction, the outlier scores are approximated using k-nearest neighbor interpolation from the training data scores. Parameters ---------- min_cluster_size : int, optional (default=5) The minimum number of samples in a group for that group to be considered a cluster. min_samples : int or None, optional (default=None) The number of samples in a neighborhood for a point to be considered a core point. If None, defaults to min_cluster_size. metric : str or callable, optional (default='euclidean') The metric to use when calculating distance between instances in a feature array. alpha : float, optional (default=1.0) A distance scaling parameter as used in robust single linkage. algorithm : str, optional (default='auto') Exactly which algorithm to use for computing core distances; By default this is set to ``'auto'`` which attempts to use a ``KDTree`` if possible, otherwise it uses a ``BallTree``. Both ``'KDTree'`` and ``'BallTree'`` algorithms are also available. leaf_size : int, optional (default=40) Leaf size for trees responsible for fast nearest neighbor queries during clustering. n_jobs : int or None, optional (default=1) Number of parallel jobs to run for nearest-neighbor search in :meth:`decision_function` (KNN interpolation on training scores). ``-1`` means using all processors. contamination : float in (0., 0.5), optional (default=0.1) The amount of contamination of the data set, i.e. the proportion of outliers in the data set. Used when fitting to define the threshold on the decision function. Attributes ---------- decision_scores_ : numpy array of shape (n_samples,) The outlier scores of the training data (1 - membership probability). The higher, the more abnormal. Outliers tend to have higher scores. This value is available once the detector is fitted. threshold_ : float The threshold is based on ``contamination``. It is the ``n_samples * contamination`` most abnormal samples in ``decision_scores_``. The threshold is calculated for generating binary outlier labels. labels_ : int, either 0 or 1 The binary labels of the training data. 0 stands for inliers and 1 for outliers/anomalies. It is generated by applying ``threshold_`` on ``decision_scores_``. cluster_labels_ : numpy array of shape (n_samples,) Cluster labels for each point in the training data. Noisy samples are given the label -1. """ def __init__(self, min_cluster_size=5, min_samples=None, metric='euclidean', alpha=1.0, algorithm='auto', leaf_size=40, n_jobs=1, contamination=0.1): super(HDBSCAN, self).__init__(contamination=contamination) self.min_cluster_size = min_cluster_size self.min_samples = min_samples self.metric = metric self.alpha = alpha self.algorithm = algorithm self.leaf_size = leaf_size self.n_jobs = n_jobs
[docs] def fit(self, X, y=None): """Fit detector. y is ignored in unsupervised methods. Parameters ---------- X : numpy array of shape (n_samples, n_features) The input samples. y : Ignored Not used, present for API consistency by convention. Returns ------- self : object Fitted estimator. """ X = check_array(X) self._set_n_classes(y) try: from sklearn.cluster import HDBSCAN as sklearn_HDBSCAN except Exception as e: raise ImportError( "HDBSCAN requires scikit-learn with sklearn.cluster.HDBSCAN. " "Please upgrade scikit-learn." ) from e self.detector_ = sklearn_HDBSCAN( min_cluster_size=self.min_cluster_size, min_samples=self.min_samples, metric=self.metric, alpha=self.alpha, algorithm=self.algorithm, leaf_size=self.leaf_size, store_centers='centroid', ) self.detector_.fit(X) self.cluster_labels_ = self.detector_.labels_ # Use 1 - membership probability as outlier scores # Noise points (label=-1) have probability 0, so score 1.0 self.decision_scores_ = 1.0 - self.detector_.probabilities_ self._process_decision_scores() # Build a KNN model on training data for scoring new samples self.X_train_ = X self.tree_ = NearestNeighbors( n_neighbors=min(self.min_cluster_size, X.shape[0]), metric=self.metric, n_jobs=self.n_jobs, ) self.tree_.fit(X) return self
[docs] def decision_function(self, X): """Predict raw anomaly score of X using the fitted detector. For new data, anomaly scores are approximated by the weighted average of the k nearest neighbors' outlier scores in the training data. Parameters ---------- X : numpy array of shape (n_samples, n_features) The input samples. Returns ------- anomaly_scores : numpy array of shape (n_samples,) The anomaly score of the input samples. """ check_is_fitted(self, ['decision_scores_', 'threshold_', 'labels_']) X = check_array(X) # Find k nearest neighbors in training data dist, ind = self.tree_.kneighbors(X) # Weight by inverse distance; closer neighbors have more influence # Add small epsilon to avoid division by zero weights = 1.0 / (dist + 1e-10) weights = weights / weights.sum(axis=1, keepdims=True) # Weighted average of training outlier scores neighbor_scores = self.decision_scores_[ind] scores = np.sum(weights * neighbor_scores, axis=1) return scores.ravel()