# Source code for pyod.models.ts_sand

# -*- coding: utf-8 -*-
"""SAND: Streaming Anomaly detection with Normalization and Drift adaptation.

Simplified PyOD adaptation of:
Boniol, P., Paparrizos, J., Palpanas, T. and Franklin, M.J., 2021.
SAND: Streaming subsequence anomaly detection.
*Proceedings of the VLDB Endowment*, 14(10), pp. 1717-1729.

This implementation extracts z-normalized sliding-window subsequences,
initializes k centroids using k-Shape on the first batch, then scores
every subsequence (including those in the first batch) by SBD
(shape-based distance) to its nearest centroid.  Centroids are updated
every ``batch_size`` subsequences via exponential moving average with
parameter ``alpha``.
"""
# Author: Yue Zhao <yzhao062@gmail.com>
# License: BSD 2 clause

import numpy as np
from sklearn.utils.validation import check_is_fitted

from .base import BaseDetector
from ._ts_utils import (validate_ts_input, sliding_windows,
                         map_scores_to_timestamps, aggregate_channel_scores)
from .ts_kshape import _znormalize, _sbd, _kshape


class SAND(BaseDetector):
    """SAND streaming anomaly detector for time series.

    Extracts z-normalized sliding-window subsequences, initializes k
    centroids via k-Shape on the first batch, and scores each subsequence
    by its SBD to the nearest centroid. Centroids are updated every
    ``batch_size`` subsequences using an exponential moving average,
    enabling drift adaptation. Because k-Shape cluster indices are
    arbitrary, each current centroid is first paired with the closest
    (by SBD) centroid of the new batch before the two are blended.

    Parameters
    ----------
    window_size : int, optional (default=50)
        Subsequence length for the sliding window. Must be >= 1.

    n_clusters : int, optional (default=5)
        Number of centroids for k-Shape clustering. Must be >= 1.

    alpha : float, optional (default=0.5)
        Smoothing factor for centroid updates, in [0, 1].
        ``new = alpha * batch_centroid + (1 - alpha) * old_centroid``,
        where ``batch_centroid`` is the batch centroid matched to
        ``old_centroid``; the blend is then z-normalized.

    batch_size : int, optional (default=100)
        Number of subsequences between centroid updates.

    max_iter : int, optional (default=50)
        Maximum iterations for every k-Shape clustering run (the initial
        one and each per-batch update).

    contamination : float, optional (default=0.1)
        Expected proportion of outliers. Must be in (0, 0.5].

    channel_aggregation : str, optional (default='max')
        How to aggregate per-channel scores for multivariate input.
        One of ``'max'`` or ``'mean'``.

    random_state : int or None, optional (default=42)
        Random seed for reproducible centroid initialization.

    Attributes
    ----------
    decision_scores_ : numpy array of shape (n_timestamps,)
        Outlier scores of the training data. Higher is more abnormal.

    threshold_ : float
        Score threshold derived from ``contamination``.

    labels_ : numpy array of shape (n_timestamps,)
        Binary labels (0: inlier, 1: outlier).

    centroids_ : list of numpy arrays
        Per-channel centroids after processing all training data.

    n_channels_ : int
        Number of channels seen during ``fit``; ``decision_function``
        requires the same number.

    Examples
    --------
    >>> from pyod.models.ts_sand import SAND
    >>> import numpy as np
    >>> X_train = np.random.randn(500)
    >>> clf = SAND(n_clusters=3, window_size=20, contamination=0.1)
    >>> clf.fit(X_train)
    >>> scores = clf.decision_function(np.random.randn(200))

    References
    ----------
    .. [1] Boniol, P., Paparrizos, J., Palpanas, T. and Franklin, M.J., 2021.
       SAND: Streaming subsequence anomaly detection.
       *Proceedings of the VLDB Endowment*, 14(10), pp. 1717-1729.
    """

    def __init__(self, window_size=50, n_clusters=5, alpha=0.5,
                 batch_size=100, max_iter=50, contamination=0.1,
                 channel_aggregation='max', random_state=42):
        super(SAND, self).__init__(contamination=contamination)
        self.window_size = window_size
        self.n_clusters = n_clusters
        self.alpha = alpha
        self.batch_size = batch_size
        self.max_iter = max_iter
        self.channel_aggregation = channel_aggregation
        self.random_state = random_state

    def _get_min_length(self):
        """Minimum time series length required."""
        return self.window_size + 1

    def _validate_params(self):
        """Raise ValueError for hyperparameters outside their valid range."""
        if self.window_size < 1:
            raise ValueError(
                "window_size must be >= 1, got %r" % (self.window_size,))
        if self.n_clusters < 1:
            raise ValueError(
                "n_clusters must be >= 1, got %r" % (self.n_clusters,))
        if not 0.0 <= self.alpha <= 1.0:
            raise ValueError(
                "alpha must be in [0, 1], got %r" % (self.alpha,))

    @staticmethod
    def _znorm_windows(windows):
        """Z-normalize each row of a window matrix."""
        return np.array([_znormalize(w) for w in windows])

    def _channel_windows(self, X, ch):
        """Return z-normalized step-1 sliding windows of channel ``ch``."""
        channel = X[:, ch].reshape(-1, 1)
        windows = sliding_windows(channel, self.window_size, step=1)
        return self._znorm_windows(windows)

    def _to_timestamp_scores(self, window_scores, n_timestamps):
        """Map per-window scores to per-timestamp scores (max over windows).

        Returns the ``(ts_scores, valid_mask)`` pair produced by
        ``map_scores_to_timestamps``.
        """
        return map_scores_to_timestamps(
            window_scores, self.window_size, step=1,
            n_timestamps=n_timestamps, aggregation='max')

    def _combine_channels(self, per_channel_results):
        """Merge per-channel ``(scores, valid_mask)`` pairs into one series.

        A single channel is returned as-is. Otherwise invalid positions are
        zero-filled before aggregation with ``channel_aggregation``, and a
        timestamp is valid only if it is valid in every channel.

        Returns
        -------
        scores : np.ndarray of shape (n_timestamps,)
        valid_mask : np.ndarray of bool, shape (n_timestamps,)
        """
        if len(per_channel_results) == 1:
            return per_channel_results[0]

        filled_scores = []
        combined_valid = per_channel_results[0][1].copy()
        for ch_scores, ch_valid in per_channel_results:
            filled = ch_scores.copy()
            filled[~ch_valid] = 0.0
            filled_scores.append(filled)
            combined_valid &= ch_valid
        scores = aggregate_channel_scores(
            filled_scores, method=self.channel_aggregation)
        return scores, combined_valid

    @staticmethod
    def _score_subsequences(subs, centroids):
        """Score each subsequence by SBD to nearest centroid.

        Parameters
        ----------
        subs : np.ndarray of shape (n_subs, length)

        centroids : np.ndarray of shape (n_clusters, length)

        Returns
        -------
        scores : np.ndarray of shape (n_subs,)
        """
        scores = np.empty(len(subs))
        for i, s in enumerate(subs):
            best_dist = np.inf
            for c in centroids:
                d, _ = _sbd(s, c)
                if d < best_dist:
                    best_dist = d
            scores[i] = best_dist
        return scores

    @staticmethod
    def _match_centroids(old_centroids, new_centroids):
        """Pair each old centroid with a distinct new centroid by SBD.

        Two independent k-Shape runs label their clusters arbitrarily, so
        index ``k`` in one run need not describe the same shape as index
        ``k`` in the other. Pairs are chosen greedily in order of increasing
        SBD, each new centroid being used at most once.

        Parameters
        ----------
        old_centroids : sequence of np.ndarray of shape (length,)

        new_centroids : sequence of np.ndarray of shape (length,)

        Returns
        -------
        matches : np.ndarray of int, shape (len(old_centroids),)
            ``matches[k]`` is the index into ``new_centroids`` paired with
            ``old_centroids[k]``, or -1 if no new centroid was left.
        """
        n_old = len(old_centroids)
        n_new = len(new_centroids)
        matches = np.full(n_old, -1, dtype=int)
        if n_old == 0 or n_new == 0:
            return matches

        dist = np.empty((n_old, n_new))
        for i, c_old in enumerate(old_centroids):
            for j, c_new in enumerate(new_centroids):
                d, _ = _sbd(c_old, c_new)
                dist[i, j] = d

        used = np.zeros(n_new, dtype=bool)
        # Flattened argsort visits (old, new) pairs from closest to farthest;
        # NaN distances sort last and are only used as a fallback.
        for flat in np.argsort(dist, axis=None, kind='stable'):
            i, j = divmod(int(flat), n_new)
            if matches[i] < 0 and not used[j]:
                matches[i] = j
                used[j] = True
        return matches

    def _blend_centroids(self, centroids, batch_centroids):
        """EMA-blend matched batch centroids into the current centroids.

        Parameters
        ----------
        centroids : np.ndarray of shape (n_clusters, length)
            Current centroids; not modified.

        batch_centroids : sequence of np.ndarray of shape (length,)
            Centroids from k-Shape on the latest batch.

        Returns
        -------
        updated : np.ndarray of shape (n_clusters, length)
            ``alpha * matched_batch + (1 - alpha) * old``, z-normalized.
            Centroids without a matched batch centroid are kept as-is.
        """
        matches = self._match_centroids(centroids, batch_centroids)
        updated = np.array(centroids, dtype=float)
        for k, j in enumerate(matches):
            if j < 0:
                continue
            blended = (self.alpha * batch_centroids[j]
                       + (1.0 - self.alpha) * centroids[k])
            updated[k] = _znormalize(blended)
        return updated

    def _streaming_fit_channel(self, windows):
        """Fit on one channel: initialize centroids, stream with updates.

        Parameters
        ----------
        windows : np.ndarray of shape (n_windows, window_size)
            Z-normalized sliding windows for one channel.

        Returns
        -------
        scores : np.ndarray of shape (n_windows,)

        centroids : np.ndarray of shape (n_clusters, window_size)
        """
        n = len(windows)
        batch_size = min(self.batch_size, n)
        rng = np.random.RandomState(self.random_state)

        # Initialize centroids on the first batch using k-Shape. Copy into a
        # float array so later updates never alias _kshape's output.
        init_batch = windows[:batch_size]
        centroids, _, _ = _kshape(init_batch, self.n_clusters,
                                  max_iter=self.max_iter, random_state=rng)
        centroids = np.array(centroids, dtype=float)

        scores = np.empty(n)
        pos = 0
        while pos < n:
            end = min(pos + batch_size, n)
            batch = windows[pos:end]
            # Score this batch against the current centroids (for the first
            # batch these were fit on the batch itself).
            scores[pos:end] = self._score_subsequences(batch, centroids)

            # Update unless this is the last batch or the batch is too small
            # to split into n_clusters groups.
            if end < n and len(batch) >= self.n_clusters:
                # Re-seeded for every batch so each update is reproducible
                # on its own.
                batch_rng = np.random.RandomState(self.random_state)
                batch_centroids, _, _ = _kshape(
                    batch, self.n_clusters, max_iter=self.max_iter,
                    random_state=batch_rng)
                centroids = self._blend_centroids(centroids, batch_centroids)
            pos = end
        return scores, centroids

    def fit(self, X, y=None):
        """Fit the SAND detector on time series data.

        Parameters
        ----------
        X : array-like of shape (n_timestamps,) or (n_timestamps, n_channels)
            Training time series data.

        y : Ignored
            Not used, present for API consistency.

        Returns
        -------
        self : object
            Fitted estimator.

        Raises
        ------
        ValueError
            If a hyperparameter is out of range, the series is shorter than
            ``window_size + 1``, or the first batch holds fewer than
            ``n_clusters`` subsequences.
        """
        self._validate_params()
        X = validate_ts_input(X)
        n_timestamps, n_channels = X.shape

        min_len = self._get_min_length()
        if n_timestamps < min_len:
            raise ValueError(
                "Time series length %d is shorter than minimum "
                "required length %d (window_size=%d)"
                % (n_timestamps, min_len, self.window_size))

        n_windows = n_timestamps - self.window_size + 1
        init_batch = min(self.batch_size, n_windows)
        if init_batch < self.n_clusters:
            raise ValueError(
                "Not enough subsequences in initial batch (%d) for "
                "n_clusters=%d. Need a longer series, larger batch_size, "
                "or fewer clusters." % (init_batch, self.n_clusters))

        self._set_n_classes(y)

        # Store channel count for validation in decision_function
        self.n_channels_ = n_channels

        # Process each channel independently
        self.centroids_ = []
        per_channel_results = []
        for ch in range(n_channels):
            windows = self._channel_windows(X, ch)
            ch_window_scores, ch_centroids = self._streaming_fit_channel(
                windows)
            self.centroids_.append(ch_centroids)
            per_channel_results.append(
                self._to_timestamp_scores(ch_window_scores, n_timestamps))

        scores, valid_mask = self._combine_channels(per_channel_results)

        # Masked-score workflow: derive the threshold from valid timestamps
        # only, then expose full-length arrays with invalid positions pinned
        # to the threshold (hence labelled inliers).
        self.decision_scores_ = scores[valid_mask]
        self._process_decision_scores()

        full_scores = scores.copy()
        full_scores[~valid_mask] = self.threshold_
        self.decision_scores_ = full_scores
        self.labels_ = (full_scores > self.threshold_).astype(int)
        return self

    def decision_function(self, X):
        """Predict raw anomaly scores for a test time series.

        Parameters
        ----------
        X : array-like of shape (n_timestamps,) or (n_timestamps, n_channels)
            Test time series data.

        Returns
        -------
        anomaly_scores : numpy array of shape (n_timestamps,)
            Anomaly scores. Higher is more abnormal. Timestamps without a
            valid score are set to ``threshold_``.

        Raises
        ------
        ValueError
            If the channel count differs from the one seen in ``fit`` or
            the series is shorter than ``window_size``.
        """
        check_is_fitted(self, ['decision_scores_', 'threshold_',
                               'labels_', 'centroids_'])
        X = validate_ts_input(X)
        n_timestamps, n_channels = X.shape

        if n_channels != self.n_channels_:
            raise ValueError(
                "Channel count mismatch: model fitted on %d channels, "
                "got %d" % (self.n_channels_, n_channels))
        # At least one full window is needed to produce any score.
        if n_timestamps < self.window_size:
            raise ValueError(
                "Time series length %d is shorter than window_size=%d"
                % (n_timestamps, self.window_size))

        per_channel_results = []
        for ch in range(n_channels):
            windows = self._channel_windows(X, ch)
            ch_window_scores = self._score_subsequences(
                windows, self.centroids_[ch])
            per_channel_results.append(
                self._to_timestamp_scores(ch_window_scores, n_timestamps))

        scores, valid_mask = self._combine_channels(per_channel_results)
        scores[~valid_mask] = self.threshold_
        return scores