# -*- coding: utf-8 -*-
"""MatrixProfile: Time series anomaly detection using the STOMP algorithm.
Computes the Matrix Profile via the STOMP (Scalable Time-series Ordered
Matrix Profile) algorithm, which identifies anomalous subsequences by
measuring z-normalized Euclidean distance to the nearest non-trivial
match.
See :cite:`yeh2016matrix` for details.
Reference:
Yeh, C.C.M., Zhu, Y., Ulanova, L., Begum, N., Ding, Y., Dau, H.A.,
Silva, D.F., Mueen, A. and Keogh, E., 2016. Matrix profile I:
All pairs similarity joins for time series: a unifying view that
includes motifs, discords and shapelets. In ICDM, pp. 1317-1322.
"""
# Author: Yue Zhao <yzhao062@gmail.com>
# License: BSD 2 clause
import numpy as np
from sklearn.utils.validation import check_is_fitted
from .base import BaseDetector
from ._ts_utils import (validate_ts_input, map_scores_to_timestamps,
aggregate_channel_scores)
def _compute_matrix_profile(T, m):
"""Compute the Matrix Profile of a 1-D time series using STOMP.
Parameters
----------
T : np.ndarray of shape (n,)
Input time series (single channel).
m : int
Subsequence (window) length.
Returns
-------
mp : np.ndarray of shape (n - m + 1,)
Matrix Profile values (nearest-neighbor z-normalized distances).
"""
n = len(T)
n_subseq = n - m + 1
exclusion_zone = m // 4
# --- Precompute rolling mean and std using cumulative sums ---
cumsum = np.cumsum(T)
cumsum2 = np.cumsum(T ** 2)
# sum of T[i:i+m] for each subsequence i
subseq_sum = np.empty(n_subseq)
subseq_sum[0] = cumsum[m - 1]
subseq_sum[1:] = cumsum[m:] - cumsum[:n - m]
subseq_sum2 = np.empty(n_subseq)
subseq_sum2[0] = cumsum2[m - 1]
subseq_sum2[1:] = cumsum2[m:] - cumsum2[:n - m]
mu = subseq_sum / m
sigma_sq = subseq_sum2 / m - mu ** 2
sigma_sq = np.maximum(sigma_sq, 0.0) # numerical stability
sigma = np.sqrt(sigma_sq)
# Mask for constant subsequences (std < 1e-10)
const_mask = sigma < 1e-10
# Initialize Matrix Profile with infinity
mp = np.full(n_subseq, np.inf)
# --- First column (j=0): compute distance profile using MASS (FFT) ---
# Pad to next power of 2 for FFT efficiency
fft_size = 1
while fft_size < 2 * n:
fft_size *= 2
T_fft = np.fft.rfft(T, n=fft_size)
# First query subsequence (reversed, then padded)
query = T[:m][::-1]
Q_fft = np.fft.rfft(query, n=fft_size)
# QT[i] = dot product of T[i:i+m] and T[0:m]
QT_full = np.fft.irfft(T_fft * Q_fft, n=fft_size)
QT = QT_full[m - 1:m - 1 + n_subseq].copy()
# Compute distance for j=0
_update_mp(mp, QT, mu, sigma, const_mask, m, 0, exclusion_zone, n_subseq)
# Keep a copy of QT for incremental updates
QT_prev = QT.copy()
# --- STOMP: incremental updates for j=1..n_subseq-1 ---
for j in range(1, n_subseq):
# Incremental QT update:
# QT_new[i] = QT_old[i-1] - T[i-1]*T[j-1] + T[i+m-1]*T[j+m-1]
QT_new = np.empty(n_subseq)
# QT_new[0] must be computed as a direct dot product
QT_new[0] = np.dot(T[:m], T[j:j + m])
# Vectorized incremental update for i=1..n_subseq-1
i_indices = np.arange(1, n_subseq)
QT_new[1:] = (QT_prev[:-1]
- T[i_indices - 1] * T[j - 1]
+ T[i_indices + m - 1] * T[j + m - 1])
_update_mp(mp, QT_new, mu, sigma, const_mask, m, j,
exclusion_zone, n_subseq)
QT_prev = QT_new
return mp
def _update_mp(mp, QT, mu, sigma, const_mask, m, j, exclusion_zone, n_subseq):
"""Update the Matrix Profile for column j.
Converts QT values to z-normalized distances and updates the
profile wherever the new distance is smaller.
Parameters
----------
mp : np.ndarray, modified in-place
QT : np.ndarray of shape (n_subseq,)
mu : np.ndarray of shape (n_subseq,)
sigma : np.ndarray of shape (n_subseq,)
const_mask : np.ndarray of bool
m : int
j : int, current column index
exclusion_zone : int
n_subseq : int
"""
# Compute z-normalized distance:
# d = sqrt(2*m*(1 - (QT - m*mu_i*mu_j) / (m*sigma_i*sigma_j)))
# where i ranges over all subsequences
# Denominator: m * sigma_i * sigma_j
denom = m * sigma * sigma[j]
# Numerator inside the (1 - ...) term
numerator = QT - m * mu * mu[j]
# Compute the argument of sqrt
# Avoid division by zero: where denom is 0 (constant subsequences),
# distance is inf
with np.errstate(divide='ignore', invalid='ignore'):
corr = np.where(denom > 0, numerator / denom, 0.0)
dist_sq = 2 * m * (1 - corr)
# Clip for numerical stability
dist_sq = np.maximum(dist_sq, 0.0)
dist = np.sqrt(dist_sq)
# Set distance to inf for constant subsequences
dist[const_mask] = np.inf
if const_mask[j]:
dist[:] = np.inf
# Apply exclusion zone: ignore indices where |i - j| <= exclusion_zone
ez_start = max(0, j - exclusion_zone)
ez_end = min(n_subseq, j + exclusion_zone + 1)
dist[ez_start:ez_end] = np.inf
# Update Matrix Profile where distance is smaller
mask = dist < mp
mp[mask] = dist[mask]
[docs]
class MatrixProfile(BaseDetector):
"""Matrix Profile time series anomaly detector using STOMP.
Computes the Matrix Profile via the STOMP algorithm, identifying
anomalous subsequences by measuring z-normalized Euclidean distance
to the nearest non-trivial neighbor. Subsequences with high
Matrix Profile values (discords) are anomalous.
This is a **transductive** detector: it only scores the training
data. ``decision_function``, ``predict``, ``predict_proba``, and
``predict_confidence`` raise ``NotImplementedError`` when called
with new data.
Parameters
----------
window_size : int, optional (default=50)
Subsequence length for the Matrix Profile computation.
contamination : float in (0., 0.5), optional (default=0.1)
Expected proportion of outliers in the dataset.
channel_aggregation : str, optional (default='max')
How to aggregate per-channel scores for multivariate data.
One of 'max' or 'mean'.
Attributes
----------
decision_scores_ : numpy array of shape (n_timestamps,)
Outlier scores of the training data. Higher is more abnormal.
threshold_ : float
Score threshold based on ``contamination``.
labels_ : numpy array of shape (n_timestamps,)
Binary labels of training data (0: inlier, 1: outlier).
References
----------
Yeh, C.C.M., Zhu, Y., Ulanova, L., Begum, N., Ding, Y., Dau, H.A.,
Silva, D.F., Mueen, A. and Keogh, E., 2016. Matrix profile I:
All pairs similarity joins for time series. In ICDM, pp. 1317-1322.
Examples
--------
>>> from pyod.models.ts_matrix_profile import MatrixProfile
>>> import numpy as np
>>> X_train = np.random.randn(300)
>>> clf = MatrixProfile(window_size=20, contamination=0.1)
>>> clf.fit(X_train)
>>> scores = clf.decision_scores_
"""
def __init__(self, window_size=50, contamination=0.1,
channel_aggregation='max'):
super(MatrixProfile, self).__init__(contamination=contamination)
self.window_size = window_size
self.channel_aggregation = channel_aggregation
[docs]
def fit(self, X, y=None):
"""Fit the Matrix Profile detector on time series data.
Parameters
----------
X : array-like of shape (n_timestamps,) or (n_timestamps, n_channels)
Training time series data.
y : Ignored
Not used, present for API consistency.
Returns
-------
self : object
Fitted estimator.
"""
X = validate_ts_input(X)
n_timestamps, n_channels = X.shape
m = self.window_size
if n_timestamps < m + 1:
raise ValueError(
"Time series length %d is too short for window_size=%d. "
"Need at least %d timestamps." % (n_timestamps, m, m + 1))
self._set_n_classes(y)
# Compute Matrix Profile per channel
per_channel_ts_scores = []
for ch in range(n_channels):
ts = X[:, ch]
mp = _compute_matrix_profile(ts, m)
# Map subsequence-level MP scores back to timestamps
# step=1 since MP computes all subsequences
scores, valid_mask = map_scores_to_timestamps(
mp, m, step=1, n_timestamps=n_timestamps,
aggregation=self.channel_aggregation)
per_channel_ts_scores.append((scores, valid_mask))
if n_channels == 1:
scores, valid_mask = per_channel_ts_scores[0]
else:
# Aggregate across channels
# First, fill NaN positions in each channel with 0 for aggregation
filled_scores = []
combined_valid = per_channel_ts_scores[0][1].copy()
for ch_scores, ch_valid in per_channel_ts_scores:
filled = ch_scores.copy()
filled[~ch_valid] = 0.0
filled_scores.append(filled)
combined_valid &= ch_valid
scores = aggregate_channel_scores(
filled_scores, method=self.channel_aggregation)
valid_mask = combined_valid
# Masked-score workflow: compute threshold on valid scores only
valid_scores = scores[valid_mask]
self.decision_scores_ = valid_scores
self._process_decision_scores()
# Reconstruct full-length arrays
full_scores = scores.copy()
full_scores[~valid_mask] = self.threshold_
full_labels = (full_scores > self.threshold_).astype(int)
self.decision_scores_ = full_scores
self.labels_ = full_labels
return self
[docs]
def decision_function(self, X):
"""Not supported (transductive detector).
Raises
------
NotImplementedError
"""
raise NotImplementedError(
"MatrixProfile is a transductive detector and does not support "
"decision_function on new data. Access decision_scores_ after "
"fit() for training scores.")
[docs]
def predict(self, X, return_confidence=False):
"""Not supported (transductive detector).
Raises
------
NotImplementedError
"""
raise NotImplementedError(
"MatrixProfile is a transductive detector and does not support "
"predict on new data. Access labels_ after fit() for training "
"labels.")
[docs]
def predict_proba(self, X, method='linear', return_confidence=False):
"""Not supported (transductive detector).
Raises
------
NotImplementedError
"""
raise NotImplementedError(
"MatrixProfile is a transductive detector and does not support "
"predict_proba on new data.")
[docs]
def predict_confidence(self, X):
"""Not supported (transductive detector).
Raises
------
NotImplementedError
"""
raise NotImplementedError(
"MatrixProfile is a transductive detector and does not support "
"predict_confidence on new data.")