# -*- coding: utf-8 -*-
"""AudioAE: a log-mel reconstruction autoencoder for audio anomaly detection.
Each clip is turned into overlapping log-mel context windows; a dense
autoencoder is fit on the windows of the (mostly normal) training clips,
and each clip is scored by its mean per-window reconstruction error. This
is the DCASE-style audio anomaly detection baseline, expressed through
PyOD's ``AutoEncoder`` so the training loop and preprocessing are shared
with the rest of the library.
"""
# Author: Yue Zhao <yzhao062@gmail.com>
# License: BSD 2 clause
import numpy as np
from sklearn.utils.validation import check_is_fitted
from .base import BaseDetector
from ..utils.encoders.audio import _to_mono_waveform
_DEFAULT_SR = 22050
def _logmel_windows(y, sr, n_mels, context, hop_length):
    """Slice one waveform into flattened, overlapping log-mel windows.

    The waveform is turned into a mel power spectrogram, mapped to
    decibels, and cut into every run of ``context`` consecutive frames
    (stride of one frame). Each run is flattened frame by frame: a row
    holds the ``n_mels`` bands of its first frame, then those of its
    second frame, and so on.

    Parameters
    ----------
    y : numpy array
        Waveform samples, forwarded to
        ``librosa.feature.melspectrogram``.

    sr : int
        Sample rate of ``y``.

    n_mels : int
        Number of mel bands.

    context : int
        Number of consecutive frames per window.

    hop_length : int
        Hop length in samples.

    Returns
    -------
    windows : numpy array of shape (n_windows, n_mels * context)
        ``float32`` windows. A spectrogram with fewer than ``context``
        frames is first extended on the right with zero-valued frames,
        so at least one window is produced.
    """
    import librosa

    mel_power = librosa.feature.melspectrogram(
        y=y, sr=sr, n_mels=n_mels, hop_length=hop_length)
    log_mel = librosa.power_to_db(mel_power)

    missing = context - log_mel.shape[1]
    if missing > 0:
        # NOTE(review): the fill value is a literal 0 in the dB domain.
        # ``power_to_db`` is called with its default reference, so 0 is
        # presumably "reference level" rather than the quietest value
        # in the clip -- confirm this is the intended padding level.
        filler = np.zeros((n_mels, missing), dtype=log_mel.dtype)
        log_mel = np.concatenate([log_mel, filler], axis=1)

    # Row t of ``frame_idx`` lists the frames t .. t + context - 1.
    # Indexing the (n_frames, n_mels) transpose with it gathers every
    # window at once as (n_windows, context, n_mels), which flattens to
    # the frame-major layout described above.
    n_windows = log_mel.shape[1] - context + 1
    frame_idx = (np.arange(n_windows)[:, np.newaxis]
                 + np.arange(context)[np.newaxis, :])
    gathered = log_mel.T[frame_idx]
    return gathered.reshape(n_windows, -1).astype(np.float32)
[docs]
class AudioAE(BaseDetector):
    """Log-mel reconstruction autoencoder for audio anomaly detection.

    The detector extracts overlapping log-mel context windows from each
    clip, fits a dense autoencoder (PyOD's :class:`AutoEncoder`) on the
    windows of the training clips, and scores each clip by its mean
    per-window reconstruction error. Training assumes the input is mostly
    normal, the usual unsupervised setting.

    Requires ``torch`` (for the autoencoder) and ``pyod[audio]``
    (``librosa``, ``soundfile``).

    Parameters
    ----------
    n_mels : int, optional (default=64)
        Number of mel bands in the spectrogram. Must be at least 1.

    context : int, optional (default=5)
        Number of consecutive frames stacked into one autoencoder input
        window. The window dimensionality is ``n_mels * context``.
        Must be at least 1.

    hop_length : int, optional (default=512)
        STFT hop length in samples. Must be at least 1.

    sr : int, optional (default=22050)
        Target sample rate. File inputs are loaded at this rate;
        ``(waveform, sample_rate)`` tuples are resampled to it.

    contamination : float, optional (default=0.1)
        Expected proportion of outliers, used for the clip-level
        threshold and labels.

    epoch_num : int, optional (default=40)
        Autoencoder training epochs.

    batch_size : int, optional (default=1024)
        Autoencoder mini-batch size (over frames, not clips). It is
        capped at the number of extracted frames during ``fit``.

    lr : float, optional (default=1e-3)
        Learning rate.

    hidden_neuron_list : list of int or None, optional (default=None)
        Encoder hidden sizes. ``None`` uses ``[128, 32, 8]``, which gives
        the DCASE-style 320-128-32-8 contraction for the default
        320-dimensional window (``n_mels=64``, ``context=5``). Sizes
        that are not smaller than the window dimensionality are dropped
        during ``fit``.

    device : str or None, optional (default=None)
        Torch device. ``None`` auto-selects.

    random_state : int, optional (default=42)
        Seed forwarded to the autoencoder.

    verbose : int, optional (default=0)
        Autoencoder verbosity.

    Attributes
    ----------
    decision_scores_ : numpy array of shape (n_clips,)
        Clip-level outlier scores of the training data.

    threshold_ : float
        Score threshold based on ``contamination``.

    labels_ : numpy array of shape (n_clips,)
        Binary labels of training clips (0: inlier, 1: outlier).

    ae_ : AutoEncoder
        The fitted frame-level autoencoder.

    Examples
    --------
    >>> import numpy as np
    >>> from pyod.models.audio_ae import AudioAE
    >>> clips = [np.random.RandomState(s).randn(22050) for s in range(20)]
    >>> clf = AudioAE(epoch_num=5)
    >>> clf.fit(clips)  # doctest: +SKIP
    >>> scores = clf.decision_function(clips)  # doctest: +SKIP
    """

    def __init__(self, n_mels=64, context=5, hop_length=512, sr=_DEFAULT_SR,
                 contamination=0.1, epoch_num=40, batch_size=1024, lr=1e-3,
                 hidden_neuron_list=None, device=None, random_state=42,
                 verbose=0):
        super(AudioAE, self).__init__(contamination=contamination)
        # Hyper-parameters are stored untouched; they are only checked
        # when clips are actually processed.
        self.n_mels = n_mels
        self.context = context
        self.hop_length = hop_length
        self.sr = sr
        self.epoch_num = epoch_num
        self.batch_size = batch_size
        self.lr = lr
        self.hidden_neuron_list = hidden_neuron_list
        self.device = device
        self.random_state = random_state
        self.verbose = verbose

    def _extract(self, X):
        """Turn every clip in X into log-mel windows.

        Parameters
        ----------
        X : list
            Audio clips in the formats accepted by ``fit``.

        Returns
        -------
        frames : numpy array of shape (n_windows_total, n_mels * context)
            Windows of all clips, concatenated in clip order.

        clip_idx : numpy array of shape (n_windows_total,)
            For each row of ``frames``, the position in ``X`` of the
            clip it was cut from.

        Raises
        ------
        ImportError
            If ``librosa`` or ``soundfile`` is not installed.

        ValueError
            If ``X`` is empty, or if ``n_mels``, ``context`` or
            ``hop_length`` is smaller than 1.
        """
        try:
            import librosa  # noqa: F401
            import soundfile  # noqa: F401
        except ImportError:
            raise ImportError(
                "AudioAE requires 'librosa' and 'soundfile'. "
                "Install with: pip install pyod[audio]")

        if len(X) == 0:
            raise ValueError("AudioAE received an empty input.")

        # Reject sizes that cannot describe a spectrogram window up
        # front; otherwise e.g. ``context=0`` yields zero-width windows
        # that only fail later, inside the autoencoder.
        for name in ("n_mels", "context", "hop_length"):
            value = getattr(self, name)
            if value < 1:
                raise ValueError(
                    "{0} must be a positive integer, got {1!r}.".format(
                        name, value))

        frames_list, clip_idx = [], []
        for i, item in enumerate(X):
            y = _to_mono_waveform(item, self.sr)
            windows = _logmel_windows(y, self.sr, self.n_mels,
                                      self.context, self.hop_length)
            frames_list.append(windows)
            # Remember which clip each window belongs to so per-window
            # scores can be folded back to one score per clip.
            clip_idx.append(np.full(len(windows), i, dtype=np.int64))
        return np.concatenate(frames_list, axis=0), np.concatenate(clip_idx)

    @staticmethod
    def _aggregate(frame_scores, clip_idx, n_clips):
        """Average the per-window scores within each clip.

        Parameters
        ----------
        frame_scores : numpy array of shape (n_windows_total,)
            One score per window.

        clip_idx : numpy array of int, shape (n_windows_total,)
            Clip position of each window, as returned by ``_extract``.

        n_clips : int
            Number of clips.

        Returns
        -------
        clip_scores : numpy array of shape (n_clips,)
            ``float64`` mean score per clip. A clip that owns no window
            keeps a score of 0.
        """
        scores = np.asarray(frame_scores, dtype=np.float64).ravel()
        owners = np.asarray(clip_idx).ravel()

        # Windows that point outside [0, n_clips) contribute to no clip.
        valid = (owners >= 0) & (owners < n_clips)
        owners, scores = owners[valid], scores[valid]

        # One pass over the windows: per-clip sums and counts. This
        # replaces building a boolean mask over all windows for every
        # clip, which scaled with n_clips * n_windows_total.
        sums = np.bincount(owners, weights=scores, minlength=n_clips)
        counts = np.bincount(owners, minlength=n_clips)

        out = np.zeros(n_clips, dtype=np.float64)
        np.divide(sums, counts, out=out, where=counts > 0)
        return out

    def fit(self, X, y=None):
        """Fit the frame autoencoder and score the training clips.

        Parameters
        ----------
        X : list
            Audio clips as file paths, waveform arrays, or
            ``(waveform, sample_rate)`` tuples.

        y : Ignored
            Not used, present for API consistency.

        Returns
        -------
        self : object
            Fitted estimator.

        Raises
        ------
        ImportError
            If ``torch``, ``librosa`` or ``soundfile`` is not installed.

        ValueError
            If ``X`` is empty or a window-size parameter is invalid.
        """
        try:
            import torch  # noqa: F401
        except ImportError:
            raise ImportError(
                "AudioAE requires torch (for the autoencoder) and "
                "pyod[audio] (librosa, soundfile). Install with: "
                "pip install pyod[torch,audio]")
        # Imported here so the module can be imported without torch.
        from .auto_encoder import AutoEncoder

        frames, clip_idx = self._extract(X)
        dim = frames.shape[1]

        hidden = self.hidden_neuron_list or [128, 32, 8]
        # Drop hidden layers that are not smaller than the input so the
        # autoencoder stays a contraction for unusually small windows.
        hidden = [h for h in hidden if h < dim] or [max(dim // 2, 2)]

        # Cap the batch size to the frame count. PyOD's AutoEncoder drops
        # the last incomplete batch, so a batch larger than the dataset
        # would drop every frame and leave the training loop with nothing.
        batch_size = max(1, min(self.batch_size, frames.shape[0]))

        self.ae_ = AutoEncoder(
            contamination=self.contamination, epoch_num=self.epoch_num,
            batch_size=batch_size, lr=self.lr,
            hidden_neuron_list=hidden, device=self.device,
            random_state=self.random_state, verbose=self.verbose)
        self.ae_.fit(frames)

        # Score the training windows, then fold them to one score per
        # clip before the threshold and labels are derived.
        frame_scores = self.ae_.decision_function(frames)
        self._set_n_classes(y)
        self.decision_scores_ = self._aggregate(frame_scores, clip_idx, len(X))
        self._process_decision_scores()
        return self

    def decision_function(self, X):
        """Predict clip-level anomaly scores for X.

        Parameters
        ----------
        X : list
            Audio clips in the same formats accepted by ``fit``.

        Returns
        -------
        anomaly_scores : numpy array of shape (n_clips,)
            Mean per-window reconstruction error of each clip.
        """
        # ``ae_`` is checked as well because it is what scores the windows.
        check_is_fitted(
            self, ['decision_scores_', 'threshold_', 'labels_', 'ae_'])
        frames, clip_idx = self._extract(X)
        frame_scores = self.ae_.decision_function(frames)
        return self._aggregate(frame_scores, clip_idx, len(X))