Source code for pyod.models.ts_od

# -*- coding: utf-8 -*-
"""TimeSeriesOD: Windowed bridge that wraps any PyOD detector for
time series anomaly detection.

Creates sliding windows from a time series, runs any PyOD detector on
the resulting window matrix, and maps anomaly scores back to individual
timestamps.
"""
# Author: Yue Zhao <yzhao062@gmail.com>
# License: BSD 2 clause

import numpy as np
from sklearn.base import clone
from sklearn.utils.validation import check_is_fitted

from .base import BaseDetector
from ._ts_utils import validate_ts_input, sliding_windows, map_scores_to_timestamps
from .embedding import _DETECTOR_SHORTCUTS, resolve_detector


[docs] class TimeSeriesOD(BaseDetector): """Windowed bridge that wraps any PyOD detector for time series anomaly detection. Takes a time series, creates sliding windows, runs any PyOD detector on the window matrix, and maps scores back to timestamps. Parameters ---------- detector : str or BaseDetector, optional (default='IForest') Any PyOD detector. String resolves to a default-configured instance via the shortcut registry. If a BaseDetector instance is passed, it will be cloned. window_size : int, optional (default=50) Size of the sliding window. step : int, optional (default=1) Step size between consecutive windows. score_aggregation : str, optional (default='max') How to aggregate window-level scores to timestamp-level scores. One of 'max' or 'mean'. contamination : float, optional (default=0.1) Expected proportion of outliers in the dataset. Must be in (0, 0.5]. Attributes ---------- decision_scores_ : numpy array of shape (n_timestamps,) Outlier scores of the training data. Higher is more abnormal. threshold_ : float Score threshold based on ``contamination``. labels_ : numpy array of shape (n_timestamps,) Binary labels of training data (0: inlier, 1: outlier). detector_ : BaseDetector The resolved and fitted inner detector instance. Examples -------- >>> from pyod.models.ts_od import TimeSeriesOD >>> import numpy as np >>> X_train = np.random.randn(500) >>> clf = TimeSeriesOD(detector='IForest', window_size=20) >>> clf.fit(X_train) >>> scores = clf.decision_function(np.random.randn(200)) """ def __init__(self, detector='IForest', window_size=50, step=1, score_aggregation='max', contamination=0.1): super(TimeSeriesOD, self).__init__(contamination=contamination) self.detector = detector self.window_size = window_size self.step = step self.score_aggregation = score_aggregation def _get_min_length(self): """Return the minimum time series length required. Returns ------- min_length : int """ return self.window_size
[docs] def fit(self, X, y=None): """Fit detector on time series data. Validates the input, creates sliding windows, fits the inner detector on the window matrix, and maps scores back to timestamps using the masked-score workflow. Parameters ---------- X : array-like of shape (n_timestamps,) or (n_timestamps, n_channels) Training time series data. y : Ignored Not used, present for API consistency. Returns ------- self : object Fitted estimator. """ X = validate_ts_input(X) n_timestamps = X.shape[0] min_len = self._get_min_length() if n_timestamps < min_len: raise ValueError( "Time series length %d is shorter than minimum " "required length %d (window_size=%d)" % (n_timestamps, min_len, self.window_size)) self._set_n_classes(y) # Resolve the inner detector self.detector_ = resolve_detector(self.detector, self.contamination) # Create sliding windows and fit inner detector windows = sliding_windows(X, self.window_size, self.step) self.detector_.fit(windows) window_scores = self.detector_.decision_scores_ # Map window scores back to timestamps scores, valid_mask = map_scores_to_timestamps( window_scores, self.window_size, self.step, n_timestamps, aggregation=self.score_aggregation) # Process on valid subset to compute threshold valid_scores = scores[valid_mask] self.decision_scores_ = valid_scores self._process_decision_scores() # Reconstruct full-length arrays full_scores = scores.copy() full_scores[~valid_mask] = self.threshold_ full_labels = (full_scores > self.threshold_).astype(int) self.decision_scores_ = full_scores self.labels_ = full_labels return self
[docs] def decision_function(self, X): """Predict raw anomaly scores for time series X. Parameters ---------- X : array-like of shape (n_timestamps,) or (n_timestamps, n_channels) Test time series data. Returns ------- anomaly_scores : numpy array of shape (n_timestamps,) Anomaly scores. Higher is more abnormal. """ check_is_fitted(self, ['decision_scores_', 'threshold_', 'labels_']) X = validate_ts_input(X) n_timestamps = X.shape[0] # Create sliding windows and score with fitted detector windows = sliding_windows(X, self.window_size, self.step) window_scores = self.detector_.decision_function(windows) # Map back to timestamps scores, valid_mask = map_scores_to_timestamps( window_scores, self.window_size, self.step, n_timestamps, aggregation=self.score_aggregation) # Fill invalid positions with threshold scores[~valid_mask] = self.threshold_ return scores