Source code for pyod.utils.encoders

# -*- coding: utf-8 -*-
"""Encoder abstraction for EmbeddingOD.

Provides BaseEncoder and concrete implementations for converting
raw data (text, images) to numeric embeddings.
"""
# Author: Yue Zhao <yzhao062@gmail.com>
# License: BSD 2 clause

import abc
import importlib

import numpy as np
from sklearn.utils import check_array


class BaseEncoder(abc.ABC):
    """Common interface shared by every embedding encoder.

    Subclasses are required to provide ``encode``, which turns raw input
    data into a 2D numpy array of shape (n_samples, n_features).
    """

    @abc.abstractmethod
    def encode(self, X, batch_size=32, show_progress=True):
        """Turn raw input into numeric embeddings.

        Parameters
        ----------
        X : list or array-like
            Raw input data.

        batch_size : int, optional (default=32)
            Batch size for encoding.

        show_progress : bool, optional (default=True)
            Whether to show a progress bar.

        Returns
        -------
        embeddings : numpy array of shape (n_samples, n_features)
        """

    def _validate_output(self, embeddings, n_samples=None):
        """Coerce raw encoder output to a validated float64 matrix.

        Parameters
        ----------
        embeddings : array-like
            Raw encoder output. A 1D result is reshaped into a single
            column.

        n_samples : int or None
            Expected number of rows. When given, a different row count
            raises ValueError.

        Returns
        -------
        embeddings : numpy array
            The input converted to ``np.float64``.

        Raises
        ------
        ValueError
            If the row count differs from ``n_samples``. Any error raised
            by scikit-learn's ``check_array`` also propagates.
        """
        result = np.asarray(embeddings, dtype=np.float64)
        if result.ndim == 1:
            # One value per sample: treat it as a single feature column.
            result = result.reshape(-1, 1)

        # Called only for its validation side effect; the returned array
        # is intentionally not used.
        check_array(result)

        if n_samples is None:
            return result

        n_rows = result.shape[0]
        if n_rows != n_samples:
            raise ValueError(
                "Encoder returned %d samples, expected %d"
                % (n_rows, n_samples))
        return result
class CallableEncoder(BaseEncoder):
    """Adapter that exposes a plain function as an encoder.

    Parameters
    ----------
    fn : callable
        A function that accepts raw input and returns a numpy array
        of shape (n_samples, n_features).

    Examples
    --------
    >>> import numpy as np
    >>> encoder = CallableEncoder(fn=lambda X: np.random.randn(len(X), 10))
    >>> embeddings = encoder.encode(["hello", "world"])
    >>> embeddings.shape
    (2, 10)
    """

    def __init__(self, fn):
        fn_is_callable = callable(fn)
        if not fn_is_callable:
            raise TypeError("fn must be callable, got %s" % type(fn))
        self.fn = fn

    def encode(self, X, batch_size=32, show_progress=True):
        """Run the wrapped callable on ``X`` and validate its output.

        ``batch_size`` and ``show_progress`` are accepted for interface
        compatibility but are not forwarded to the callable.

        Returns
        -------
        embeddings : numpy array with ``len(X)`` rows
        """
        # The callable runs before len(X) is evaluated, as in the
        # validation call below.
        raw_output = self.fn(X)
        return self._validate_output(raw_output, n_samples=len(X))
class MultiModalEncoder(BaseEncoder):
    """Encode multiple modalities and concatenate into a single embedding.

    Each modality is encoded by its own encoder. The resulting embeddings
    are concatenated column-wise into a single feature matrix suitable
    for any PyOD detector.

    Parameters
    ----------
    encoders : dict of {str: encoder}
        Maps modality name to encoder. Each value can be:

        - A string (resolved via resolve_encoder at encode time)
        - A BaseEncoder instance
        - ``'passthrough'`` for pre-computed numeric features

    weights : dict of {str: float} or None, optional (default=None)
        Per-modality scaling applied after encoding. Useful when
        embedding dimensions differ significantly across modalities.
        Modalities without an entry are left unscaled.

    Attributes
    ----------
    resolved_ : dict of {str: BaseEncoder or 'passthrough'}
        Resolved encoders, created on the first ``encode`` call.

    means_ : dict of {str: numpy array}
        Per-modality mean of the unweighted training embeddings.
        Only present after ``fit_encode``.

    Examples
    --------
    >>> import numpy as np
    >>> from pyod.utils.encoders import MultiModalEncoder
    >>> encoder = MultiModalEncoder({
    ...     'text': 'all-MiniLM-L6-v2',
    ...     'tabular': 'passthrough',
    ... })
    >>> data = {'text': ["hello", "world"],
    ...         'tabular': np.array([[1, 2], [3, 4]])}
    >>> embeddings = encoder.encode(data)
    >>> embeddings.shape[0]
    2
    """

    def __init__(self, encoders, weights=None):
        if not isinstance(encoders, dict) or len(encoders) == 0:
            raise ValueError("encoders must be a non-empty dict")
        self.encoders = encoders
        self.weights = weights

    def fit_encode(self, X, batch_size=32, show_progress=True):
        """Encode training data and store per-modality mean embeddings.

        Call this during training (EmbeddingOD.fit) so that mean
        embeddings are available for imputing missing samples at test
        time. Subsequent calls to ``encode`` will use these stored means.

        Parameters
        ----------
        X : dict of {str: data}
            Training data. Should not contain ``None`` samples.

        batch_size : int, optional (default=32)
            Batch size for encoding.

        show_progress : bool, optional (default=True)
            Show progress bar.

        Returns
        -------
        embeddings : numpy array of shape (n_samples, total_features)
        """
        emb = self.encode(X, batch_size=batch_size,
                          show_progress=show_progress)

        # Store per-modality means from training for imputation.
        # Use _last_parts_unweighted_ (before weights) so that
        # weights are applied exactly once during encode.
        self.means_ = {
            name: np.mean(part, axis=0)
            for name, part in self._last_parts_unweighted_.items()
        }
        return emb

    def encode(self, X, batch_size=32, show_progress=True):
        """Encode multi-modal input and concatenate.

        Parameters
        ----------
        X : dict of {str: data}
            Maps modality name to input data. Every key of the
            ``encoders`` dict must be present. When a modality is given
            as a list or tuple, individual samples may be ``None`` to
            indicate a missing modality for that sample; missing
            embeddings are imputed with the training mean (if
            ``fit_encode`` was called) or zeros.

        batch_size : int, optional (default=32)
            Batch size for encoding.

        show_progress : bool, optional (default=True)
            Show progress bar.

        Returns
        -------
        embeddings : numpy array of shape (n_samples, total_features)

        Raises
        ------
        TypeError
            If ``X`` is not a dict.
        KeyError
            If a configured modality is absent from ``X``.
        ValueError
            If every sample of a modality is ``None`` or the modalities
            disagree on the number of samples.
        """
        if not isinstance(X, dict):
            raise TypeError(
                "MultiModalEncoder expects a dict input, got %s" % type(X))

        # Resolve encoders on first call. The mapping is built locally and
        # published only once every encoder resolved: if resolution fails
        # midway, no partial ``resolved_`` is left behind that would make
        # later calls silently skip the unresolved modalities.
        if not hasattr(self, 'resolved_'):
            resolved = {}
            for name, enc in self.encoders.items():
                if enc == 'passthrough':
                    resolved[name] = 'passthrough'
                else:
                    resolved[name] = resolve_encoder(enc)
            self.resolved_ = resolved

        parts = []
        self._last_parts_ = {}
        self._last_parts_unweighted_ = {}
        n_samples = None

        for name, enc in self.resolved_.items():
            if name not in X:
                raise KeyError(
                    "Modality '%s' not found in input. "
                    "Expected keys: %s"
                    % (name, list(self.resolved_.keys())))

            modality_data = X[name]
            # None markers are only looked for in lists and tuples.
            has_missing = isinstance(modality_data, (list, tuple)) and \
                any(v is None for v in modality_data)

            if has_missing:
                present_idx = [i for i, v in enumerate(modality_data)
                               if v is not None]
                if len(present_idx) == 0:
                    raise ValueError(
                        "All samples are None for modality '%s'" % name)

                # Encode or passthrough the present samples
                if enc == 'passthrough':
                    present_vals = [modality_data[i] for i in present_idx]
                    # The first present sample fixes the feature width.
                    first = np.asarray(present_vals[0], dtype=np.float64)
                    n_feat = 1 if first.ndim == 0 else first.shape[0]
                    present_emb = np.zeros((len(present_idx), n_feat),
                                           dtype=np.float64)
                    for j, value in enumerate(present_vals):
                        present_emb[j] = np.asarray(
                            value, dtype=np.float64)
                else:
                    present_data = [modality_data[i] for i in present_idx]
                    present_emb = enc.encode(present_data,
                                             batch_size=batch_size,
                                             show_progress=show_progress)

                # Impute missing with training mean or zeros
                n_total = len(modality_data)
                means = getattr(self, 'means_', None)
                if means is not None and name in means:
                    fill = means[name]
                else:
                    fill = np.zeros(present_emb.shape[1])
                emb = np.tile(fill, (n_total, 1))
                for j, idx in enumerate(present_idx):
                    emb[idx] = present_emb[j]
            elif enc == 'passthrough':
                emb = np.asarray(modality_data, dtype=np.float64)
                if emb.ndim == 1:
                    emb = emb.reshape(-1, 1)
            else:
                emb = enc.encode(modality_data,
                                 batch_size=batch_size,
                                 show_progress=show_progress)

            # The first modality defines the sample count; all others
            # must agree with it.
            if n_samples is None:
                n_samples = emb.shape[0]
            elif emb.shape[0] != n_samples:
                raise ValueError(
                    "Modality '%s' has %d samples, expected %d"
                    % (name, emb.shape[0], n_samples))

            # Keep an unweighted copy so fit_encode can derive means that
            # have not been scaled yet.
            self._last_parts_unweighted_[name] = emb.copy()

            if self.weights is not None and name in self.weights:
                emb = emb * self.weights[name]

            self._last_parts_[name] = emb
            parts.append(emb)

        return self._validate_output(np.hstack(parts), n_samples=n_samples)
# ---- Encoder registry and resolution ----

# Shortcut name -> (backend key, keyword arguments for the backend class).
_ENCODER_REGISTRY = {
    # Sentence Transformers
    'all-MiniLM-L6-v2': (
        'sentence_transformer', {'model_name': 'all-MiniLM-L6-v2'}),
    'all-mpnet-base-v2': (
        'sentence_transformer', {'model_name': 'all-mpnet-base-v2'}),
    # OpenAI
    'text-embedding-3-small': (
        'openai', {'model_name': 'text-embedding-3-small'}),
    'text-embedding-3-large': (
        'openai', {'model_name': 'text-embedding-3-large'}),
    # HuggingFace Vision
    'dinov2-small': (
        'huggingface',
        {'model_name': 'facebook/dinov2-small', 'modality': 'image'}),
    'dinov2-base': (
        'huggingface',
        {'model_name': 'facebook/dinov2-base', 'modality': 'image'}),
    'dinov2-large': (
        'huggingface',
        {'model_name': 'facebook/dinov2-large', 'modality': 'image'}),
    'clip-vit-base': (
        'huggingface',
        {'model_name': 'openai/clip-vit-base-patch32',
         'modality': 'image'}),
    # HuggingFace Text
    'bert-base-uncased': (
        'huggingface',
        {'model_name': 'bert-base-uncased', 'modality': 'text'}),
    # Audio (handcrafted acoustic features; no GPU)
    'audio-mfcc': ('audio', {}),
}

# Backend key -> (module path, class name); the module is imported lazily.
_ENCODER_BACKENDS = {
    'sentence_transformer': (
        'pyod.utils.encoders.sentence_transformer',
        'SentenceTransformerEncoder'),
    'openai': (
        'pyod.utils.encoders.openai_encoder', 'OpenAIEncoder'),
    'huggingface': (
        'pyod.utils.encoders.huggingface', 'HuggingFaceEncoder'),
    'audio': (
        'pyod.utils.encoders.audio', 'AudioFeatureEncoder'),
}

# Backend key -> install command quoted in the ImportError message.
_INSTALL_HINTS = {
    'sentence_transformer': 'pip install sentence-transformers',
    'openai': 'pip install openai',
    'huggingface': 'pip install transformers torch',
    'audio': 'pip install librosa soundfile',
}


def _create_encoder(backend, **kwargs):
    """Instantiate the encoder class registered for ``backend``.

    Parameters
    ----------
    backend : str
        Key into ``_ENCODER_BACKENDS``. An unknown key raises KeyError.

    **kwargs
        Passed unchanged to the encoder class constructor.

    Returns
    -------
    encoder : instance of the backend's encoder class

    Raises
    ------
    ImportError
        If the backend module cannot be imported; the message carries
        the install hint registered for the backend, if any.
    """
    module_path, class_name = _ENCODER_BACKENDS[backend]
    try:
        backend_module = importlib.import_module(module_path)
    except ImportError:
        message = (
            "Encoder backend '%s' requires module '%s' which is not "
            "installed. Install with: %s"
            % (backend, module_path, _INSTALL_HINTS.get(backend, '')))
        raise ImportError(message)
    return getattr(backend_module, class_name)(**kwargs)
def resolve_encoder(encoder):
    """Resolve an encoder from various input types.

    Parameters
    ----------
    encoder : str, BaseEncoder, or callable
        - If BaseEncoder instance, returned as-is.
        - If callable, wrapped in CallableEncoder.
        - If string, looked up in the encoder registry. If not found,
          tries sentence-transformers first, then HuggingFace AutoModel.

        The auto-resolve fallback is designed for text embedding models.
        For image models (DINOv2, CLIP, etc.), use registry shortcuts
        (e.g., 'dinov2-small', 'clip-vit-base') instead of raw
        HuggingFace model IDs.

    Returns
    -------
    encoder : BaseEncoder

    Raises
    ------
    ImportError
        If ``encoder`` is a registry shortcut whose backend module
        cannot be imported.
    ValueError
        If ``encoder`` is a string outside the registry and neither
        auto-resolve backend can be imported. The message lists each
        import failure and the last one is attached as ``__cause__``.
    TypeError
        If ``encoder`` is not a str, BaseEncoder, or callable.
    """
    if isinstance(encoder, BaseEncoder):
        return encoder

    if callable(encoder) and not isinstance(encoder, str):
        return CallableEncoder(encoder)

    if not isinstance(encoder, str):
        raise TypeError("encoder must be str, BaseEncoder, or callable, "
                        "got %s" % type(encoder))

    # Check registry
    if encoder in _ENCODER_REGISTRY:
        backend, kwargs = _ENCODER_REGISTRY[encoder]
        return _create_encoder(backend, **kwargs)

    # Auto-resolve: try sentence-transformers first (most text
    # embedding models are compatible), then HuggingFace AutoModel.
    fallbacks = (
        ('sentence_transformer', {'model_name': encoder}),
        ('huggingface', {'model_name': encoder, 'modality': 'text'}),
    )
    # Keep every import failure instead of discarding it: the messages
    # carry the install hints the user needs when no backend is available.
    import_errors = []
    for backend, kwargs in fallbacks:
        try:
            return _create_encoder(backend, **kwargs)
        except ImportError as exc:
            import_errors.append(exc)

    raise ValueError(
        "Cannot resolve encoder '%s'. Provide a registry shortcut "
        "(e.g., 'all-MiniLM-L6-v2'), a HuggingFace model ID, a "
        "BaseEncoder instance, or a callable. No auto-resolve backend "
        "could be imported: %s"
        % (encoder, "; ".join(str(exc) for exc in import_errors))
    ) from import_errors[-1]