# -*- coding: utf-8 -*-
"""LUNAR: Unifying Local Outlier Detection Methods via Graph Neural Networks
"""
# Author: Adam Goodge <a.goodge@u.nus.edu>
#
from copy import deepcopy
import numpy as np
try:
import torch
except ImportError:
print('please install torch first')
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from sklearn.metrics import roc_auc_score
from sklearn.model_selection import train_test_split
from sklearn.neighbors import NearestNeighbors
from sklearn.preprocessing import MinMaxScaler
from sklearn.utils import check_random_state
from sklearn.utils.validation import check_is_fitted
from .base import BaseDetector
# negative samples for training
def generate_negative_samples(x, sample_type, proportion, epsilon,
random_state=None):
random_state = check_random_state(random_state)
n_samples = int(proportion * (len(x)))
n_dim = x.shape[-1]
# uniform samples in range [x.min(),x.max()]
rand_unif = x.min() + (x.max() - x.min()) * random_state.rand(
n_samples, n_dim).astype('float32')
# subspace perturbation samples
x_temp = x[random_state.choice(np.arange(len(x)), size=n_samples)]
randmat = random_state.rand(n_samples, n_dim) < 0.3
rand_sub = x_temp + randmat * (
epsilon * random_state.randn(n_samples, n_dim)).astype('float32')
if sample_type == 'UNIFORM':
neg_x = rand_unif
if sample_type == 'SUBSPACE':
neg_x = rand_sub
if sample_type == 'MIXED':
# randomly sample from uniform and gaussian negative samples
neg_x = np.concatenate((rand_unif, rand_sub), 0)
neg_x = neg_x[random_state.choice(np.arange(len(neg_x)),
size=n_samples)]
neg_y = np.ones(len(neg_x))
return neg_x.astype('float32'), neg_y.astype('float32')
class SCORE_MODEL(nn.Module):
def __init__(self, k):
super(SCORE_MODEL, self).__init__()
self.hidden_size = 256
self.network = nn.Sequential(
nn.Linear(k, self.hidden_size),
nn.Tanh(),
nn.Linear(self.hidden_size, self.hidden_size),
nn.Tanh(),
nn.Linear(self.hidden_size, self.hidden_size),
nn.Tanh(),
nn.Linear(self.hidden_size, 1),
nn.Sigmoid()
)
def forward(self, x):
out = self.network(x)
out = torch.squeeze(out, 1)
return out
class WEIGHT_MODEL(nn.Module):
def __init__(self, k):
super(WEIGHT_MODEL, self).__init__()
self.hidden_size = 256
self.network = nn.Sequential(
nn.Linear(k, self.hidden_size),
nn.ReLU(),
nn.Linear(self.hidden_size, self.hidden_size),
nn.ReLU(),
nn.Linear(self.hidden_size, self.hidden_size),
nn.ReLU(),
nn.LayerNorm(self.hidden_size),
nn.Linear(self.hidden_size, k),
)
self.final_norm = nn.BatchNorm1d(1)
def forward(self, x):
alpha = self.network(x)
# get weights > 0 and sum to 1.0
alpha = F.softmax(alpha, dim=1)
# multiply weights by each distance in input vector
out = torch.sum(alpha * x, dim=1, keepdim=True)
# batch norm
out = self.final_norm(out)
out = torch.squeeze(out, 1)
return out
[docs]
class LUNAR(BaseDetector):
"""
LUNAR class for outlier detection. See https://www.aaai.org/AAAI22Papers/AAAI-51.GoodgeA.pdf for details.
For an observation, its ordered list of distances to its k nearest neighbours is input to a neural network,
with one of the following outputs:
1) SCORE_MODEL: network directly outputs the anomaly score.
2) WEIGHT_MODEL: network outputs a set of weights for the k distances, the anomaly score is then the
sum of weighted distances.
See :cite:`goodge2022lunar` for details.
Parameters
----------
model_type: str in ['WEIGHT', 'SCORE'], optional (default = 'WEIGHT')
Whether to use WEIGHT_MODEL or SCORE_MODEL for anomaly scoring.
n_neighbors: int, optional (default = 5)
Number of neighbors to use by default for k neighbors queries.
In the implementation, the constructor argument name is
``n_neighbours`` for backward compatibility.
negative_sampling: str in ['UNIFORM', 'SUBSPACE', MIXED'], optional (default = 'MIXED)
Type of negative samples to use between:
- 'UNIFORM': uniformly distributed samples
- 'SUBSPACE': subspace perturbation (additive random noise in a subset of features)
- 'MIXED': a combination of both types of samples
val_size: float in [0,1], optional (default = 0.1)
Proportion of samples to be used for model validation
scaler: scikit-learn-style scaler instance, optional (default=None)
Method of data normalization. If ``None``, a fresh
``MinMaxScaler()`` is created per ``fit`` call. Pass a configured
scaler (e.g. ``StandardScaler()``) to override; the supplied
instance is deep-copied internally so each LUNAR instance has its
own state and detectors created with the default never share a
scaler. Pass ``False`` to disable scaling entirely.
epsilon: float, optional (default = 0.1)
Hyper-parameter for the generation of negative samples.
A smaller epsilon results in negative samples more similar to normal samples.
proportion: float, optional (default = 1.0)
Hyper-parameter for the proprotion of negative samples to use relative to the
number of normal training samples.
n_epochs: int, optional (default = 200)
Number of epochs to train neural network.
lr: float, optional (default = 0.001)
Learning rate.
wd: float, optional (default = 0.1)
Weight decay.
verbose: int in {0,1}, optional (default = 0):
To view or hide training progress
algorithm : {'auto', 'ball_tree', 'kd_tree', 'brute'}, optional
Algorithm used to compute nearest neighbors.
leaf_size : int, optional (default=30)
Leaf size passed to nearest-neighbor tree backends when applicable.
This can affect construction/query speed and memory usage.
metric : str or callable, optional (default='minkowski')
Distance metric used for nearest-neighbor computation.
p : int, optional (default=2)
Power parameter for the Minkowski metric when
``metric='minkowski'``.
metric_params : dict, optional (default=None)
Additional keyword arguments for the metric function.
n_jobs : int, optional (default=1)
Number of parallel jobs for nearest-neighbor search.
If ``-1``, all available CPU cores are used.
random_state : int, RandomState instance or None, optional (default=None)
Controls all stochastic parts of LUNAR: ``torch`` network
initialization (and CUDA seeding when GPUs are present) before
``SCORE_MODEL`` / ``WEIGHT_MODEL`` is built, the SGD path through
a re-seeded ``torch.manual_seed`` in ``fit()``, the
``train_test_split`` validation split, and the
``generate_negative_samples`` synthetic anomaly generator. If
int, it is used as the seed. If ``RandomState`` instance, that
generator is used and advanced (sklearn convention). If ``None``,
randomness follows the library default (numpy module state).
When ``ADEngine(random_state=...)`` builds a LUNAR plan, the
engine seed flows here automatically.
Attributes
----------
"""
def __init__(self, model_type="WEIGHT", n_neighbours=5,
negative_sampling="MIXED",
val_size=0.1, scaler=None, epsilon=0.1,
proportion=1.0,
n_epochs=200, lr=0.001, wd=0.1, verbose=0, contamination=0.1,
algorithm='auto', leaf_size=30, metric='minkowski', p=2,
metric_params=None, n_jobs=1, random_state=None):
super(LUNAR, self).__init__(contamination=contamination)
self.model_type = model_type
self.n_neighbours = n_neighbours
self.negative_sampling = negative_sampling
self.epsilon = epsilon
self.proportion = proportion
self.n_epochs = n_epochs
# Issue #502: store the user-supplied scaler verbatim so that
# sklearn.base.clone() round-trips correctly (clone re-checks that
# __init__ does not mutate its params). Materialisation/deep-copy
# is deferred to fit() — see _resolve_scaler — which guarantees
# each detector instance owns an independent scaler object.
self.scaler = scaler
self.lr = lr
self.wd = wd
self.val_size = val_size
self.verbose = verbose
self.algorithm = algorithm
self.leaf_size = leaf_size
self.metric = metric
self.p = p
self.metric_params = metric_params
self.n_jobs = n_jobs
self.random_state = random_state
self.device = torch.device(
'cuda' if torch.cuda.is_available() else 'cpu')
# Seed torch BEFORE building the network so the random initial
# weights of SCORE_MODEL / WEIGHT_MODEL are reproducible. The same
# seed is set again in fit() to cover the SGD / shuffling path.
# Accept either int or numpy RandomState (sklearn convention).
# check_random_state normalizes both into a RandomState; we draw
# a single int from it to pass to torch.manual_seed, which only
# accepts ints.
if random_state is not None:
torch_seed = int(
check_random_state(random_state).randint(
0, np.iinfo(np.int32).max))
torch.manual_seed(torch_seed)
if torch.cuda.is_available():
torch.cuda.manual_seed_all(torch_seed)
if model_type == "SCORE":
self.network = SCORE_MODEL(n_neighbours).to(self.device)
elif model_type == "WEIGHT":
self.network = WEIGHT_MODEL(n_neighbours).to(self.device)
def _resolve_scaler(self):
"""Materialise the runtime scaler for this fit call.
- ``self.scaler is None`` (the new default) -> a fresh
``MinMaxScaler()`` so each fit owns its own state.
- ``self.scaler is False`` -> disable scaling entirely.
- any other instance -> deep-copy so two LUNAR instances that
were configured with the same scaler object don't end up
sharing parameters after their respective ``fit`` calls.
"""
if self.scaler is None:
return MinMaxScaler()
if self.scaler is False:
return None
return deepcopy(self.scaler)
[docs]
def fit(self, X, y=None):
"""Fit detector. y is assumed to be 0 for all training samples.
Parameters
----------
X : numpy array of shape (n_samples, n_features)
The input samples.
y : Ignored
Overwritten with 0 for all training samples (assumed to be normal).
Returns
-------
self : object
Fitted estimator.
"""
# X = check_array(X)
self._set_n_classes(y)
X = X.astype('float32')
y = np.zeros(len(X))
# Seed torch when the user pinned random_state, so the SGD /
# shuffle path is reproducible. The numpy random stream lives on
# the rng returned by check_random_state below and is threaded
# through train_test_split and generate_negative_samples.
# Accept int or numpy RandomState; check_random_state normalizes.
if self.random_state is not None:
torch_seed = int(
check_random_state(self.random_state).randint(
0, np.iinfo(np.int32).max))
torch.manual_seed(torch_seed)
if torch.cuda.is_available():
torch.cuda.manual_seed_all(torch_seed)
rng = check_random_state(self.random_state)
# split training and validation sets
train_x, val_x, train_y, val_y = train_test_split(
X, y, test_size=self.val_size, random_state=rng)
# Materialise a fresh, instance-private scaler before fitting it.
# ``self.scaler`` (the constructor argument) is left untouched so
# that sklearn.base.clone() can read it back unchanged. The fitted
# scaler lives on ``self.scaler_`` (sklearn convention for fitted
# attributes). See issue #502.
self.scaler_ = self._resolve_scaler()
# fit data scaler to the training set if scaler has been passed
if self.scaler_ is None:
pass
else:
self.scaler_.fit(train_x)
# scale data if scaler has been passed
if self.scaler_ is None:
pass
else:
train_x = self.scaler_.transform(train_x)
val_x = self.scaler_.transform(val_x)
# generate negative samples for training and validation set seperately
neg_train_x, neg_train_y = generate_negative_samples(
train_x, self.negative_sampling, self.proportion,
self.epsilon, random_state=rng)
neg_val_x, neg_val_y = generate_negative_samples(
val_x, self.negative_sampling, self.proportion,
self.epsilon, random_state=rng)
train_x = np.vstack((train_x, neg_train_x))
train_y = np.hstack((train_y, neg_train_y))
val_x = np.vstack((val_x, neg_val_x))
val_y = np.hstack((val_y, neg_val_y))
self.neigh = NearestNeighbors(n_neighbors=self.n_neighbours + 1,
algorithm=self.algorithm,
leaf_size=self.leaf_size,
metric=self.metric,
p=self.p,
metric_params=self.metric_params,
n_jobs=self.n_jobs)
self.neigh.fit(train_x)
# nearest neighbours of training set
train_dist, _ = self.neigh.kneighbors(train_x[train_y == 0],
n_neighbors=self.n_neighbours + 1)
neg_train_dist, _ = self.neigh.kneighbors(train_x[train_y == 1],
n_neighbors=self.n_neighbours)
# remove self loops of normal training points
train_dist = np.vstack((train_dist[:, 1:], neg_train_dist))
# nearest neighbours of validation set
val_dist, _ = self.neigh.kneighbors(val_x,
n_neighbors=self.n_neighbours)
train_dist = torch.tensor(train_dist, dtype=torch.float32).to(
self.device)
train_y = torch.tensor(train_y, dtype=torch.float32).to(self.device)
val_dist = torch.tensor(val_dist, dtype=torch.float32).to(self.device)
val_y = torch.tensor(val_y, dtype=torch.float32).to(self.device)
# loss function
criterion = nn.MSELoss(reduction='none')
# optimizer
optimizer = optim.Adam(self.network.parameters(), lr=self.lr,
weight_decay=self.wd)
# for early stopping
best_val_score = 0
# model training
for epoch in range(self.n_epochs):
# see performance of model before epoch
with torch.no_grad():
self.network.eval()
out = self.network(train_dist)
train_score = roc_auc_score(train_y.cpu(), out.cpu())
out = self.network(val_dist)
val_score = roc_auc_score(val_y.cpu(), out.cpu())
# save best model
if val_score >= best_val_score:
best_dict = {'epoch': epoch,
'model_state_dict': deepcopy(
self.network.state_dict()),
'optimizer_state_dict': deepcopy(
optimizer.state_dict()),
'train_score': train_score,
'val_score': val_score,
}
# reset current best score
best_val_score = val_score
if self.verbose == 1:
print(
f"Epoch {epoch} \t Train Score {np.round(train_score, 6)} \t Val Score {np.round(val_score, 6)}")
# training loop
self.network.train()
optimizer.zero_grad()
out = self.network(train_dist)
loss = criterion(out, train_y).sum()
loss.backward()
optimizer.step()
# print best model after training
if self.verbose == 1:
print(
f"Finished training...\nBest Model: Epoch {best_dict['epoch']} \t Train Score {best_dict['train_score']} \t Val Score {best_dict['val_score']}")
# load best model after training
self.network.load_state_dict(best_dict['model_state_dict'])
# Determine outlier scores for train set
# scale data if scaler has been passed (use the fitted scaler_)
if self.scaler_ is None:
X_norm = np.copy(X)
else:
X_norm = self.scaler_.transform(X)
# nearest neighbour search
dist, _ = self.neigh.kneighbors(X_norm, self.n_neighbours)
dist = torch.tensor(dist, dtype=torch.float32).to(self.device)
# forward pass
with torch.no_grad():
self.network.eval()
anomaly_scores = self.network(dist)
self.decision_scores_ = anomaly_scores.cpu().detach().numpy().ravel()
self._process_decision_scores()
return self
[docs]
def decision_function(self, X):
"""Predict raw anomaly score of X using the fitted detector.
For consistency, outliers are assigned with larger anomaly scores.
Parameters
----------
X : numpy array of shape (n_samples, n_features)
The training input samples.
Returns
-------
anomaly_scores : numpy array of shape (n_samples,)
The anomaly score of the input samples.
"""
check_is_fitted(self, ['decision_scores_'])
# X = check_array(X)
X = X.astype('float32')
# scale data (use the fitted scaler_)
if self.scaler_ is None:
pass
else:
X = self.scaler_.transform(X)
# nearest neighbour search
dist, _ = self.neigh.kneighbors(X, self.n_neighbours)
dist = torch.tensor(dist, dtype=torch.float32).to(self.device)
# forward pass
with torch.no_grad():
self.network.eval()
anomaly_scores = self.network(dist)
scores = anomaly_scores.cpu().detach().numpy().ravel()
return scores