Source code for pyod.models.pyg_radar

# -*- coding: utf-8 -*-
"""Radar: Residual Analysis for Anomaly Detection in Attributed Networks.

Models node attributes as X ≈ WX + R, where W is a learned n x n
weight matrix that predicts each node's attributes from other nodes'
attributes, and R is the residual. The adjacency enters only through
the graph Laplacian regularizer on R. Anomaly scores are the L2
norms of residual rows (nodes with large residuals are anomalous).

See :cite:`li2017radar` for details.

Reference:
    Li, J., Dani, H., Hu, X. and Liu, H., 2017. Radar: Residual Analysis
    for Anomaly Detection in Attributed Networks. In IJCAI, pp. 2152-2158.
"""
# Author: Yue Zhao <yzhao062@gmail.com>
# License: BSD 2 clause

import numpy as np
from sklearn.utils.validation import check_is_fitted

from .base import BaseDetector
from ._pyg_utils import validate_graph_input, to_sparse_adj


[docs] class Radar(BaseDetector): """Radar: Residual Analysis for Anomaly Detection. Solves ``min_{W,R} ||X - WX - R||_F^2 + alpha * ||W||_F^2 + gamma * (||R||_{2,1} + tr(R^T L R))`` via alternating optimization. W is a learned n x n weight matrix (not the adjacency). The graph Laplacian L smooths the residual R. Anomaly score per node = L2 norm of residual row. This detector is **transductive**. Parameters ---------- alpha : float, default=1.0 Frobenius norm penalty on W (regularization). gamma : float, default=0.01 Residual regularization strength: controls both L21 row-sparsity and graph Laplacian smoothing on R. Scale-sensitive; typical range 0.001-0.1. max_iter : int, default=100 Number of alternating optimization iterations. contamination : float, default=0.1 Expected proportion of anomalies. Attributes ---------- decision_scores_ : numpy array of shape (n_nodes,) labels_ : numpy array of shape (n_nodes,) threshold_ : float """ def __init__(self, alpha=1.0, gamma=0.01, max_iter=100, contamination=0.1): super(Radar, self).__init__(contamination=contamination) self.alpha = alpha self.gamma = gamma self.max_iter = max_iter
[docs] def fit(self, X, y=None, edge_index=None): """Fit the detector on graph data. Parameters ---------- X : Data or array-like PyG Data or node features (n_nodes, n_features). y : ignored edge_index : array-like or None COO edge list. Required when X is numpy. Returns ------- self """ data = validate_graph_input(X, edge_index) n_nodes = data.num_nodes self._set_n_classes(y) if data.x is None: raise ValueError("Radar requires node features (data.x).") features = data.x.cpu().numpy().astype(np.float64) ei = data.edge_index.cpu().numpy() adj = to_sparse_adj(ei, n_nodes) scores = self._factorize(adj, features) self.decision_scores_ = scores self._process_decision_scores() return self
def _factorize(self, A, X): """Alternating optimization for Radar. Objective: min_{W,R} ||X - WX - R||^2 + alpha * ||W||_F^2 + gamma * ||R||_{2,1} W is n x n (learned, initialized to identity). A enters via Laplacian regularizer on R. Parameters ---------- A : scipy.sparse.csr_matrix of shape (n, n) X : np.ndarray of shape (n, d) Returns ------- scores : np.ndarray of shape (n,) """ n, d = X.shape # Graph Laplacian L = D - A (for residual smoothness) A_dense = A.toarray() degree = np.asarray(A.sum(axis=1)).ravel() L = np.diag(degree) - A_dense # Initialize W (n x n), predict each node from others W = np.eye(n, dtype=np.float64) R = np.zeros((n, d), dtype=np.float64) XXT = X @ X.T # (n x n), precompute for _ in range(self.max_iter): # Update W: dL/dW = -2(X-WX-R)X^T + 2*alpha*W = 0 # W(XX^T + alpha*I) = (X - R)X^T rhs = (X - R) @ X.T W = rhs @ np.linalg.inv( XXT + self.alpha * np.eye(n)) # Update R: Laplacian smoothing + L21 proximal P = X - W @ X # Graph-aware smoothing: (I + gamma*L)^{-1} P P_smooth = np.linalg.solve( np.eye(n) + self.gamma * L, P) # L21 shrinkage for row-sparsity row_norms = np.linalg.norm( P_smooth, axis=1, keepdims=True) row_norms = np.maximum(row_norms, 1e-10) shrink = np.maximum( 0.0, 1.0 - self.gamma / (2.0 * row_norms)) R = shrink * P_smooth scores = np.linalg.norm(R, axis=1) return scores
[docs] def decision_function(self, X): """Not supported (transductive detector).""" raise NotImplementedError( "Radar is a transductive detector. Use decision_scores_ " "after fit().")
[docs] def predict(self, X, return_confidence=False): """Not supported (transductive detector).""" raise NotImplementedError( "Radar is a transductive detector. Use labels_ after fit().")
[docs] def predict_proba(self, X, method="linear", return_confidence=False): """Not supported (transductive detector).""" raise NotImplementedError( "Radar is a transductive detector.")
[docs] def predict_confidence(self, X): """Not supported (transductive detector).""" raise NotImplementedError( "Radar is a transductive detector.")