# -*- coding: utf-8 -*-
"""ANOMALOUS: A Joint Modeling Approach for Anomaly Detection.
CUR-decomposition-based method that models attributes as
``X ≈ XWX + R``, where W (d x n) jointly selects representative
attributes and nodes. The residual R captures anomalies via L21
sparsity, regularized by the graph Laplacian for network coherence.
See :cite:`peng2018anomalous` for details.
Reference:
Peng, Z., Luo, M., Li, J., Liu, H. and Zheng, Q., 2018.
ANOMALOUS: A Joint Modeling Approach for Anomaly Detection on
Attributed Networks. In IJCAI, pp. 3529-3535.
"""
# Author: Yue Zhao <yzhao062@gmail.com>
# License: BSD 2 clause
import numpy as np
from sklearn.utils.validation import check_is_fitted
from .base import BaseDetector
from ._pyg_utils import validate_graph_input, to_sparse_adj
[docs]
class ANOMALOUS(BaseDetector):
"""ANOMALOUS: Joint Modeling for Anomaly Detection.
CUR-style decomposition: ``min_{W,R} ||X - XWX - R||_F^2
+ alpha * ||W||_F^2 + lambda_r * ||R||_{2,1} + gamma *
tr(R^T L R)`` where W is d x n, L is the graph Laplacian.
Anomaly score = L2 norm of each residual row.
This detector is **transductive**.
Parameters
----------
alpha : float, default=1.0
Frobenius norm penalty on W (regularization).
gamma : float, default=1.0
Graph Laplacian smoothing strength on R.
lambda_r : float, default=0.01
L21-norm penalty on residual R (row-sparsity).
Scale-sensitive; typical range 0.001-0.1.
max_iter : int, default=100
Number of alternating optimization iterations.
contamination : float, default=0.1
Expected proportion of anomalies.
Attributes
----------
decision_scores_ : numpy array of shape (n_nodes,)
labels_ : numpy array of shape (n_nodes,)
threshold_ : float
"""
def __init__(self, alpha=1.0, gamma=1.0, lambda_r=0.01,
max_iter=100, contamination=0.1):
super(ANOMALOUS, self).__init__(contamination=contamination)
self.alpha = alpha
self.gamma = gamma
self.lambda_r = lambda_r
self.max_iter = max_iter
[docs]
def fit(self, X, y=None, edge_index=None):
"""Fit the detector on graph data.
Parameters
----------
X : Data or array-like
PyG Data or node features.
y : ignored
edge_index : array-like or None
Returns
-------
self
"""
data = validate_graph_input(X, edge_index)
n_nodes = data.num_nodes
self._set_n_classes(y)
if data.x is None:
raise ValueError(
"ANOMALOUS requires node features (data.x).")
features = data.x.cpu().numpy().astype(np.float64)
ei = data.edge_index.cpu().numpy()
adj = to_sparse_adj(ei, n_nodes)
scores = self._factorize(adj, features)
self.decision_scores_ = scores
self._process_decision_scores()
return self
def _factorize(self, A, X):
"""Alternating optimization for ANOMALOUS.
Objective: min_{W,R} ||X - XWX - R||^2 + alpha * ||W||_F^2
+ lambda_r * ||R||_{2,1} + gamma * tr(R^T L R)
W is d x n (CUR-style). X selects columns, W mixes,
X selects rows.
Parameters
----------
A : scipy.sparse.csr_matrix of shape (n, n)
X : np.ndarray of shape (n, d)
Returns
-------
scores : np.ndarray of shape (n,)
"""
n, d = X.shape
A_dense = A.toarray()
# Graph Laplacian: L = D - A
degree = np.asarray(A.sum(axis=1)).ravel()
L = np.diag(degree) - A_dense
# Normalize features for numerical stability
feat_norms = np.linalg.norm(X, axis=0, keepdims=True)
feat_norms = np.maximum(feat_norms, 1e-10)
X_norm = X / feat_norms
# Initialize W (d x n) and R (n x d)
W = np.zeros((d, n), dtype=np.float64)
R = np.zeros((n, d), dtype=np.float64)
lr = 0.001
for _ in range(self.max_iter):
# Forward: XWX is (n,d)@(d,n)@(n,d) = (n,d)
XWX = X_norm @ W @ X_norm
residual = X_norm - XWX - R
# Update W via gradient descent with clipping
grad_W = (-2.0 * X_norm.T @ residual @ X_norm.T
+ 2.0 * self.alpha * W)
grad_norm = np.linalg.norm(grad_W)
if grad_norm > 1.0:
grad_W = grad_W / grad_norm
W -= lr * grad_W
# Update R: Laplacian smoothing + L21 proximal
P = X_norm - X_norm @ W @ X_norm
# Graph-aware smoothing: (I + gamma*L)^{-1} P
P_smooth = np.linalg.solve(
np.eye(n) + self.gamma * L, P)
# L21 shrinkage for row-sparsity
row_norms = np.linalg.norm(
P_smooth, axis=1, keepdims=True)
row_norms = np.maximum(row_norms, 1e-10)
shrink = np.maximum(
0.0,
1.0 - self.lambda_r / (2.0 * row_norms))
R = shrink * P_smooth
scores = np.linalg.norm(R, axis=1)
return scores
[docs]
def decision_function(self, X):
"""Not supported (transductive detector)."""
raise NotImplementedError(
"ANOMALOUS is a transductive detector. Use "
"decision_scores_ after fit().")
[docs]
def predict(self, X, return_confidence=False):
"""Not supported (transductive detector)."""
raise NotImplementedError(
"ANOMALOUS is a transductive detector. Use labels_ "
"after fit().")
[docs]
def predict_proba(self, X, method="linear", return_confidence=False):
"""Not supported (transductive detector)."""
raise NotImplementedError(
"ANOMALOUS is a transductive detector.")
[docs]
def predict_confidence(self, X):
"""Not supported (transductive detector)."""
raise NotImplementedError(
"ANOMALOUS is a transductive detector.")