# -*- coding: utf-8 -*-
"""XGBOD: Improving Supervised Outlier Detection with Unsupervised
Representation Learning. A semi-supervised outlier detection framework.
"""
# Author: Yue Zhao <zhaoy@cmu.edu>
# License: BSD 2 clause
from __future__ import division
from __future__ import print_function
import numpy as np
from sklearn.metrics import roc_auc_score
from sklearn.utils import check_array
from sklearn.utils.validation import check_X_y
from sklearn.utils.validation import check_is_fitted
try:
    from xgboost.sklearn import XGBClassifier
except ImportError:
    raise ImportError('please install xgboost first for running XGBOD')
from .base import BaseDetector
from .knn import KNN
from .lof import LOF
from .iforest import IForest
from .hbos import HBOS
from .ocsvm import OCSVM
from ..utils.utility import check_parameter
from ..utils.utility import check_detector
from ..utils.utility import standardizer
from ..utils.utility import precision_n_scores
class XGBOD(BaseDetector):
r"""XGBOD class for outlier detection.
It first uses the passed in unsupervised outlier detectors to extract
richer representation of the data and then concatenates the newly
generated features to the original feature for constructing the augmented
feature space. An XGBoost classifier is then applied on this augmented
feature space. Read more in the :cite:`zhao2018xgbod`.
Parameters
----------
    estimator_list : list, optional (default=None)
        The list of pyod detectors passed in for unsupervised learning.
        If None, a default pool of detectors is built when ``fit`` is
        called.
standardization_flag_list : list, optional (default=None)
The list of boolean flags for indicating whether to perform
standardization for each detector.
max_depth : int
Maximum tree depth for base learners.
learning_rate : float
Boosting learning rate (xgb's "eta")
n_estimators : int
Number of boosted trees to fit.
    silent : bool
        Whether to run boosting silently; if True, no messages are
        printed while boosting.
objective : string or callable
Specify the learning task and the corresponding learning objective or
a custom objective function to be used (see note below).
booster : string
Specify which booster to use: gbtree, gblinear or dart.
n_jobs : int
Number of parallel threads used to run xgboost. (replaces ``nthread``)
gamma : float
Minimum loss reduction required to make a further partition on a leaf
node of the tree.
min_child_weight : int
Minimum sum of instance weight(hessian) needed in a child.
max_delta_step : int
Maximum delta step we allow each tree's weight estimation to be.
subsample : float
Subsample ratio of the training instance.
colsample_bytree : float
Subsample ratio of columns when constructing each tree.
colsample_bylevel : float
Subsample ratio of columns for each split, in each level.
reg_alpha : float (xgb's alpha)
L1 regularization term on weights.
reg_lambda : float (xgb's lambda)
L2 regularization term on weights.
scale_pos_weight : float
Balancing of positive and negative weights.
    base_score : float
        The initial prediction score of all instances, global bias.
random_state : int
Random number seed. (replaces seed)
# missing : float, optional
# Value in the data which needs to be present as a missing value. If
# None, defaults to np.nan.
    importance_type : string, default "gain"
        The feature importance type for the ``feature_importances_``
        property: either "gain", "weight", "cover", "total_gain" or
        "total_cover".
\*\*kwargs : dict, optional
Keyword arguments for XGBoost Booster object. Full documentation of
parameters can be found here:
https://github.com/dmlc/xgboost/blob/master/doc/parameter.rst.
Attempting to set a parameter via the constructor args and \*\*kwargs
dict simultaneously will result in a TypeError.
Note: \*\*kwargs is unsupported by scikit-learn. We do not
guarantee that parameters passed via this argument will interact
properly with scikit-learn.
Attributes
----------
n_detector_ : int
        The number of unsupervised detectors used.
clf_ : object
The XGBoost classifier.
decision_scores_ : numpy array of shape (n_samples,)
The outlier scores of the training data.
The higher, the more abnormal. Outliers tend to have higher
scores. This value is available once the detector is fitted.
    labels_ : numpy array of shape (n_samples,)
        The binary labels of the training data: 0 stands for inliers
        and 1 for outliers/anomalies. For XGBOD these are produced
        directly by the fitted XGBoost classifier's ``predict``,
        rather than by applying ``threshold_`` on ``decision_scores_``.
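
    Examples
    --------
    A minimal usage sketch (illustrative only; ``X_train``, ``y_train``
    and ``X_test`` are assumed to be numpy arrays and are not defined in
    this module):

    >>> from pyod.models.xgbod import XGBOD  # doctest: +SKIP
    >>> clf = XGBOD(random_state=42)  # doctest: +SKIP
    >>> clf.fit(X_train, y_train)  # doctest: +SKIP
    >>> train_scores = clf.decision_scores_  # doctest: +SKIP
    >>> test_scores = clf.decision_function(X_test)  # doctest: +SKIP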
"""
def __init__(self, estimator_list=None, standardization_flag_list=None,
max_depth=3, learning_rate=0.1,
n_estimators=100, silent=True,
objective="binary:logistic", booster='gbtree',
n_jobs=1, nthread=None, gamma=0, min_child_weight=1,
max_delta_step=0, subsample=1, colsample_bytree=1,
colsample_bylevel=1,
reg_alpha=0, reg_lambda=1, scale_pos_weight=1,
base_score=0.5, random_state=0,
# missing=None,
**kwargs):
super(XGBOD, self).__init__()
self.estimator_list = estimator_list
self.standardization_flag_list = standardization_flag_list
self.max_depth = max_depth
self.learning_rate = learning_rate
self.n_estimators = n_estimators
self.silent = silent
self.objective = objective
self.booster = booster
self.n_jobs = n_jobs
self.nthread = nthread
self.gamma = gamma
self.min_child_weight = min_child_weight
self.max_delta_step = max_delta_step
self.subsample = subsample
self.colsample_bytree = colsample_bytree
self.colsample_bylevel = colsample_bylevel
self.reg_alpha = reg_alpha
self.reg_lambda = reg_lambda
self.scale_pos_weight = scale_pos_weight
self.base_score = base_score
self.random_state = random_state
# self.missing = missing
self.kwargs = kwargs
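    # A custom detector pool can also be supplied instead of the default
    # one built in ``_init_detectors`` (illustrative sketch, not part of
    # the original module):
    #   XGBOD(estimator_list=[KNN(), LOF(), IForest()],
    #         standardization_flag_list=[True, True, False])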
def _init_detectors(self, X):
"""initialize unsupervised detectors if no predefined detectors is
provided.
Parameters
----------
X : numpy array of shape (n_samples, n_features)
The train data
Returns
-------
        estimator_list : list of object
            The initialized list of detectors.
        standardization_flag_list : list of boolean
            The list of boolean flags indicating whether standardization
            is needed for each detector.
"""
estimator_list = []
standardization_flag_list = []
        # predefined range of n_neighbors for KNN and LOF
k_range = [1, 3, 5, 10, 20, 30, 40, 50]
# validate the value of k
k_range = [k for k in k_range if k < X.shape[0]]
for k in k_range:
estimator_list.append(KNN(n_neighbors=k, method='largest'))
# estimator_list.append(KNN(n_neighbors=k, method='mean'))
estimator_list.append(LOF(n_neighbors=k))
# standardization_flag_list.append(True)
standardization_flag_list.append(True)
standardization_flag_list.append(True)
n_bins_range = [5, 10, 15, 20, 25, 30, 50]
for n_bins in n_bins_range:
estimator_list.append(HBOS(n_bins=n_bins))
standardization_flag_list.append(False)
# predefined range of nu for one-class svm
nu_range = [0.01, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 0.99]
for nu in nu_range:
estimator_list.append(OCSVM(nu=nu))
standardization_flag_list.append(True)
# predefined range for number of estimators in isolation forests
n_range = [10, 20, 50, 70, 100, 150, 200]
for n in n_range:
estimator_list.append(
IForest(n_estimators=n, random_state=self.random_state))
standardization_flag_list.append(False)
# # predefined range for number of estimators in LODA
# n_bins_range = [3, 5, 10, 15, 20, 25, 30, 50]
# for n_bins in n_bins_range:
# estimator_list.append(LODA(n_bins=n_bins))
# standardization_flag_list.append(False)
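        # note (summary, not in the original code): with the default ranges
        # above and n_samples > 50, the pool totals
        # 8 KNN + 8 LOF + 7 HBOS + 11 OCSVM + 7 IForest = 41 detectors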
return estimator_list, standardization_flag_list
def _validate_estimator(self, X):
if self.estimator_list is None:
self.estimator_list, \
self.standardization_flag_list = self._init_detectors(X)
# perform standardization for all detectors by default
if self.standardization_flag_list is None:
self.standardization_flag_list = [True] * len(self.estimator_list)
# validate two lists length
if len(self.estimator_list) != len(self.standardization_flag_list):
raise ValueError(
"estimator_list length ({0}) is not equal "
"to standardization_flag_list length ({1})".format(
len(self.estimator_list),
len(self.standardization_flag_list)))
# validate the estimator list is not empty
check_parameter(len(self.estimator_list), low=1,
param_name='number of estimators',
include_left=True, include_right=True)
for estimator in self.estimator_list:
check_detector(estimator)
return len(self.estimator_list)
def _generate_new_features(self, X):
X_add = np.zeros([X.shape[0], self.n_detector_])
        # transform X with the standardization scaler fitted during training
X_norm = self._scalar.transform(X)
for ind, estimator in enumerate(self.estimator_list):
if self.standardization_flag_list[ind]:
X_add[:, ind] = estimator.decision_function(X_norm)
else:
X_add[:, ind] = estimator.decision_function(X)
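        # each column of X_add is one detector's score; callers concatenate
        # it with the original features to form the augmented space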
return X_add
def fit(self, X, y):
"""Fit the model using X and y as training data.
Parameters
----------
X : numpy array of shape (n_samples, n_features)
Training data.
y : numpy array of shape (n_samples,)
The ground truth (binary label)
- 0 : inliers
- 1 : outliers
Returns
-------
self : object
"""
# Validate inputs X and y
X, y = check_X_y(X, y)
X = check_array(X)
self._set_n_classes(y)
self.n_detector_ = self._validate_estimator(X)
self.X_train_add_ = np.zeros([X.shape[0], self.n_detector_])
        # keep the standardization scaler for test conversion
X_norm, self._scalar = standardizer(X, keep_scalar=True)
        for ind, estimator in enumerate(self.estimator_list):
            if self.standardization_flag_list[ind]:
                estimator.fit(X_norm)
            else:
                estimator.fit(X)
            self.X_train_add_[:, ind] = estimator.decision_scores_
# construct the new feature space
self.X_train_new_ = np.concatenate((X, self.X_train_add_), axis=1)
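        # the augmented space has shape (n_samples, n_features + n_detector_)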
# initialize, train, and predict on XGBoost
        self.clf_ = XGBClassifier(max_depth=self.max_depth,
                                  learning_rate=self.learning_rate,
                                  n_estimators=self.n_estimators,
                                  silent=self.silent,
                                  objective=self.objective,
                                  booster=self.booster,
                                  n_jobs=self.n_jobs,
                                  nthread=self.nthread,
                                  gamma=self.gamma,
                                  min_child_weight=self.min_child_weight,
                                  max_delta_step=self.max_delta_step,
                                  subsample=self.subsample,
                                  colsample_bytree=self.colsample_bytree,
                                  colsample_bylevel=self.colsample_bylevel,
                                  reg_alpha=self.reg_alpha,
                                  reg_lambda=self.reg_lambda,
                                  scale_pos_weight=self.scale_pos_weight,
                                  base_score=self.base_score,
                                  random_state=self.random_state,
                                  # missing=self.missing,
                                  **self.kwargs)
self.clf_.fit(self.X_train_new_, y)
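        # use the positive-class probability as the continuous outlier score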
self.decision_scores_ = self.clf_.predict_proba(
self.X_train_new_)[:, 1]
self.labels_ = self.clf_.predict(self.X_train_new_).ravel()
return self
    def decision_function(self, X):
        """Predict raw anomaly scores of X using the fitted detector.
        The higher the score, the more abnormal the sample.
        """
        check_is_fitted(self, ['clf_', 'decision_scores_',
                               'labels_', '_scalar'])
X = check_array(X)
# construct the new feature space
X_add = self._generate_new_features(X)
X_new = np.concatenate((X, X_add), axis=1)
pred_scores = self.clf_.predict_proba(X_new)[:, 1]
return pred_scores.ravel()
def predict(self, X):
"""Predict if a particular sample is an outlier or not.
        Calls xgboost's ``predict`` function on the augmented feature space.
Parameters
----------
X : numpy array of shape (n_samples, n_features)
The input samples.
Returns
-------
outlier_labels : numpy array of shape (n_samples,)
For each observation, tells whether or not
it should be considered as an outlier according to the
fitted model. 0 stands for inliers and 1 for outliers.
"""
check_is_fitted(self, ['clf_', 'decision_scores_',
'labels_', '_scalar'])
X = check_array(X)
# construct the new feature space
X_add = self._generate_new_features(X)
X_new = np.concatenate((X, X_add), axis=1)
pred_scores = self.clf_.predict(X_new)
return pred_scores.ravel()
def predict_proba(self, X):
"""Predict the probability of a sample being outlier.
        Calls xgboost's ``predict_proba`` function via ``decision_function``.
Parameters
----------
X : numpy array of shape (n_samples, n_features)
The input samples.
Returns
-------
        outlier_probability : numpy array of shape (n_samples,)
            For each observation, the probability that it is an outlier
            according to the fitted model, ranging in [0, 1].
"""
return self.decision_function(X)
    def fit_predict(self, X, y):
        """Fit the detector using X and y, then return the binary labels
        predicted for the training data (0 for inliers, 1 for outliers).
        """
        self.fit(X, y)
        return self.labels_
def fit_predict_score(self, X, y, scoring='roc_auc_score'):
"""Fit the detector, predict on samples, and evaluate the model by
predefined metrics, e.g., ROC.
Parameters
----------
X : numpy array of shape (n_samples, n_features)
The input samples.
        y : numpy array of shape (n_samples,)
            The ground truth (binary label) of the input samples, used
            both for fitting the model and for computing the score.
scoring : str, optional (default='roc_auc_score')
Evaluation metric:
- 'roc_auc_score': ROC score
- 'prc_n_score': Precision @ rank n score
Returns
-------
score : float
"""
self.fit(X, y)
if scoring == 'roc_auc_score':
score = roc_auc_score(y, self.decision_scores_)
elif scoring == 'prc_n_score':
score = precision_n_scores(y, self.decision_scores_)
else:
raise NotImplementedError('PyOD built-in scoring only supports '
'ROC and Precision @ rank n')
print("{metric}: {score}".format(metric=scoring, score=score))
return score