# -*- coding: utf-8 -*-
"""KShape: k-Shape clustering-based time series anomaly detection.
Adapts the k-Shape clustering algorithm (Paparrizos & Gravano, SIGMOD 2015)
for anomaly detection. Sliding-window subsequences are clustered using
shape-based distance (SBD), and each subsequence is scored by its SBD to
the nearest centroid. Subsequences far from all centroids are anomalous.
Reference:
Paparrizos, J. and Gravano, L., 2015. k-Shape: Efficient and accurate
clustering of time series. In *Proceedings of the 2015 ACM SIGMOD
International Conference on Management of Data* (pp. 1855-1870).
"""
# Author: Yue Zhao <yzhao062@gmail.com>
# License: BSD 2 clause
import numpy as np
from sklearn.utils.validation import check_is_fitted
from .base import BaseDetector
from ._ts_utils import (validate_ts_input, sliding_windows,
map_scores_to_timestamps,
aggregate_channel_scores)
def _znormalize(x):
"""Z-normalize a vector. Returns zero vector if std is near zero."""
s = np.std(x)
if s < 1e-10:
return np.zeros_like(x)
return (x - np.mean(x)) / s
def _sbd(x, y):
"""Shape-based distance between two z-normalized sequences.
SBD(x, y) = 1 - max_w CC_w(x, y) / (||x|| * ||y||)
Parameters
----------
x, y : np.ndarray of shape (m,)
Z-normalized sequences of equal length.
Returns
-------
dist : float
Shape-based distance in [0, 2]. Lower is more similar.
shift : int
The optimal shift (lag) that maximizes cross-correlation.
"""
m = len(x)
fft_size = 2 * m - 1
# Pad to next power of 2 for FFT efficiency
fft_size_padded = 1
while fft_size_padded < fft_size:
fft_size_padded *= 2
# Cross-correlation via FFT
fx = np.fft.fft(x, fft_size_padded)
fy = np.fft.fft(y, fft_size_padded)
cc = np.real(np.fft.ifft(fx * np.conj(fy)))
# The meaningful cross-correlation values are at indices
# 0..m-1 and fft_size_padded-m+1..fft_size_padded-1
# Rearrange to get lags -(m-1)..0..+(m-1)
cc_full = np.concatenate([cc[-(m - 1):], cc[:m]])
norm = np.linalg.norm(x) * np.linalg.norm(y)
if norm < 1e-10:
return 2.0, 0
ncc = cc_full / norm
idx = np.argmax(ncc)
dist = 1.0 - ncc[idx]
# Clamp to [0, 2] for numerical stability
dist = max(0.0, min(2.0, dist))
shift = idx - (m - 1)
return dist, shift
def _shift_align(x, shift, m):
"""Shift-align x by the given lag, zero-padding where needed.
Parameters
----------
x : np.ndarray of shape (m,)
shift : int
Positive means x is shifted right (leading zeros).
m : int
Length of the output.
Returns
-------
aligned : np.ndarray of shape (m,)
"""
aligned = np.zeros(m)
if shift >= 0:
end = min(m, m - shift)
if end > 0:
aligned[shift:shift + end] = x[:end]
else:
start = -shift
end = min(m, m + shift)
if end > 0:
aligned[:end] = x[start:start + end]
return aligned
def _compute_centroid(members):
"""Compute the k-Shape centroid for a set of z-normalized, shift-aligned
cluster members.
The centroid maximizes total similarity. It is the eigenvector
corresponding to the smallest eigenvalue of M = I - Q/n, where
Q = X^T X and X has members as rows.
Parameters
----------
members : np.ndarray of shape (n_members, m)
Z-normalized and shift-aligned cluster members.
Returns
-------
centroid : np.ndarray of shape (m,)
Z-normalized centroid.
"""
n, m = members.shape
if n == 0:
return np.zeros(m)
if n == 1:
return _znormalize(members[0])
Q = members.T @ members # (m, m)
M = np.eye(m) - Q / n
# The centroid maximises the Rayleigh quotient for similarity,
# which corresponds to the *smallest* eigenvalue of M = I - Q/n.
eigenvalues, eigenvectors = np.linalg.eigh(M)
# eigh returns eigenvalues in ascending order; we want the smallest
centroid = eigenvectors[:, 0]
# Orient sign: pick the sign that maximizes aggregate similarity
# with cluster members (eigenvectors are defined up to sign)
if np.dot(members.mean(axis=0), centroid) < 0:
centroid = -centroid
return _znormalize(centroid)
def _kshape(subsequences, n_clusters, max_iter, random_state):
"""Run the k-Shape clustering algorithm.
Parameters
----------
subsequences : np.ndarray of shape (n_windows, window_size)
Z-normalized sliding-window subsequences.
n_clusters : int
Number of clusters.
max_iter : int
Maximum number of Lloyd's iterations.
random_state : np.random.RandomState
Random state for reproducible initialization.
Returns
-------
centroids : np.ndarray of shape (n_clusters, window_size)
Final cluster centroids (z-normalized).
labels : np.ndarray of shape (n_windows,)
Cluster assignments.
distances : np.ndarray of shape (n_windows,)
SBD to the nearest centroid.
"""
n_windows, m = subsequences.shape
# Initialize centroids by randomly selecting subsequences
indices = random_state.choice(n_windows, size=n_clusters, replace=False)
centroids = np.array([_znormalize(subsequences[i]) for i in indices])
labels = np.full(n_windows, -1, dtype=int)
distances = np.full(n_windows, np.inf)
for iteration in range(max_iter):
old_labels = labels.copy()
# --- Assignment step ---
for i in range(n_windows):
best_dist = np.inf
best_label = 0
for k in range(n_clusters):
d, _ = _sbd(subsequences[i], centroids[k])
if d < best_dist:
best_dist = d
best_label = k
labels[i] = best_label
distances[i] = best_dist
# Check convergence
if np.array_equal(labels, old_labels):
break
# --- Update step ---
for k in range(n_clusters):
members_idx = np.where(labels == k)[0]
if len(members_idx) == 0:
# Re-initialize empty cluster with a random subsequence
idx = random_state.randint(n_windows)
centroids[k] = _znormalize(subsequences[idx])
continue
# Shift-align each member to the current centroid
aligned = np.empty((len(members_idx), m))
for j, idx in enumerate(members_idx):
_, shift = _sbd(subsequences[idx], centroids[k])
aligned[j] = _znormalize(
_shift_align(subsequences[idx], shift, m))
centroids[k] = _compute_centroid(aligned)
# Final distance computation with converged centroids
for i in range(n_windows):
best_dist = np.inf
for k in range(n_clusters):
d, _ = _sbd(subsequences[i], centroids[k])
if d < best_dist:
best_dist = d
distances[i] = best_dist
return centroids, labels, distances
[docs]
class KShape(BaseDetector):
"""k-Shape clustering-based time series anomaly detector.
Extracts sliding-window subsequences, clusters them using the
k-Shape algorithm (shape-based distance + eigenvalue centroid
update), and scores each subsequence by its SBD to the nearest
centroid. Subsequences far from all centroids are considered
anomalous.
This is an **inductive** detector: after fitting on training data,
``decision_function`` can score new time series.
Parameters
----------
n_clusters : int, optional (default=3)
Number of clusters for k-Shape.
window_size : int, optional (default=50)
Size of the sliding window for extracting subsequences.
max_iter : int, optional (default=100)
Maximum number of Lloyd's iterations for k-Shape.
contamination : float in (0., 0.5), optional (default=0.1)
Expected proportion of outliers in the dataset.
channel_aggregation : str, optional (default='max')
How to aggregate per-channel scores for multivariate input.
One of ``'max'`` or ``'mean'``.
random_state : int or None, optional (default=42)
Random seed for reproducible centroid initialization.
Attributes
----------
decision_scores_ : numpy array of shape (n_timestamps,)
Outlier scores of the training data. Higher is more abnormal.
threshold_ : float
Score threshold based on ``contamination``.
labels_ : numpy array of shape (n_timestamps,)
Binary labels of training data (0: inlier, 1: outlier).
centroids_ : list of numpy arrays
Per-channel cluster centroids learned during fit.
Examples
--------
>>> from pyod.models.ts_kshape import KShape
>>> import numpy as np
>>> X_train = np.random.randn(300)
>>> clf = KShape(n_clusters=3, window_size=20, contamination=0.1)
>>> clf.fit(X_train)
>>> scores = clf.decision_function(np.random.randn(200))
References
----------
.. [1] Paparrizos, J. and Gravano, L., 2015. k-Shape: Efficient and
accurate clustering of time series. In *Proceedings of the 2015
ACM SIGMOD International Conference on Management of Data*
(pp. 1855-1870).
"""
def __init__(self, n_clusters=3, window_size=50, max_iter=100,
contamination=0.1, channel_aggregation='max',
random_state=42):
super(KShape, self).__init__(contamination=contamination)
self.n_clusters = n_clusters
self.window_size = window_size
self.max_iter = max_iter
self.channel_aggregation = channel_aggregation
self.random_state = random_state
def _fit_channel(self, ts):
"""Fit k-Shape on a single channel and return per-timestamp scores.
Parameters
----------
ts : np.ndarray of shape (n_timestamps,)
Single-channel time series.
Returns
-------
scores : np.ndarray of shape (n_timestamps,)
Per-timestamp anomaly scores.
valid_mask : np.ndarray of shape (n_timestamps,), dtype=bool
Mask indicating which timestamps have valid scores.
centroids : np.ndarray of shape (n_clusters, window_size)
Learned centroids.
"""
n_timestamps = len(ts)
# Extract sliding windows as a 2D array
X_2d = ts.reshape(-1, 1)
windows = sliding_windows(X_2d, self.window_size, step=1)
# Z-normalize each window
znorm_windows = np.array([_znormalize(w) for w in windows])
rng = np.random.RandomState(self.random_state)
centroids, _, distances = _kshape(
znorm_windows, self.n_clusters, self.max_iter, rng)
# Map window-level scores to timestamps
scores, valid_mask = map_scores_to_timestamps(
distances, self.window_size, step=1,
n_timestamps=n_timestamps, aggregation='max')
return scores, valid_mask, centroids
def _score_channel(self, ts, centroids):
"""Score a single channel against pre-learned centroids.
Parameters
----------
ts : np.ndarray of shape (n_timestamps,)
Single-channel time series.
centroids : np.ndarray of shape (n_clusters, window_size)
Learned centroids.
Returns
-------
scores : np.ndarray of shape (n_timestamps,)
Per-timestamp anomaly scores.
valid_mask : np.ndarray of shape (n_timestamps,), dtype=bool
"""
n_timestamps = len(ts)
X_2d = ts.reshape(-1, 1)
windows = sliding_windows(X_2d, self.window_size, step=1)
znorm_windows = np.array([_znormalize(w) for w in windows])
# Score each window by SBD to nearest centroid
n_windows = len(znorm_windows)
distances = np.empty(n_windows)
for i in range(n_windows):
best_dist = np.inf
for k in range(len(centroids)):
d, _ = _sbd(znorm_windows[i], centroids[k])
if d < best_dist:
best_dist = d
distances[i] = best_dist
scores, valid_mask = map_scores_to_timestamps(
distances, self.window_size, step=1,
n_timestamps=n_timestamps, aggregation='max')
return scores, valid_mask
[docs]
def fit(self, X, y=None):
"""Fit the k-Shape anomaly detector on time series data.
Parameters
----------
X : array-like of shape (n_timestamps,) or (n_timestamps, n_channels)
Training time series data.
y : Ignored
Not used, present for API consistency.
Returns
-------
self : object
Fitted estimator.
"""
X = validate_ts_input(X)
n_timestamps, n_channels = X.shape
n_windows = n_timestamps - self.window_size + 1
if n_windows < 1:
raise ValueError(
"Time series length %d is too short for window_size=%d. "
"Need at least %d timestamps."
% (n_timestamps, self.window_size, self.window_size + 1))
if n_windows < self.n_clusters:
raise ValueError(
"Not enough subsequences (%d) for n_clusters=%d. "
"Need a longer series or fewer clusters."
% (n_windows, self.n_clusters))
self._set_n_classes(y)
# Fit k-Shape per channel
self.centroids_ = []
per_channel_results = []
for ch in range(n_channels):
scores, valid_mask, centroids = self._fit_channel(X[:, ch])
self.centroids_.append(centroids)
per_channel_results.append((scores, valid_mask))
if n_channels == 1:
scores, valid_mask = per_channel_results[0]
else:
# Aggregate across channels
filled_scores = []
combined_valid = per_channel_results[0][1].copy()
for ch_scores, ch_valid in per_channel_results:
filled = ch_scores.copy()
filled[~ch_valid] = 0.0
filled_scores.append(filled)
combined_valid &= ch_valid
scores = aggregate_channel_scores(
filled_scores, method=self.channel_aggregation)
valid_mask = combined_valid
# Masked-score workflow: compute threshold on valid scores only
valid_scores = scores[valid_mask]
self.decision_scores_ = valid_scores
self._process_decision_scores()
# Reconstruct full-length arrays
full_scores = scores.copy()
full_scores[~valid_mask] = self.threshold_
full_labels = (full_scores > self.threshold_).astype(int)
self.decision_scores_ = full_scores
self.labels_ = full_labels
return self
[docs]
def decision_function(self, X):
"""Predict raw anomaly scores for time series X.
Parameters
----------
X : array-like of shape (n_timestamps,) or (n_timestamps, n_channels)
Test time series data.
Returns
-------
anomaly_scores : numpy array of shape (n_timestamps,)
Anomaly scores. Higher is more abnormal.
"""
check_is_fitted(self, ['decision_scores_', 'threshold_', 'labels_',
'centroids_'])
X = validate_ts_input(X)
n_timestamps, n_channels = X.shape
if n_channels != len(self.centroids_):
raise ValueError(
"Number of channels in X (%d) does not match the number "
"of channels seen during fit (%d)."
% (n_channels, len(self.centroids_)))
per_channel_results = []
for ch in range(n_channels):
scores, valid_mask = self._score_channel(
X[:, ch], self.centroids_[ch])
per_channel_results.append((scores, valid_mask))
if n_channels == 1:
scores, valid_mask = per_channel_results[0]
else:
filled_scores = []
combined_valid = per_channel_results[0][1].copy()
for ch_scores, ch_valid in per_channel_results:
filled = ch_scores.copy()
filled[~ch_valid] = 0.0
filled_scores.append(filled)
combined_valid &= ch_valid
scores = aggregate_channel_scores(
filled_scores, method=self.channel_aggregation)
valid_mask = combined_valid
# Fill invalid positions with threshold
scores[~valid_mask] = self.threshold_
return scores