# -*- coding: utf-8 -*-
"""GUIDE: Higher-order Structure Based Anomaly Detection.
Dual GCN autoencoders: one on the original adjacency, one on a
motif (triangle) adjacency. Anomaly score = weighted sum of
reconstruction errors from both views.
See :cite:`yuan2021guide` for details.
Reference:
Yuan, X., Zhou, N., Yu, S., Huang, H., Chen, Z. and Xia, F.,
2021. Higher-order Structure Based Anomaly Detection on Attributed
Networks. In IEEE BigData, pp. 2691-2700.
"""
# Author: Yue Zhao <yzhao062@gmail.com>
# License: BSD 2 clause
import numpy as np
from sklearn.utils.validation import check_is_fitted
from .base import BaseDetector
from ._pyg_utils import validate_graph_input, to_sparse_adj
class GUIDE(BaseDetector):
    """GUIDE: Higher-order Structure Based Anomaly Detection.

    Constructs a motif adjacency from triangle participation
    (binarized in v1: edges in at least one triangle) and runs
    two GCN autoencoders in parallel. Score = ``alpha * err_orig
    + (1 - alpha) * err_motif``.

    This detector is **transductive**: scores are only available for
    the graph passed to ``fit``; ``decision_function`` and the
    ``predict*`` methods raise ``NotImplementedError``.

    Parameters
    ----------
    hidden_dim : int, default=64
        Hidden dimension of GCN layers.
    num_layers : int, default=2
        Number of GCN encoder layers.
    alpha : float, default=0.5
        Weight for original-graph reconstruction error. Must lie in
        ``[0, 1]``; the motif-graph error is weighted by ``1 - alpha``.
    dropout : float, default=0.3
        Dropout rate.
    epochs : int, default=100
        Training epochs.
    lr : float, default=5e-3
        Learning rate.
    contamination : float, default=0.1
        Expected proportion of anomalies.

    Attributes
    ----------
    decision_scores_ : numpy array of shape (n_nodes,)
        Per-node anomaly score computed at the end of ``fit``.
    labels_ : numpy array of shape (n_nodes,)
        Set by the base-class ``_process_decision_scores`` call in
        ``fit`` (see ``BaseDetector`` for the exact semantics).
    threshold_ : float
        Set by the base-class ``_process_decision_scores`` call in
        ``fit`` (see ``BaseDetector`` for the exact semantics).
    """

    def __init__(self, hidden_dim=64, num_layers=2, alpha=0.5,
                 dropout=0.3, epochs=100, lr=5e-3,
                 contamination=0.1):
        super().__init__(contamination=contamination)
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers
        self.alpha = alpha
        self.dropout = dropout
        self.epochs = epochs
        self.lr = lr

    def fit(self, X, y=None, edge_index=None):
        """Fit the detector on graph data.

        Parameters
        ----------
        X : Data or array-like
            Graph input; forwarded to ``validate_graph_input``.
        y : ignored
            Only passed to the base-class ``_set_n_classes``.
        edge_index : array-like or None
            Forwarded to ``validate_graph_input`` together with ``X``.

        Returns
        -------
        self

        Raises
        ------
        ValueError
            If ``alpha`` is outside ``[0, 1]``, if the graph has no node
            features, or if no edge takes part in a triangle.
        """
        import torch

        # A weight outside [0, 1] makes one of the two error terms
        # negative, so the optimiser would be rewarded for *increasing*
        # that reconstruction error.
        if not 0.0 <= self.alpha <= 1.0:
            raise ValueError(
                "alpha must be in [0, 1], got %r." % (self.alpha,))

        data = validate_graph_input(X, edge_index)
        n_nodes = data.num_nodes
        self._set_n_classes(y)
        if data.x is None:
            raise ValueError("GUIDE requires node features (data.x).")

        x = data.x
        # Everything below follows the device of the node features so
        # that accelerator-resident inputs work without manual moves.
        device = x.device
        in_dim = x.shape[1]
        ei = data.edge_index.to(device)
        ei_motif = self._motif_edge_index(ei, n_nodes).to(device)

        # Dense 0/1 reconstruction targets for the structure loss.
        adj_dense = self._dense_adjacency(ei, n_nodes)
        motif_dense = self._dense_adjacency(ei_motif, n_nodes)

        model = _GUIDEModel(
            in_dim, self.hidden_dim, self.num_layers,
            self.dropout).to(device)
        optimizer = torch.optim.Adam(model.parameters(), lr=self.lr)

        def combined_error():
            # One forward pass through both views, reduced to a single
            # per-node error; shared by training and final scoring.
            x_hat_o, x_hat_m, a_hat_o, a_hat_m = model(x, ei, ei_motif)
            err_orig = self._node_errors(x, x_hat_o, adj_dense, a_hat_o)
            err_motif = self._node_errors(
                x, x_hat_m, motif_dense, a_hat_m)
            return (self.alpha * err_orig
                    + (1 - self.alpha) * err_motif)

        model.train()
        for _ in range(self.epochs):
            loss = combined_error().mean()
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        # Final scores are taken in eval mode (dropout disabled).
        model.eval()
        with torch.no_grad():
            scores = combined_error()

        self.decision_scores_ = scores.cpu().numpy()
        self._process_decision_scores()
        return self

    @staticmethod
    def _motif_edge_index(edge_index, n_nodes):
        """Return a (2, n_edges) long tensor of edges lying in a triangle.

        Raises ``ValueError`` when the graph contains no such edge.
        """
        import torch

        adj_sp = to_sparse_adj(edge_index.cpu().numpy(), n_nodes)
        # (A @ A) masked element-wise by A: for a 0/1 adjacency without
        # self-loops, entry (i, j) is the number of common neighbours of
        # the edge (i, j), i.e. its triangle count.
        # NOTE(review): confirm to_sparse_adj strips self-loops; a
        # self-loop would otherwise make its incident edges look like
        # triangle edges.
        motif_coo = adj_sp.dot(adj_sp).multiply(adj_sp).tocoo()
        if motif_coo.nnz == 0:
            raise ValueError(
                "GUIDE requires higher-order structures (triangles) "
                "in the graph. This graph has no triangles. Use "
                "DOMINANT or CoLA instead.")
        return torch.as_tensor(
            np.array([motif_coo.row, motif_coo.col]), dtype=torch.long)

    @staticmethod
    def _dense_adjacency(edge_index, n_nodes):
        """Return an (n_nodes, n_nodes) 0/1 matrix on edge_index's device.

        Memory grows quadratically with ``n_nodes``.
        """
        import torch

        dense = torch.zeros(n_nodes, n_nodes, device=edge_index.device)
        dense[edge_index[0], edge_index[1]] = 1.0
        return dense

    @staticmethod
    def _node_errors(x, x_hat, adj, adj_hat):
        """Return per-node squared error: structure row + attribute row."""
        struct_err = ((adj - adj_hat) ** 2).sum(dim=1)
        attr_err = ((x - x_hat) ** 2).sum(dim=1)
        return struct_err + attr_err

    def decision_function(self, X):
        """Not supported (transductive detector).

        Raises
        ------
        NotImplementedError
            Always; read ``decision_scores_`` after ``fit`` instead.
        """
        raise NotImplementedError(
            "GUIDE is a transductive detector. Use decision_scores_ "
            "after fit().")

    def predict(self, X, return_confidence=False):
        """Not supported (transductive detector).

        Raises
        ------
        NotImplementedError
            Always; read ``labels_`` after ``fit`` instead.
        """
        raise NotImplementedError(
            "GUIDE is a transductive detector. Use labels_ "
            "after fit().")

    def predict_proba(self, X, method="linear", return_confidence=False):
        """Not supported (transductive detector).

        Raises
        ------
        NotImplementedError
            Always.
        """
        raise NotImplementedError("GUIDE is a transductive detector.")

    def predict_confidence(self, X):
        """Not supported (transductive detector).

        Raises
        ------
        NotImplementedError
            Always.
        """
        raise NotImplementedError("GUIDE is a transductive detector.")
def _GUIDEModel(in_dim, hid_dim, num_layers, dropout):
    """Build the dual GCN autoencoder used by GUIDE.

    torch and torch_geometric are imported inside this factory, so the
    enclosing module can be imported without them.

    Parameters
    ----------
    in_dim : int
        Width of the input node features (also the attribute-decoder
        output width).
    hid_dim : int
        Width of every GCN layer output.
    num_layers : int
        Number of GCN layers per encoder; at least one layer is always
        built.
    dropout : float
        Dropout probability applied after every non-final GCN layer.

    Returns
    -------
    torch.nn.Module
        Module whose ``forward(x, ei_orig, ei_motif)`` returns
        ``(x_hat_o, x_hat_m, a_hat_o, a_hat_m)``.
    """
    import torch
    import torch.nn as nn
    from torch_geometric.nn import GCNConv

    def build_encoder():
        # First layer maps features to the hidden width; every further
        # layer keeps the hidden width.
        input_widths = [in_dim] + [hid_dim] * (num_layers - 1)
        return nn.ModuleList(
            [GCNConv(width, hid_dim) for width in input_widths])

    class _Model(nn.Module):
        def __init__(self):
            super().__init__()
            # One GCN stack per view: original graph, then motif graph.
            self.enc_orig = build_encoder()
            self.enc_motif = build_encoder()
            # Linear attribute decoders, one per view.
            self.dec_attr_orig = nn.Linear(hid_dim, in_dim)
            self.dec_attr_motif = nn.Linear(hid_dim, in_dim)
            self._dropout = dropout

        def _encode(self, x, edge_index, encoder):
            # ReLU + dropout follow every layer except the last one,
            # whose raw output is the embedding.
            *inner_convs, last_conv = encoder
            hidden = x
            for conv in inner_convs:
                hidden = torch.relu(conv(hidden, edge_index))
                hidden = torch.dropout(
                    hidden, p=self._dropout, train=self.training)
            return last_conv(hidden, edge_index)

        def forward(self, x, ei_orig, ei_motif):
            emb_orig = self._encode(x, ei_orig, self.enc_orig)
            emb_motif = self._encode(x, ei_motif, self.enc_motif)
            # Attributes are decoded linearly; structure is decoded as
            # sigmoid of the embedding inner products.
            return (
                self.dec_attr_orig(emb_orig),
                self.dec_attr_motif(emb_motif),
                torch.sigmoid(emb_orig @ emb_orig.t()),
                torch.sigmoid(emb_motif @ emb_motif.t()),
            )

    return _Model()