# -*- coding: utf-8 -*-
"""CoLA: Contrastive Self-Supervised Learning for Anomaly Detection.
Contrasts each node's embedding against its local neighbor context
(mean of neighbors' embeddings). Nodes whose embeddings are
indistinguishable from shuffled-feature embeddings are anomalous.
Multi-round scoring for robustness.
See :cite:`liu2022cola` for details.
Reference:
Liu, Y., Li, Z., Pan, S., Gool, T., Xiang, T. and Gong, B., 2022.
Anomaly Detection on Attributed Networks via Contrastive
Self-Supervised Learning. In WWW, pp. 2137-2147.
"""
# Author: Yue Zhao <yzhao062@gmail.com>
# License: BSD 2 clause
import numpy as np
from sklearn.utils.validation import check_is_fitted
from .base import BaseDetector
from ._pyg_utils import validate_graph_input
[docs]
class CoLA(BaseDetector):
"""CoLA: Contrastive Anomaly Detection on Attributed Networks.
GCN encoder maps nodes to embeddings. A bilinear discriminator
scores how well a node's embedding matches its local neighbor
context (mean of neighbors' embeddings).
Nodes with low discriminator scores are anomalous.
This detector is **transductive**.
Parameters
----------
hidden_dim : int, default=64
Hidden dimension of GCN.
num_layers : int, default=2
Number of GCN layers.
epochs : int, default=100
Training epochs.
lr : float, default=1e-3
Learning rate.
contamination : float, default=0.1
Expected proportion of anomalies.
Attributes
----------
decision_scores_ : numpy array of shape (n_nodes,)
labels_ : numpy array of shape (n_nodes,)
threshold_ : float
"""
def __init__(self, hidden_dim=64, num_layers=2, epochs=100,
lr=1e-3, contamination=0.1):
super(CoLA, self).__init__(contamination=contamination)
self.hidden_dim = hidden_dim
self.num_layers = num_layers
self.epochs = epochs
self.lr = lr
[docs]
def fit(self, X, y=None, edge_index=None):
"""Fit the detector on graph data.
Parameters
----------
X : Data or array-like
y : ignored
edge_index : array-like or None
Returns
-------
self
"""
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.nn import GCNConv
data = validate_graph_input(X, edge_index)
n_nodes = data.num_nodes
self._set_n_classes(y)
if data.x is None:
raise ValueError("CoLA requires node features (data.x).")
in_dim = data.x.shape[1]
model = _CoLAModel(in_dim, self.hidden_dim, self.num_layers)
optimizer = torch.optim.Adam(model.parameters(), lr=self.lr)
x = data.x
ei = data.edge_index
# Sparse row-normalized adjacency for local context
from torch_geometric.utils import degree
row_deg = degree(ei[0], num_nodes=n_nodes)
row_deg = row_deg.clamp(min=1)
edge_weight = 1.0 / row_deg[ei[0]]
adj_norm = torch.sparse_coo_tensor(
ei, edge_weight, (n_nodes, n_nodes)).coalesce()
model.train()
for epoch in range(self.epochs):
z = model.encode(x, ei)
# Local context: mean of neighbors' embeddings
local_ctx = torch.sparse.mm(adj_norm, z) # (n, hid)
# Positive: (node, local_context) pairs
pos_scores = model.discriminate(z, local_ctx)
# Negative: shuffle features, re-encode
perm = torch.randperm(n_nodes)
z_neg = model.encode(x[perm], ei)
neg_scores = model.discriminate(z_neg, local_ctx)
pos_loss = F.binary_cross_entropy_with_logits(
pos_scores, torch.ones(n_nodes))
neg_loss = F.binary_cross_entropy_with_logits(
neg_scores, torch.zeros(n_nodes))
loss = pos_loss + neg_loss
optimizer.zero_grad()
loss.backward()
optimizer.step()
# Multi-round scoring with dropout stochasticity
model.train() # keep dropout active for stochasticity
all_scores = []
for _ in range(5):
with torch.no_grad():
z = model.encode(x, ei)
local_ctx = torch.sparse.mm(adj_norm, z)
s = -model.discriminate(z, local_ctx)
all_scores.append(s.cpu().numpy())
model.eval()
scores = torch.FloatTensor(np.mean(all_scores, axis=0))
self.decision_scores_ = scores.cpu().numpy()
self._process_decision_scores()
return self
[docs]
def decision_function(self, X):
"""Not supported (transductive detector)."""
raise NotImplementedError(
"CoLA is a transductive detector. Use decision_scores_ "
"after fit().")
[docs]
def predict(self, X, return_confidence=False):
"""Not supported (transductive detector)."""
raise NotImplementedError(
"CoLA is a transductive detector. Use labels_ after fit().")
[docs]
def predict_proba(self, X, method="linear", return_confidence=False):
"""Not supported (transductive detector)."""
raise NotImplementedError("CoLA is a transductive detector.")
[docs]
def predict_confidence(self, X):
"""Not supported (transductive detector)."""
raise NotImplementedError("CoLA is a transductive detector.")
def _CoLAModel(in_dim, hid_dim, num_layers):
"""Factory: returns a torch.nn.Module for CoLA.
Uses local-context contrastive learning: a GCN encoder
produces node embeddings, and a bilinear discriminator
scores (node, local_neighbor_context) pairs.
"""
import torch
import torch.nn as nn
from torch_geometric.nn import GCNConv
class _Model(nn.Module):
def __init__(self):
super().__init__()
self.convs = nn.ModuleList()
self.convs.append(GCNConv(in_dim, hid_dim))
for _ in range(num_layers - 1):
self.convs.append(GCNConv(hid_dim, hid_dim))
self.drop = nn.Dropout(0.3)
self.disc = nn.Bilinear(hid_dim, hid_dim, 1)
def encode(self, x, edge_index):
z = x
for i, conv in enumerate(self.convs):
z = conv(z, edge_index)
if i < len(self.convs) - 1:
z = torch.relu(z)
z = self.drop(z)
return z
def discriminate(self, z, local_ctx):
"""Score (node_embedding, local_context) pairs."""
return self.disc(z, local_ctx).squeeze(-1)
return _Model()