# -*- coding: utf-8 -*-
"""ADEngine: anomaly detection lifecycle engine.
Handles data profiling, detection planning, detector construction,
and knowledge queries. Works as a standalone Python API (no LLM
required) or as the backend for MCP/agent interfaces.
"""
# Author: Yue Zhao <yzhao062@gmail.com>
# License: BSD 2 clause
from __future__ import annotations
import logging
import os
from typing import TYPE_CHECKING, Any
import numpy as np
from .knowledge import KnowledgeBase
from pyod.utils._quality_metrics import (
compute_consensus,
compute_feature_importance,
compute_quality,
feature_contributions,
label_metrics,
select_best_detector,
)
from pyod.utils._kb_router import (
evaluate_rules,
make_plan,
suggest_alternative,
)
from pyod.utils._detector_factory import (
build_detector_from_plan,
)
from pyod.utils._nl_feedback import (
adjust_contamination_down,
adjust_contamination_up,
apply_nl_feedback,
apply_structured_feedback,
)
if TYPE_CHECKING:
from pyod.utils.investigation import InvestigationState
logger = logging.getLogger(__name__)
[docs]
class ADEngine:
"""Anomaly detection lifecycle engine.
Parameters
----------
knowledge_dir : str or None
Path to knowledge base directory. If None, uses bundled.
random_state : int or None, optional
Random seed forwarded to every detector that declares an
explicit ``random_state`` parameter when the engine instantiates
it from a plan. Detectors without ``random_state`` in their
signature (e.g., ABOD, KNN, LOF, SOD) are deterministic by
construction (distance, angle, or density based, with no internal
sampling) and need no seed. With this set, the shallow-detector
pipeline is reproducible: a run-to-run audit of the shipped
shallow detectors found every one either honors the seed or is
deterministic by construction, with no nondeterministic cases.
Deep detectors additionally depend on framework-level seeding
(e.g., ``torch.manual_seed``). Set this to a fixed integer for
byte-identical flagged sets across re-runs on the same input.
"""
def __init__(self, knowledge_dir: str | None = None,
             random_state: int | None = None) -> None:
    """Remember the seed and load the knowledge base.

    ``knowledge_dir`` is passed unchanged to ``KnowledgeBase``;
    ``random_state`` is stored and later forwarded by
    ``build_detector``.
    """
    self.random_state = random_state
    self.kb = KnowledgeBase(knowledge_dir=knowledge_dir)
[docs]
def profile_data(self, X: Any, data_type: str | None = None) -> dict:
"""Profile the input data.
Parameters
----------
X : array-like, list, or dict
Input data.
data_type : str or None
Explicit override. One of 'tabular', 'text', 'image',
'audio', 'time_series', 'multimodal', 'graph'.
Returns
-------
profile : dict
"""
if data_type is not None:
detected_type = data_type
else:
detected_type = self._sniff_data_type(X)
profile = {'data_type': detected_type}
if detected_type == 'text':
profile['n_samples'] = len(X)
elif detected_type == 'image':
profile['n_samples'] = len(X)
elif detected_type == 'audio':
profile['n_samples'] = len(X)
elif detected_type == 'multimodal':
first_key = next(iter(X))
first_val = X[first_key]
profile['n_samples'] = len(first_val)
profile['modalities'] = list(X.keys())
elif detected_type == 'graph':
# PyG Data object (only supported graph input for ADEngine)
profile['n_nodes'] = X.num_nodes
profile['n_edges'] = X.edge_index.shape[1]
profile['n_features'] = (
X.x.shape[1] if X.x is not None else 0)
profile['has_features'] = X.x is not None
profile['n_samples'] = X.num_nodes
else:
# tabular or time_series
arr = np.asarray(X, dtype=np.float64)
if arr.ndim == 1:
arr = arr.reshape(-1, 1)
profile['n_samples'] = arr.shape[0]
profile['n_features'] = arr.shape[1]
profile['has_nan'] = bool(np.isnan(arr).any())
profile['dtype'] = str(arr.dtype)
n_feat = arr.shape[1]
if n_feat <= 10:
profile['dimensionality_class'] = 'low'
elif n_feat <= 100:
profile['dimensionality_class'] = 'medium'
else:
profile['dimensionality_class'] = 'high'
if detected_type == 'time_series':
profile['n_timestamps'] = arr.shape[0]
profile['channels'] = arr.shape[1]
return profile
def _sniff_data_type(self, X: Any) -> str:
"""Conservative data type detection."""
# Check for PyG Data object
try:
from torch_geometric.data import Data
if isinstance(X, Data):
return 'graph'
except ImportError:
pass
if isinstance(X, dict):
return 'multimodal'
if isinstance(X, (list, tuple)) and len(X) > 0:
sample = X[:min(20, len(X))]
if all(isinstance(x, str) for x in sample):
if self._looks_like_image_paths(sample[:5]):
return 'image'
if self._looks_like_audio_paths(sample[:5]):
return 'audio'
return 'text'
return 'tabular'
@staticmethod
def _looks_like_image_paths(samples: list[str]) -> bool:
"""Check if string samples look like image file paths."""
image_exts = {'.jpg', '.jpeg', '.png', '.bmp', '.gif',
'.tiff', '.webp'}
for s in samples:
ext = os.path.splitext(s)[1].lower()
if ext not in image_exts:
return False
return True
@staticmethod
def _looks_like_audio_paths(samples: list[str]) -> bool:
"""Check if string samples look like audio file paths."""
audio_exts = {'.wav', '.flac', '.mp3', '.ogg', '.m4a',
'.aac', '.wma', '.aiff', '.aif'}
for s in samples:
ext = os.path.splitext(s)[1].lower()
if ext not in audio_exts:
return False
return True
def _with_contamination(self, detector_name: str,
params: dict) -> dict:
"""Ensure plan params expose an explicit contamination value (TA2).
The MCP `plan_detection` -> `build_detector` chain serializes the
plan to JSON. When `params` does not include `contamination`, the
emitted code snippet inherits the detector class's own default,
which is invisible to MCP-only agents. Always include a value
sourced from the KB `default_params` when the KB confirms the
detector accepts a `contamination` kwarg; otherwise leave params
unchanged so we do not paper over detectors that use a different
threshold mechanism.
"""
if 'contamination' in params:
return dict(params)
algo = self.kb.get_algorithm(detector_name)
if algo is None:
return dict(params)
kb_default = algo.get('default_params', {}).get('contamination')
if kb_default is None:
return dict(params)
out = dict(params)
out['contamination'] = kb_default
return out
[docs]
def plan_detection(self, profile: dict, priority: str = 'balanced',
                   constraints: dict | None = None, *,
                   top_k: int = 3,
                   llm_client=None,
                   llm_strict: bool | None = None) -> dict:
    """Plan a detection pipeline.

    Parameters
    ----------
    profile : dict
        Output of profile_data().
    priority : str
        'speed', 'accuracy', or 'balanced'.
    constraints : dict or None
        Optional: {'exclude_detectors': [...]}. A missing key or an
        explicit ``None`` value both mean "exclude nothing".
    top_k : int, default 3
        Number of detectors in the returned plan (primary + ``top_k - 1``
        alternatives). Default ``3`` preserves the v3.5.2 behaviour
        (``valid[1:3]`` produced two alternatives plus the primary).
        Values < 1 are clamped to 1.
    llm_client : callable or None, default None
        Optional ``(prompt: str) -> str`` callable (see
        :class:`pyod.utils._llm.LLMCallable`). When provided, routing
        consults the LLM with the KB context and parses its response
        into a plan via :func:`pyod.utils._llm.parse_routing_response`.
        If the LLM call or parser raises, falls back to rule routing
        with a :class:`RuntimeWarning` (see ``llm_strict``). When
        ``None`` (default), v3.5.2 rule routing is unchanged.
    llm_strict : bool or None, default None
        Per-call control for LLM-routing failure mode. ``True``
        re-raises any exception from ``llm_client`` or the response
        parser; ``False`` falls back to rule routing with a
        :class:`RuntimeWarning`; ``None`` defers to the
        ``PYOD3_LLM_STRICT`` environment variable
        (``"1"`` re-raises, anything else falls back). The explicit
        kwarg takes precedence so concurrent callers in the same
        process can choose independently.

    Returns
    -------
    plan : dict (DetectionPlan, closed schema)
        When nothing can be selected, the plan has an empty
        ``detector_name`` and ``note='no_valid_plan'``.
    """
    constraints = constraints or {}
    top_k = max(1, int(top_k))

    if llm_client is not None:
        try:
            return self._plan_via_llm(profile, top_k, llm_client,
                                      constraints)
        except Exception as ex:  # noqa: BLE001
            # The environment variable is only consulted when the
            # caller did not decide explicitly.
            if llm_strict is None:
                strict = os.environ.get('PYOD3_LLM_STRICT') == '1'
            else:
                strict = bool(llm_strict)
            if strict:
                raise
            import warnings
            warnings.warn(
                f"plan_detection: llm_client routing failed "
                f"({type(ex).__name__}: {ex}); falling back to "
                "rule routing. Pass llm_strict=True (or set "
                "PYOD3_LLM_STRICT=1) to re-raise.",
                RuntimeWarning, stacklevel=2)

    # `or []` keeps {'exclude_detectors': None} from crashing set(),
    # matching how get_kb_for_routing reads the same constraint.
    exclude = set(constraints.get('exclude_detectors') or [])

    def _is_shipped(name):
        algo = self.kb.get_algorithm(name)
        return bool(algo) and algo.get('status') == 'shipped'

    def _as_plan(rec, default_confidence, alternatives):
        # Convert one matched rule record into a DetectionPlan dict.
        return make_plan(
            detector_name=rec['detector'],
            params=self._with_contamination(
                rec['detector'], rec.get('params', {})),
            preset=rec.get('preset'),
            reason=rec.get('_reason', ''),
            evidence=rec.get('_evidence', []),
            confidence=rec.get('confidence', default_confidence),
            alternatives=alternatives)

    matched = evaluate_rules(profile, priority, self.kb)
    valid = [rec for rec in matched
             if _is_shipped(rec['detector'])
             and rec['detector'] not in exclude]

    if not valid:
        # Fallback: pick first non-excluded shipped detector
        fallback_order = ['IForest', 'ECOD', 'KNN', 'HBOS', 'LOF',
                          'COPOD', 'PCA']
        fallback_name = next(
            (fb for fb in fallback_order
             if fb not in exclude and _is_shipped(fb)),
            None)
        if fallback_name is None:
            return make_plan(
                detector_name='',
                params={},
                reason='No valid detector available: all candidates '
                       'excluded or no matching rule found',
                evidence=[],
                confidence=0.0,
                alternatives=[],
                note='no_valid_plan')
        return make_plan(
            detector_name=fallback_name,
            params=self._with_contamination(fallback_name, {}),
            reason='Fallback: no routing rule matched or all '
                   'candidates excluded',
            evidence=['ADBench'], confidence=0.5,
            alternatives=[], note='No specific rule matched')

    # valid[0] is the primary; the next top_k - 1 become alternatives.
    alternatives = [_as_plan(rec, 0.5, []) for rec in valid[1:top_k]]
    return _as_plan(valid[0], 0.7, alternatives)
# ------------------------------------------------------------------
# Surface 1: KB exposure for caller-driven (agent / LLM) routing
# ------------------------------------------------------------------
[docs]
def get_kb_for_routing(self, profile: dict, top_k: int = 3,
constraints: dict | None = None) -> dict:
"""Return a structured KB snapshot for caller-driven detector
selection.
This is the agent-facing companion to :meth:`plan_detection`.
``plan_detection`` consumes the KB through hand-coded rules and
returns a single plan; ``get_kb_for_routing`` exposes the KB
directly so a caller (LLM agent, MCP tool client, ...) can
reason over each detector's strengths, weaknesses, complexity,
and benchmark rank, then call :meth:`make_plan` to commit a
plan.
Parameters
----------
profile : dict
Output of :meth:`profile_data`. Must include ``data_type``;
``n_samples`` / ``n_features`` are passed through unchanged.
top_k : int, default 3
The number of detectors the caller intends to select. The KB
snapshot itself is returned in full (filtered + sorted); the
field is included in the returned dict so the response-format
hint can reference it.
constraints : dict or None, optional
``{'exclude_detectors': list[str], 'data_type_strict': bool}``.
``exclude_detectors`` is a hard filter. ``data_type_strict``
(default ``True``) drops detectors whose KB ``data_types``
field does not include ``profile['data_type']``.
Returns
-------
dict
``{'task_profile': {...}, 'available_detectors': [...],
'top_k_requested': int, 'response_format_hint': str,
'n_available': int}``.
Notes
-----
Pure function; no LLM calls, no state mutation.
"""
if not isinstance(profile, dict):
raise ValueError("profile must be a dict from profile_data()")
top_k = max(1, int(top_k))
constraints = constraints or {}
exclude = set(constraints.get('exclude_detectors') or [])
data_type_strict = constraints.get('data_type_strict', True)
target_modality = profile.get('data_type', 'tabular')
catalog = self.list_detectors(data_type=None, status='shipped')
available: list[dict] = []
for entry in catalog:
name = entry.get('name') if isinstance(entry, dict) else str(entry)
if name in exclude:
continue
dts = entry.get('data_types') or []
modality_match = (target_modality in dts) if dts else True
if data_type_strict and not modality_match:
continue
complexity = entry.get('complexity') or {}
available.append({
'name': name,
'category': entry.get('category', 'unknown'),
'complexity_time': complexity.get('time'),
'complexity_space': complexity.get('space'),
'strengths': entry.get('strengths') or [],
'weaknesses': entry.get('weaknesses') or [],
'best_for': entry.get('best_for'),
'avoid_when': entry.get('avoid_when'),
'benchmark_rank': entry.get('benchmark_rank') or {},
'modality_match': modality_match,
})
# Modality-aware benchmark-rank keys. Each modality lists its
# preferred KB rank fields in priority order; the first non-None
# value sets the sort key. `ADBench_overall` is the universal
# fallback because the KB ships rank for nearly every tabular
# detector there. Detectors missing every key sort last (999).
_MODALITY_RANK_KEYS = {
'tabular': ['ADBench_overall'],
'time_series': ['TSB_AD_overall', 'TSB_AD_overall_iforest',
'ADBench_overall'],
'timeseries': ['TSB_AD_overall', 'TSB_AD_overall_iforest',
'ADBench_overall'],
'graph': ['BOND_deep', 'BOND_overall', 'ADBench_overall'],
'text': ['NLP_ADBench_overall', 'ADBench_overall'],
'image': ['MVTec_overall', 'ADBench_overall'],
'synthetic': ['ADBench_overall'],
}
rank_key_candidates = _MODALITY_RANK_KEYS.get(
str(target_modality).lower(),
[f"{str(target_modality).title()}_overall", 'ADBench_overall'])
def _rank(d):
br = d.get('benchmark_rank') or {}
for k in rank_key_candidates:
v = br.get(k)
if v is not None:
return v
return 999
# Stamp the resolved (rank, rank_key) on each entry so downstream
# consumers (e.g., build_routing_prompt) can render the modality-
# specific rank without re-doing the lookup. None when no rank
# field is present in the KB for this detector under this modality.
for d in available:
br = d.get('benchmark_rank') or {}
resolved = None
resolved_key = None
for k in rank_key_candidates:
v = br.get(k)
if v is not None:
resolved = v
resolved_key = k
break
d['resolved_rank'] = resolved
d['resolved_rank_key'] = resolved_key
available.sort(key=lambda d: (_rank(d), d['name']))
# Strip non-JSON-safe fields from the profile copy
profile_safe = {k: v for k, v in profile.items() if k != 'data'}
return {
'task_profile': profile_safe,
'available_detectors': available,
'top_k_requested': top_k,
'response_format_hint': (
"To commit your selection, call ADEngine.make_plan with "
"detector_choices=['detName1', ...] (ordered list of "
f"top-{top_k} names from available_detectors[*].name; "
"case-sensitive) and justifications=['why1', ...] "
"(parallel list, one short sentence each)."
),
'n_available': len(available),
}
[docs]
def make_plan(self, detector_choices: list,
justifications: list | None = None,
params: list | None = None) -> dict:
"""Commit a caller-driven detector plan and return a DetectionPlan.
Companion to :meth:`get_kb_for_routing`. The caller (LLM agent,
rule engine, human script) selects ``len(detector_choices)``
detectors and this method validates names against the KB, fills
per-detector defaults, and packages the result as a
:func:`pyod.utils._kb_router.make_plan`-shaped dict so existing
consumers (``build_detector``, ``run``, downstream MCP clients)
keep working unchanged.
Parameters
----------
detector_choices : list of str
Ordered list of detector class names. ``detector_choices[0]``
is the primary; the rest become ``alternatives`` in plan
order. Length must be >= 1. Names must match KB entries
(case-sensitive) with ``status='shipped'``; otherwise
``ValueError`` is raised.
justifications : list of str, optional
Parallel to ``detector_choices``. One short sentence per
choice. ``None`` is accepted and yields autogenerated
reasons.
params : list of dict, optional
Parallel to ``detector_choices``. Per-detector constructor
kwargs. ``None`` -> KB defaults overlaid with the
engine's contamination resolution.
Returns
-------
dict
Closed-schema DetectionPlan: ``{'detector_name',
'params', 'reason', 'evidence', 'confidence',
'alternatives', 'note'}``.
Raises
------
ValueError
If ``detector_choices`` is empty or any name is unknown /
not ``status='shipped'`` in the KB.
"""
if not detector_choices:
raise ValueError(
"detector_choices must be non-empty; got an empty list")
if not isinstance(detector_choices, list):
raise ValueError(
"detector_choices must be a list of strings; "
f"got {type(detector_choices).__name__}")
justifications = list(justifications or [])
params_list = list(params or [])
while len(justifications) < len(detector_choices):
justifications.append('')
while len(params_list) < len(detector_choices):
params_list.append({})
unknown = []
not_shipped = []
for name in detector_choices:
algo = self.kb.get_algorithm(name)
if algo is None:
unknown.append(name)
continue
if algo.get('status') != 'shipped':
not_shipped.append(name)
if unknown:
raise ValueError(
"Unknown detector name(s) (case-sensitive). Names must "
"match KB entries from ADEngine.list_detectors(): "
f"{unknown!r}")
if not_shipped:
raise ValueError(
f"Detector(s) not shipped (cannot be built): {not_shipped!r}")
primary = detector_choices[0]
primary_params = self._with_contamination(
primary, params_list[0] or {})
alternatives = []
for i, det in enumerate(detector_choices[1:], start=1):
alt_params = self._with_contamination(det, params_list[i] or {})
alt_reason = (justifications[i] or
'caller-selected via make_plan')
alternatives.append(make_plan(
detector_name=det,
params=alt_params,
reason=alt_reason,
evidence=['caller_selection'],
confidence=0.5,
alternatives=[]))
primary_reason = (justifications[0] or
'caller-selected via make_plan')
return make_plan(
detector_name=primary,
params=primary_params,
reason=primary_reason,
evidence=['caller_selection'],
confidence=0.7,
alternatives=alternatives,
note='caller-driven via make_plan')
def _plan_via_llm(self, profile: dict, top_k: int, llm_client,
                  constraints: dict | None = None) -> dict:
    """Route via an LLM client (internal; see plan_detection).

    Builds a routing prompt from the constrained KB snapshot, calls
    ``llm_client(prompt)``, parses the response into detector names
    and justifications, checks the names against the constrained
    snapshot, and commits them through :meth:`make_plan`.

    Parameters
    ----------
    profile : dict
        Output of profile_data().
    top_k : int
        Number of detectors requested from the LLM.
    llm_client : callable
        ``(prompt: str) -> str``.
    constraints : dict or None
        Same shape as in :meth:`get_kb_for_routing`; ``None`` means
        no constraints.

    Returns
    -------
    plan : dict (DetectionPlan)
        With ``note`` and ``evidence`` overwritten to mark the plan
        as LLM-sourced.

    Raises
    ------
    RoutingParseError
        If the LLM picked a detector outside the constrained KB
        context. Exceptions from ``llm_client``, the parser, and
        ``make_plan`` are not caught here.
    """
    from ._llm import (
        RoutingParseError,
        build_routing_prompt,
        parse_routing_response,
    )
    # Normalise once: the error message below reads constraints too,
    # and a None default must not turn a routing failure into an
    # AttributeError.
    constraints = constraints or {}
    kb_context = self.get_kb_for_routing(
        profile, top_k=top_k, constraints=constraints)
    prompt = build_routing_prompt(kb_context, top_k=top_k)
    response = llm_client(prompt)
    detector_choices, justifications = parse_routing_response(
        response, self.kb, top_k=top_k)
    # LLM output is untrusted: enforce the constrained KB context
    # (exclude_detectors + data_type_strict) after parsing. Without
    # this, a hostile or buggy client could return an excluded or
    # modality-mismatched detector and get an LLM-sourced plan.
    # parse_routing_response only validates against the global KB.
    allowed = {d['name'] for d in kb_context.get(
        'available_detectors', [])}
    blocked = [name for name in detector_choices if name not in allowed]
    if blocked:
        excluded = sorted(constraints.get('exclude_detectors') or [])
        raise RoutingParseError(
            "LLM selected detector(s) outside the constrained KB "
            f"context: {blocked!r}. The constrained context "
            f"excluded {excluded!r}.")
    plan = self.make_plan(
        detector_choices=detector_choices,
        justifications=justifications)
    # Tag the plan so downstream code can distinguish LLM-sourced
    # plans from caller-driven or rule-driven ones.
    plan['note'] = 'llm-driven via plan_detection(llm_client=...)'
    plan['evidence'] = ['llm_routing']
    return plan
# ------------------------------------------------------------------
# Detector construction
# ------------------------------------------------------------------
[docs]
def build_detector(self, plan: dict) -> Any:
    """Instantiate the detector described by ``plan`` (not fitted here).

    Parameters
    ----------
    plan : dict (DetectionPlan)
        Output of plan_detection().

    Returns
    -------
    detector : BaseDetector
        Built by ``build_detector_from_plan`` with this engine's KB
        and ``random_state``.
    """
    kb, seed = self.kb, self.random_state
    return build_detector_from_plan(plan, kb, random_state=seed)
# ------------------------------------------------------------------
# One-shot detection
# ------------------------------------------------------------------
[docs]
def detect(self, X_train: Any, X_test: Any = None,
data_type: str | None = None,
priority: str = 'balanced') -> dict:
"""One-shot anomaly detection: profile -> plan -> run -> analyze.
Parameters
----------
X_train : array-like
Training data.
X_test : array-like or None
Optional test data.
data_type : str or None
Explicit data type override.
priority : str
'speed', 'accuracy', or 'balanced'.
Returns
-------
result : dict
Output of run_detection() enriched with analysis.
Compatible with all Tier B methods (analyze_results,
explain_findings, suggest_next_step, generate_report).
"""
profile = self.profile_data(X_train, data_type=data_type)
plan = self.plan_detection(profile, priority=priority)
result = self.run_detection(X_train, plan, X_test=X_test)
result['analysis'] = self.analyze_results(result, X=X_train)
return result
# ------------------------------------------------------------------
# Structured detection
# ------------------------------------------------------------------
[docs]
def run_detection(self, X_train: Any, plan: dict,
X_test: Any = None) -> dict:
"""Execute a detection plan.
Parameters
----------
X_train : array-like
Training data.
plan : dict (DetectionPlan)
Output of plan_detection().
X_test : array-like or None
Optional test data.
Returns
-------
result : dict
Keys: 'plan', 'scores_train', 'labels_train', 'threshold',
'n_anomalies', 'anomaly_ratio', 'detector', 'runtime_seconds',
'score_summary'. If X_test: also 'scores_test', 'labels_test'.
"""
import time
start = time.time()
clf = self.build_detector(plan)
clf.fit(X_train)
elapsed = time.time() - start
scores = clf.decision_scores_
labels = clf.labels_
n_anomalies = int(labels.sum())
result = {
'plan': plan,
'scores_train': scores,
'labels_train': labels,
'threshold': float(clf.threshold_),
'n_anomalies': n_anomalies,
'anomaly_ratio': n_anomalies / len(labels),
'detector': clf,
'runtime_seconds': elapsed,
'score_summary': {
'mean': float(np.mean(scores)),
'std': float(np.std(scores)),
'min': float(np.min(scores)),
'max': float(np.max(scores)),
'q25': float(np.percentile(scores, 25)),
'q75': float(np.percentile(scores, 75)),
},
}
if X_test is not None:
try:
result['scores_test'] = clf.decision_function(X_test)
result['labels_test'] = clf.predict(X_test)
except NotImplementedError:
result['scores_test'] = None
result['labels_test'] = None
return result
# ------------------------------------------------------------------
# Result analysis
# ------------------------------------------------------------------
[docs]
def analyze_results(self, result: dict, X: Any = None,
top_k: int = 10) -> dict:
"""Analyze detection results.
Parameters
----------
result : dict
Output of run_detection().
X : array-like or None
Original training data for feature-level analysis.
top_k : int
Number of top anomalies to return.
Returns
-------
analysis : dict
"""
top_k = max(0, int(top_k))
scores = result['scores_train']
labels = result['labels_train']
n_anomalies = int(labels.sum())
top_indices = np.argsort(scores)[::-1][:top_k]
top_anomalies = [{'index': int(i), 'score': float(scores[i])}
for i in top_indices]
score_dist = {
'mean': float(np.mean(scores)),
'std': float(np.std(scores)),
'min': float(np.min(scores)),
'max': float(np.max(scores)),
'median': float(np.median(scores)),
'q25': float(np.percentile(scores, 25)),
'q75': float(np.percentile(scores, 75)),
}
detector_name = result['plan'].get('detector_name', 'unknown')
ratio = n_anomalies / len(labels) if len(labels) > 0 else 0
summary = (
"%d anomalies detected out of %d samples (%.1f%%) "
"using %s. Scores range from %.4f to %.4f "
"(mean=%.4f, std=%.4f). Threshold: %.4f."
% (n_anomalies, len(labels), ratio * 100,
detector_name,
score_dist['min'], score_dist['max'],
score_dist['mean'], score_dist['std'],
result['threshold']))
analysis = {
'n_anomalies': n_anomalies,
'anomaly_ratio': ratio,
'score_distribution': score_dist,
'top_anomalies': top_anomalies,
'summary': summary,
}
if X is not None:
fi = compute_feature_importance(result, X)
if fi is not None:
analysis['feature_importance'] = fi
return analysis
# ------------------------------------------------------------------
# Explanation
# ------------------------------------------------------------------
[docs]
def explain_findings(self, result: dict,
indices: list[int] | None = None,
top_k: int = 5, X: Any = None,
feature_names: list[str] | None = None
) -> list[dict]:
"""Explain why specific samples were flagged as anomalies.
Parameters
----------
result : dict
Output of run_detection().
indices : list of int or None
Specific sample indices. If None, explains top-k.
top_k : int
Number of top anomalies to explain if indices is None.
X : array-like or None
Original data for feature-level explanations.
feature_names : list of str or None
Optional feature labels in column order, threaded through
to ``feature_contributions`` so each contributing feature
has a human-readable name. When omitted, names default to
``f'feature_{column_index}'``.
Returns
-------
explanations : list of dict
Each entry has ``'index'``, ``'score'``, ``'percentile'``,
``'label'``, ``'narrative'``. When ``X`` is provided, also
includes ``'contributing_features'``: a list of dicts with
``'feature'``, ``'name'``, ``'value'``, ``'mean'``,
``'z_score'``, and ``'direction'``.
"""
top_k = max(0, int(top_k))
scores = result['scores_train']
if indices is None:
indices = list(np.argsort(scores)[::-1][:top_k])
# Validate indices: must be integers (not bool) and in range
n_samples = len(scores)
validated = []
for idx in indices:
if isinstance(idx, bool):
continue
if not isinstance(idx, (int, np.integer)):
continue
if 0 <= idx < n_samples:
validated.append(int(idx))
indices = validated
explanations = []
for idx in indices:
score = float(scores[idx])
pctile = float(np.mean(scores <= score) * 100)
label = 'anomaly' if score > result['threshold'] else 'normal'
narrative = (
"Sample %d has anomaly score %.4f (percentile: %.1f%%), "
"classified as %s (threshold: %.4f)."
% (idx, score, pctile, label, result['threshold']))
entry = {
'index': int(idx),
'score': score,
'percentile': pctile,
'label': label,
'narrative': narrative,
}
if X is not None:
contribs = feature_contributions(
X, idx, scores, feature_names=feature_names)
if contribs is not None:
entry['contributing_features'] = contribs
explanations.append(entry)
return explanations
# ------------------------------------------------------------------
# Next-step suggestions
# ------------------------------------------------------------------
[docs]
def suggest_next_step(self, result: dict, analysis: dict,
feedback: str | None = None) -> dict:
"""Suggest what to try next.
Parameters
----------
result : dict
Output of run_detection().
analysis : dict
Output of analyze_results().
feedback : str or None
User feedback like 'too many false positives'.
Returns
-------
suggestion : dict
Keys: 'action', 'reason', optionally 'new_plan',
'threshold_adjustment'.
"""
feedback_lower = (feedback or '').lower()
ratio = analysis.get('anomaly_ratio', 0)
# Specific intents first (before generic keyword matches)
if 'ensemble' in feedback_lower:
return {
'action': 'try_alternative',
'reason': 'Consider running multiple detectors and '
'combining scores.',
'new_plan': suggest_alternative(result, self.kb, make_plan),
}
# "more sensitive" intent: lower threshold / increase contamination
_more_sensitive = (
'false negative' in feedback_lower
or 'missed' in feedback_lower
or 'lower threshold' in feedback_lower
or 'decrease threshold' in feedback_lower
or 'increase contamination' in feedback_lower
or 'higher contamination' in feedback_lower
)
if _more_sensitive:
current_contam = result['plan'].get('params', {}).get(
'contamination', 0.1)
new_contam = adjust_contamination_up(current_contam)
return {
'action': 'adjust_threshold',
'reason': 'Missed anomalies reported. Try increasing '
'contamination from %.2f to %.2f.'
% (current_contam, new_contam),
'threshold_adjustment': {
'current_contamination': current_contam,
'suggested_contamination': new_contam,
'direction': 'increase',
},
}
# "less sensitive" intent: raise threshold / decrease contamination
_less_sensitive = (
'false positive' in feedback_lower
or 'too many' in feedback_lower
or 'raise threshold' in feedback_lower
or 'increase threshold' in feedback_lower
or 'reduce contamination' in feedback_lower
or 'decrease contamination' in feedback_lower
or 'lower contamination' in feedback_lower
)
if _less_sensitive:
current_contam = result['plan'].get('params', {}).get(
'contamination', 0.1)
new_contam = adjust_contamination_down(current_contam)
return {
'action': 'adjust_threshold',
'reason': 'High false positive rate reported. Try reducing '
'contamination from %.2f to %.2f.'
% (current_contam, new_contam),
'threshold_adjustment': {
'current_contamination': current_contam,
'suggested_contamination': new_contam,
'direction': 'decrease',
},
}
if ('different' in feedback_lower or 'another' in feedback_lower
or 'switch' in feedback_lower):
new_plan = suggest_alternative(result, self.kb, make_plan)
return {
'action': 'try_alternative',
'reason': 'Trying an alternative detector.',
'new_plan': new_plan,
}
# No feedback: heuristic based on results
if ratio > 0.3:
current_contam = result['plan'].get('params', {}).get(
'contamination', 0.1)
new_contam = adjust_contamination_down(current_contam)
return {
'action': 'adjust_threshold',
'reason': '%.0f%% flagged as anomalies, which is unusually '
'high. Consider reducing contamination to %.2f.'
% (ratio * 100, new_contam),
'threshold_adjustment': {
'current_contamination': current_contam,
'suggested_contamination': new_contam,
'direction': 'decrease',
},
}
if ratio == 0:
new_plan = suggest_alternative(result, self.kb, make_plan)
return {
'action': 'try_alternative',
'reason': 'No anomalies detected. Try a different detector.',
'new_plan': new_plan,
}
return {
'action': 'done',
'reason': 'Results look reasonable (%.1f%% anomaly rate). '
'Review the top anomalies to validate.'
% (ratio * 100),
}
# ------------------------------------------------------------------
# Report generation
# ------------------------------------------------------------------
[docs]
def generate_report(self, result: dict, analysis: dict,
format: str = 'text') -> str:
"""Generate a summary report.
Parameters
----------
result : dict
Output of run_detection().
analysis : dict
Output of analyze_results().
format : str
'text' (markdown) or 'json'.
Returns
-------
report : str
"""
import json as json_mod
if format == 'json':
report_dict = {
'detector': result['plan'].get('detector_name', ''),
'reason': result['plan'].get('reason', ''),
'n_samples': len(result['scores_train']),
'n_anomalies': analysis['n_anomalies'],
'anomaly_ratio': analysis['anomaly_ratio'],
'threshold': result['threshold'],
'runtime_seconds': result.get('runtime_seconds', 0),
'score_distribution': analysis['score_distribution'],
'top_anomalies': analysis['top_anomalies'][:10],
}
return json_mod.dumps(report_dict, indent=2, default=str)
if format == 'text':
lines = []
lines.append('# Anomaly Detection Report')
lines.append('')
det = result['plan'].get('detector_name', 'unknown')
lines.append('## Configuration')
lines.append('- **Detector:** %s' % det)
lines.append('- **Reason:** %s' % result['plan'].get('reason', ''))
lines.append('- **Samples:** %d' % len(result['scores_train']))
lines.append('- **Runtime:** %.2fs'
% result.get('runtime_seconds', 0))
lines.append('')
lines.append('## Results')
lines.append('- **Anomalies found:** %d (%.1f%%)'
% (analysis['n_anomalies'],
analysis['anomaly_ratio'] * 100))
lines.append('- **Threshold:** %.4f' % result['threshold'])
dist = analysis['score_distribution']
lines.append('- **Score range:** %.4f to %.4f'
% (dist['min'], dist['max']))
lines.append('- **Score mean/std:** %.4f / %.4f'
% (dist['mean'], dist['std']))
lines.append('')
lines.append('## Top Anomalies')
lines.append('')
lines.append('| Rank | Index | Score |')
lines.append('|------|-------|-------|')
for rank, entry in enumerate(analysis['top_anomalies'][:10], 1):
lines.append('| %d | %d | %.4f |'
% (rank, entry['index'], entry['score']))
lines.append('')
return '\n'.join(lines)
raise ValueError("Unknown report format: '%s'. "
"Use 'text' or 'json'." % format)
# ------------------------------------------------------------------
# V3 Session workflow
# ------------------------------------------------------------------
[docs]
def start(self, X: Any,
          data_type: str | None = None) -> InvestigationState:
    """Open an investigation session by profiling the input data.

    Parameters
    ----------
    X : array-like, Data, list, or dict
        Input data (any modality).
    data_type : str or None
        Explicit type override, forwarded to ``profile_data``.

    Returns
    -------
    state : InvestigationState
        Fresh state in phase ``'profiled'`` that holds ``X``, its
        profile, a ``'plan'`` next-action hint and one history entry.
    """
    from .investigation import InvestigationState, _make_history_entry

    data_profile = self.profile_data(X, data_type=data_type)
    session = InvestigationState(
        phase='profiled',
        data=X,
        profile=data_profile,
        next_action={
            'action': 'plan',
            'reason': ('Data profiled as %s with %d samples. '
                       'Ready to select detectors.'
                       % (data_profile['data_type'],
                          data_profile.get('n_samples', 0))),
        },
    )
    # The first history entry is always recorded at iteration 0.
    summary = 'Profiled %s data' % data_profile['data_type']
    session.history.append(
        _make_history_entry('profiled', 'start', 0, summary))
    return session
[docs]
def plan(self, state: InvestigationState,
         priority: str = 'balanced',
         constraints: dict | None = None) -> InvestigationState:
    """Plan detection: select the top-N detectors.

    Wraps ``plan_detection()`` and flattens its primary choice plus
    alternatives into ``state.plans`` (up to 3 detectors, v1 limit).
    Any results, consensus, analysis and quality left over from a
    previous pass are cleared first.

    Parameters
    ----------
    state : InvestigationState
        Session state; ``state.profile`` is passed to
        ``plan_detection()``.
    priority : str
        Forwarded to ``plan_detection()``.
    constraints : dict or None
        Forwarded to ``plan_detection()``. The optional
        ``'max_detectors'`` entry caps the number of plans kept; it is
        clamped to the range 1..3, and a missing or ``None`` value
        means 3.

    Returns
    -------
    state : InvestigationState
        The same object, now in phase ``'planned'``.
    """
    from .investigation import _make_history_entry

    # Clear downstream state if re-planning from a later phase.
    state.results = []
    state.consensus = None
    state.analysis = None
    state.quality = None

    constraints = constraints or {}
    result = self.plan_detection(
        state.profile, priority=priority, constraints=constraints)

    # Primary first, then alternatives; entries without a name are
    # not runnable and are dropped.
    candidates = [result] + list(result.get('alternatives', []))
    plans = [p for p in candidates if p.get('detector_name')]

    # Honor max_detectors (v1 cap at 3). An explicit None falls back
    # to the default instead of crashing in int().
    requested = constraints.get('max_detectors')
    if requested is None:
        requested = 3
    max_det = max(1, min(int(requested), 3))

    state.plans = plans[:max_det]
    state.phase = 'planned'

    names = [p['detector_name'] for p in state.plans]
    joined = ', '.join(names)
    state.next_action = {
        'action': 'run',
        'reason': 'Top %d detectors selected: %s. Ready to run.'
                  % (len(state.plans), joined),
    }
    # Report the number of plans actually kept, so the count always
    # matches the names listed next to it.
    state.history.append(_make_history_entry(
        'planned', 'plan', state.iteration,
        'Selected %d detectors: %s' % (len(state.plans), joined)))
    return state
@staticmethod
def _require_phase(state: InvestigationState, expected: str) -> None:
    """Raise ``ValueError`` when ``state.phase`` differs from ``expected``."""
    current = state.phase
    if current != expected:
        template = (
            "Expected phase '%s', got '%s'. Call the "
            "workflow methods in order: start -> plan -> "
            "run -> analyze -> iterate/report."
        )
        raise ValueError(template % (expected, current))
[docs]
def run(self, state: InvestigationState) -> InvestigationState:
    """Execute every planned detector and summarise the outcome.

    Each plan in ``state.plans`` is handed to ``run_detection()``. A
    detector that raises is logged and recorded as an ``'error'``
    entry rather than aborting the loop. The successful entries are
    passed to ``compute_consensus`` and ``state.next_action`` is
    chosen from how many detectors succeeded.

    Parameters
    ----------
    state : InvestigationState
        Must be in the ``'planned'`` phase.

    Returns
    -------
    state : InvestigationState
        The same object, now in phase ``'detected'`` with ``results``,
        ``consensus`` and ``next_action`` set and one history entry
        appended.

    Raises
    ------
    ValueError
        If ``state.phase`` is not ``'planned'``.
    """
    self._require_phase(state, 'planned')
    from .investigation import _make_history_entry

    outcomes = []
    for det_plan in state.plans:
        try:
            record = dict(self.run_detection(state.data, det_plan))
            record.update(
                detector_name=det_plan['detector_name'],
                status='success',
                error=None)
            outcomes.append(record)
        except Exception as exc:
            # One broken detector must not stop the remaining ones.
            logger.warning(
                'Detector %s raised %s during run(): %s',
                det_plan['detector_name'], type(exc).__name__, exc)
            outcomes.append({
                'detector_name': det_plan['detector_name'],
                'status': 'error',
                'error': str(exc),
                'plan': det_plan,
            })

    state.results = outcomes
    state.phase = 'detected'

    # Consensus is computed from the successful detectors only.
    succeeded = [r for r in outcomes if r['status'] == 'success']
    errored = [r for r in outcomes if r['status'] == 'error']
    state.consensus = compute_consensus(succeeded)

    if state.consensus is None:
        hint = {
            'action': 'confirm_with_user',
            'reason': ('All %d detectors failed. Check data format '
                       'or try a different detector family.'
                       % len(outcomes)),
        }
    elif errored:
        failed_names = [r['detector_name'] for r in errored]
        ok_names = [r['detector_name'] for r in succeeded]
        # Exclude both failed and already-running detectors so the
        # suggestions are genuinely new candidates.
        substitutes = self._suggest_substitutes(
            state.profile,
            exclude=failed_names + ok_names,
            n_needed=len(failed_names))
        shown = (substitutes if substitutes
                 else '<no substitutes available>')
        hint = {
            'action': 'recover_detector_failure',
            'reason': ('%d/%d detectors failed (%s); consensus '
                       'currently uses %d detector(s).'
                       % (len(failed_names), len(outcomes),
                          ', '.join(failed_names),
                          state.consensus['n_detectors'])),
            'failed_detectors': failed_names,
            'suggested_replacements': substitutes,
            'suggestion': ("iterate(state, {'action': 'recover'}) "
                           "to substitute failed detectors with %s, "
                           "or call analyze(state) to proceed with the "
                           "%d successful detector(s)."
                           % (shown, state.consensus['n_detectors'])),
        }
    elif state.consensus['n_detectors'] == 1:
        hint = {
            'action': 'analyze',
            'reason': 'Detection complete (1 detector).',
        }
    else:
        hint = {
            'action': 'analyze',
            'reason': ('Detection complete (%d detectors, '
                       'agreement=%.2f).'
                       % (state.consensus['n_detectors'],
                          state.consensus['agreement'])),
        }
    state.next_action = hint

    tally = '%d/%d detectors succeeded' % (len(succeeded), len(outcomes))
    state.history.append(
        _make_history_entry('detected', 'run', state.iteration, tally))
    return state
def _suggest_substitutes(self, profile: dict, exclude: list,
                         n_needed: int) -> list:
    """Propose replacement detector names for failed slots.

    Re-runs ``plan_detection`` with ``exclude_detectors`` set to a
    copy of ``exclude`` and collects the primary detector name
    followed by the alternatives' names, skipping duplicates. This is
    best effort: an empty list is returned when ``n_needed`` is not
    positive, when planning raises, or when planning yields nothing.

    Parameters
    ----------
    profile : dict
        Data profile forwarded to ``plan_detection``.
    exclude : list
        Detector names that must not be suggested.
    n_needed : int
        Maximum number of names to return.

    Returns
    -------
    names : list
        At most ``n_needed`` detector names, best first.
    """
    if n_needed <= 0:
        return []

    try:
        replan = self.plan_detection(
            profile,
            constraints={'exclude_detectors': list(exclude)})
    except Exception as exc:
        logger.warning(
            'plan_detection failed during substitute '
            'suggestion with %s: %s',
            type(exc).__name__, exc)
        return []

    if not replan:
        return []

    names = []
    primary = replan.get('detector_name')
    if primary:
        names.append(primary)
    for alternative in replan.get('alternatives', []):
        alt_name = alternative.get('detector_name')
        if alt_name and alt_name not in names:
            names.append(alt_name)
    return names[:n_needed]
[docs]
def analyze(self, state: InvestigationState) -> InvestigationState:
    """Analyze detection results and assess their quality.

    Builds a per-detector analysis list aligned with
    ``state.results``, a lightweight consensus analysis, picks the
    best detector via ``select_best_detector`` and stores the output
    of ``compute_quality`` in ``state.quality``. When no detector
    succeeded, a zeroed quality block is recorded instead.

    Parameters
    ----------
    state : InvestigationState
        Must be in the ``'detected'`` phase.

    Returns
    -------
    state : InvestigationState
        The same object, now in phase ``'analyzed'``.

    Raises
    ------
    ValueError
        If ``state.phase`` is not ``'detected'``.
    """
    self._require_phase(state, 'detected')
    from .investigation import _make_history_entry

    state.phase = 'analyzed'

    # All-error path: nothing to analyze, ask the user what to do.
    if not any(r['status'] == 'success' for r in state.results):
        state.analysis = None
        state.quality = {
            'separation': 0.0, 'agreement': 0.0,
            'stability': 0.0, 'overall': 0.0,
            'verdict': 'low',
            'explanation': 'All detectors failed.',
        }
        state.next_action = {
            'action': 'confirm_with_user',
            'reason': ('All detectors failed. Check data format '
                       'or try a different detector family.'),
        }
        state.history.append(_make_history_entry(
            'analyzed', 'analyze', state.iteration,
            'All detectors failed'))
        return state

    # One slot per entry of state.results so that indices stay
    # aligned; failed or un-analyzable detectors get None.
    per_detector = []
    for res in state.results:
        detail = None
        if res['status'] == 'success':
            try:
                detail = self.analyze_results(res, X=state.data)
            except Exception as exc:
                logger.warning(
                    'analyze_results failed for %s with %s: %s',
                    res.get('detector_name', '<unknown>'),
                    type(exc).__name__, exc)
        per_detector.append(detail)

    # Consensus analysis (lightweight, not via analyze_results).
    consensus = state.consensus
    c_scores = consensus['scores']
    c_labels = consensus['labels']
    n_flagged = int(c_labels.sum())
    n_total = len(c_labels)
    # Highest consensus scores first, at most ten of them.
    ranked = np.argsort(c_scores)[::-1][:min(10, n_total)]

    score_stats = {
        'mean': float(np.mean(c_scores)),
        'std': float(np.std(c_scores)),
        'min': float(np.min(c_scores)),
        'max': float(np.max(c_scores)),
        'median': float(np.median(c_scores)),
        'q25': float(np.percentile(c_scores, 25)),
        'q75': float(np.percentile(c_scores, 75)),
    }
    top_rows = [{'index': int(i), 'score': float(c_scores[i])}
                for i in ranked]
    headline = ('%d anomalies detected out of %d samples '
                '(%.1f%%) by consensus of %d detectors.'
                % (n_flagged, n_total,
                   100 * n_flagged / max(n_total, 1),
                   consensus['n_detectors']))
    consensus_analysis = {
        'n_anomalies': n_flagged,
        'anomaly_ratio': n_flagged / max(n_total, 1),
        'score_distribution': score_stats,
        'top_anomalies': top_rows,
        'summary': headline,
    }

    # Best detector selection.
    best_idx = select_best_detector(state.results, c_scores)
    state.analysis = {
        'consensus_analysis': consensus_analysis,
        'per_detector_analysis': per_detector,
        'best_detector': state.results[best_idx]['detector_name'],
        'best_detector_index': best_idx,
        'summary': consensus_analysis['summary'],
    }

    # Quality metrics.
    state.quality = compute_quality(
        c_scores, c_labels, state.results, consensus)
    state.analysis['summary'] += (
        ' Quality: %s (%.2f).'
        % (state.quality['verdict'], state.quality['overall']))

    # Next action depends on the overall quality score.
    if state.quality['overall'] < 0.4:
        state.next_action = {
            'action': 'iterate',
            'reason': ('Low result quality (%.2f). Consider '
                       'trying different detectors.'
                       % state.quality['overall']),
            'suggestion': ('Exclude lowest-agreement detector '
                           'and re-run.'),
        }
    else:
        state.next_action = {
            'action': 'report_to_user',
            'reason': ('Results ready (quality=%s, %.2f).'
                       % (state.quality['verdict'],
                          state.quality['overall'])),
            'summary': state.analysis['summary'],
            'confidence': state.quality['overall'],
        }

    verdict_line = 'Quality: %s (%.2f)' % (
        state.quality['verdict'], state.quality['overall'])
    state.history.append(_make_history_entry(
        'analyzed', 'analyze', state.iteration, verdict_line))
    return state
# ------------------------------------------------------------------
# V3 Session workflow: iterate
# ------------------------------------------------------------------
[docs]
def iterate(self, state: InvestigationState,
            feedback: str | dict) -> InvestigationState:
    """Apply user or agent feedback to the session.

    Dict feedback is dispatched to ``apply_structured_feedback``;
    anything else is converted with ``str()`` and dispatched to
    ``apply_nl_feedback``.

    Most feedback requires phase ``'analyzed'``. A dict whose
    ``'action'`` is ``'recover'`` is also accepted in phase
    ``'detected'``, so failed detectors can be substituted right
    after ``run()`` without calling ``analyze()`` first.

    Parameters
    ----------
    state : InvestigationState
    feedback : str or dict

    Returns
    -------
    state : InvestigationState
        Whatever the selected feedback handler returns.

    Raises
    ------
    ValueError
        If the phase precondition described above is not met.
    """
    structured = isinstance(feedback, dict)
    requested = feedback.get('action') if structured else None

    if requested == 'recover':
        allowed = ('detected', 'analyzed')
        if state.phase not in allowed:
            raise ValueError(
                "Recover requires phase 'detected' or "
                "'analyzed', got '%s'. Call run() first."
                % state.phase)
    else:
        self._require_phase(state, 'analyzed')

    if not structured:
        return apply_nl_feedback(
            state, str(feedback), self.kb, self.plan_detection,
            make_plan)
    return apply_structured_feedback(
        state, feedback, self.kb, self.plan_detection, make_plan)
# ------------------------------------------------------------------
# V3 Session workflow: report and investigate
# ------------------------------------------------------------------
[docs]
def report(self, state: InvestigationState,
           format: str = 'text') -> str | dict:
    """Generate the investigation report.

    The text format prepends a session-level summary to the output of
    ``generate_report()`` for the best detector. The JSON format
    returns a native dict (not a JSON string).

    Parameters
    ----------
    state : InvestigationState
        Must be in the ``'analyzed'`` phase with a non-empty analysis.
    format : str
        'text' or 'json'.

    Returns
    -------
    report : str or dict
        Markdown text for ``'text'``; a dict with ``'session'`` and
        ``'best_detector'`` sections for ``'json'``.

    Raises
    ------
    ValueError
        If the phase is wrong, ``format`` is unknown, or
        ``state.analysis`` is None (no successful detectors).
    """
    self._require_phase(state, 'analyzed')
    if format not in ('text', 'json'):
        raise ValueError(
            "Unknown report format: '%s'. Use 'text' or 'json'."
            % format)
    if state.analysis is None:
        raise ValueError(
            "No successful detectors to report on. "
            "Use iterate() to adjust the plan.")

    best_idx = state.analysis['best_detector_index']
    best_result = state.results[best_idx]
    # May be None: analyze() records None for a detector whose
    # analyze_results() call raised.
    best_analysis = state.analysis['per_detector_analysis'][best_idx]
    c = state.consensus

    if format == 'json':
        return {
            'session': {
                'consensus': {
                    'scores': c['scores'].tolist(),
                    'labels': c['labels'].tolist(),
                    'n_detectors': c['n_detectors'],
                    'agreement': c['agreement'],
                    'disagreements': c['disagreements'],
                },
                'quality': state.quality,
                'comparison': {
                    'agreement': c['agreement'],
                    'disagreements': c['disagreements'],
                },
            },
            'best_detector': {
                'name': best_result['detector_name'],
                'scores': best_result['scores_train'].tolist(),
                'labels': best_result['labels_train'].tolist(),
                'threshold': best_result['threshold'],
                'analysis': best_analysis,
            },
        }

    # Text format: session section first.
    q = state.quality
    lines = [
        '# Investigation Report',
        '',
        '## Session Summary',
        '- **Detectors run:** %d' % c['n_detectors'],
        '- **Detector agreement:** %.2f' % c['agreement'],
        '- **Quality verdict:** %s (%.2f)' % (q['verdict'], q['overall']),
        '- **Iterations:** %d' % state.iteration,
    ]
    if c['disagreements']:
        lines.append('- **Disagreements:** %d samples'
                     % len(c['disagreements']))
    lines.append('')

    if best_analysis is None:
        # generate_report() indexes into the analysis dict, so a
        # missing analysis would otherwise surface as an opaque
        # TypeError. Emit a short placeholder section instead.
        lines.append('## Best Detector')
        lines.append('- **Detector:** %s' % best_result['detector_name'])
        lines.append('- Per-detector analysis is unavailable for '
                     'this detector.')
    else:
        # Best detector report (via generate_report).
        lines.append(self.generate_report(
            best_result, best_analysis, format='text'))
    return '\n'.join(lines)
# ------------------------------------------------------------------
# Contamination diagnostics (O5 narrowed)
# ------------------------------------------------------------------
# Percentile levels of the consensus scores that
# ``contamination_diagnostics`` reports under ``score_percentiles``.
_CONTAMINATION_DIAGNOSTIC_PERCENTILES: tuple[int, ...] = (
50, 75, 90, 95, 99)
[docs]
def contamination_diagnostics(
        self,
        state: InvestigationState,
        threshold_sweep: list[float] | None = None) -> dict:
    """Diagnostic helper for contamination calibration.

    Reports the contamination value stored in the primary plan, the
    fraction actually flagged by the consensus labels, the
    consensus-score percentiles, and (optionally) a threshold sweep
    showing what fraction would be flagged at each candidate
    contamination value.

    This helper does NOT estimate contamination automatically and
    does NOT mutate state. It is a read-only diagnostic meant to
    inform a subsequent
    `engine.iterate(state, {'action': 'adjust_contamination',
    'value': <rate>})` call.

    Parameters
    ----------
    state : InvestigationState
        Must be in the 'analyzed' phase.
    threshold_sweep : sequence of float or None
        Optional candidate contamination values in (0, 1). For each
        value c, the result includes the corresponding threshold (the
        (1 - c) quantile of consensus scores) and the fraction of
        scores strictly above it. Values outside (0, 1) are skipped.
        Any non-None value, including an empty sequence or a NumPy
        array, counts as "passed".

    Returns
    -------
    diagnostics : dict
        Keys:

        - ``effective_contamination`` (float or None): contamination
          value from the primary plan's params, or ``None`` if the
          plan has no contamination set.
        - ``flagged_rate`` (float): fraction flagged by the consensus
          labels (0.0 when there is no consensus).
        - ``score_percentiles`` (dict[int, float]): consensus-score
          percentiles at the 50th, 75th, 90th, 95th, and 99th (empty
          when there is no consensus).
        - ``threshold_sweep`` (list of dict, optional): present only
          when ``threshold_sweep`` was passed; each entry has
          ``contamination``, ``threshold``, and ``flagged_rate``.

    Raises
    ------
    ValueError
        If ``state`` is not in the 'analyzed' phase.
    """
    self._require_phase(state, 'analyzed')

    primary_plan = state.plans[0] if state.plans else {}
    effective = primary_plan.get('params', {}).get('contamination')

    # "Passed" means "not None": a truthiness test would drop the key
    # for an empty list and raise on a NumPy array.
    sweep_requested = threshold_sweep is not None

    if state.consensus is None:
        diagnostics: dict = {
            'effective_contamination': effective,
            'flagged_rate': 0.0,
            'score_percentiles': {},
        }
        if sweep_requested:
            diagnostics['threshold_sweep'] = []
        return diagnostics

    scores = state.consensus['scores']
    labels = state.consensus['labels']
    n = len(labels)

    diagnostics = {
        'effective_contamination': effective,
        'flagged_rate': float(labels.sum()) / n if n > 0 else 0.0,
        'score_percentiles': {
            p: float(np.percentile(scores, p))
            for p in self._CONTAMINATION_DIAGNOSTIC_PERCENTILES
        },
    }

    if sweep_requested:
        sweep_results = []
        for c in threshold_sweep:
            if not (0.0 < c < 1.0):
                # Skip invalid candidates rather than raise; this
                # is a lenient diagnostic, not a strict validator.
                continue
            threshold = float(np.quantile(scores, 1.0 - c))
            n_flagged = int((scores > threshold).sum())
            sweep_results.append({
                'contamination': float(c),
                'threshold': threshold,
                'flagged_rate': n_flagged / n if n > 0 else 0.0,
            })
        diagnostics['threshold_sweep'] = sweep_results
    return diagnostics
# ------------------------------------------------------------------
# Hindsight validation (O8)
# ------------------------------------------------------------------
[docs]
def validate(self,
             state: InvestigationState,
             y: Any) -> dict:
    """Hindsight validation of consensus and per-detector results.

    Computes label-based metrics from `y` against the consensus
    labels and each successful detector, plus a
    consensus-vs-best-detector comparison so the caller can see
    whether consensus actually helped.

    This method does not assign to ``state``. Use it after `analyze`
    when held-out labels become available.

    Parameters
    ----------
    state : InvestigationState
        Must be in the 'analyzed' phase.
    y : array-like, shape (n_samples,)
        Held-out binary labels (0 = inlier, 1 = anomaly). The input
        is flattened, so a column vector of shape (n_samples, 1) is
        accepted too. The flattened length must match the consensus.

    Returns
    -------
    validation : dict
        Keys:

        - ``consensus`` (dict): label_metrics for the consensus
          labels and scores.
        - ``per_detector`` (dict[str, dict]): label_metrics per
          successful detector, keyed by detector name.
        - ``best_detector`` (dict or None): label_metrics for the
          detector picked by `analyze` as best (or None when
          `state.analysis` does not name one).
        - ``consensus_vs_best`` (dict): comparison summary with
          keys ``consensus_f1``, ``best_detector_f1`` (or None),
          and ``consensus_helped`` (True if consensus F1 is at
          least the best-detector F1; None when no best detector).
        - ``false_positives`` (list[int]): row indices flagged by
          consensus but inlier in `y`.
        - ``false_negatives`` (list[int]): row indices not flagged
          by consensus but anomaly in `y`.

    Raises
    ------
    ValueError
        If `state` is not in 'analyzed' phase, if the consensus is
        missing (all detectors failed), or if the flattened length of
        `y` does not match the consensus length.
    """
    self._require_phase(state, 'analyzed')
    if state.consensus is None:
        raise ValueError(
            "Cannot validate: state.consensus is None (all "
            "detectors failed). Use iterate() to recover first.")

    # Flatten first: a (n, 1) column would pass the length check and
    # then broadcast against the 1-D consensus labels into an n x n
    # mask, producing wrong FP/FN indices.
    y_arr = np.asarray(y).ravel().astype(int)
    consensus_labels = state.consensus['labels']
    consensus_scores = state.consensus['scores']
    n = len(consensus_labels)
    if len(y_arr) != n:
        raise ValueError(
            f"y has {len(y_arr)} samples but the consensus has "
            f"{n}; lengths must match.")

    consensus_metrics = label_metrics(
        y_arr, consensus_labels, consensus_scores)

    flagged = consensus_labels == 1
    cleared = consensus_labels == 0
    # np.where(...)[0].tolist() already yields plain Python ints.
    fp_indices = np.where(flagged & (y_arr == 0))[0].tolist()
    fn_indices = np.where(cleared & (y_arr == 1))[0].tolist()

    per_detector: dict[str, dict] = {}
    for r in state.results:
        if r.get('status') == 'success':
            per_detector[r['detector_name']] = label_metrics(
                y_arr, r['labels_train'], r['scores_train'])

    best_metrics = None
    if state.analysis and 'best_detector' in state.analysis:
        best_metrics = per_detector.get(state.analysis['best_detector'])

    if best_metrics is not None:
        best_f1 = best_metrics['f1']
        consensus_helped = consensus_metrics['f1'] >= best_f1
    else:
        best_f1 = None
        consensus_helped = None

    return {
        'consensus': consensus_metrics,
        'per_detector': per_detector,
        'best_detector': best_metrics,
        'consensus_vs_best': {
            'consensus_f1': consensus_metrics['f1'],
            'best_detector_f1': best_f1,
            'consensus_helped': consensus_helped,
        },
        'false_positives': fp_indices,
        'false_negatives': fn_indices,
    }
[docs]
def investigate(self, X: Any, data_type: str | None = None,
                priority: str = 'balanced') -> InvestigationState:
    """Run the whole workflow in one call: start, plan, run, analyze.

    Parameters
    ----------
    X : array-like
        Input data.
    data_type : str or None
        Forwarded to ``start()``.
    priority : str
        Forwarded to ``plan()``.

    Returns
    -------
    state : InvestigationState
        The state returned by ``analyze()``.
    """
    profiled = self.start(X, data_type=data_type)
    planned = self.plan(profiled, priority=priority)
    detected = self.run(planned)
    return self.analyze(detected)
# ------------------------------------------------------------------
# Knowledge queries
# ------------------------------------------------------------------
[docs]
def list_detectors(self, data_type: str | None = None,
                   status: str = 'shipped') -> list[dict]:
    """List detectors known to the knowledge base.

    Parameters
    ----------
    data_type : str or None
        Filter by data type (e.g. 'tabular', 'text'). When given, the
        lookup is delegated to ``kb.list_by_data_type`` together with
        ``status``.
    status : str
        Filter by status. Use 'all' to list everything.

    Returns
    -------
    detectors : list of dict
    """
    kb = self.kb
    if data_type:
        return kb.list_by_data_type(data_type, status=status)
    if status != 'all':
        return kb.list_by_status(status)

    # No filter at all: expose every catalog entry with its key
    # under 'name' (an entry's own 'name' field, if any, wins).
    everything = []
    for key, meta in kb.algorithms.items():
        row = {'name': key}
        row.update(meta)
        everything.append(row)
    return everything
[docs]
def explain_detector(self, name: str) -> dict:
    """Return the knowledge-base entry for one detector.

    Parameters
    ----------
    name : str
        Detector short name (e.g. 'ECOD').

    Returns
    -------
    info : dict
        The entry from ``kb.get_algorithm`` with ``'name'`` added
        (an entry's own ``'name'`` field, if any, takes precedence).

    Raises
    ------
    ValueError
        If the knowledge base has no entry for ``name``.
    """
    entry = self.kb.get_algorithm(name)
    if entry is None:
        raise ValueError("Unknown detector '%s'" % name)
    info = {'name': name}
    info.update(entry)
    return info
# Maps a data_type to (benchmark name, ranking key) for
# `compare_detectors` when the KB benchmark's top-level ranking
# already uses PyOD detector names (e.g., ADBench `overall_top_5`).
# Consulted first by `_benchmark_ranked_detectors`.
_COMPARE_BENCHMARK_RANKINGS: dict[str, tuple[str, str]] = {
'tabular': ('ADBench', 'overall_top_5'),
}
# Maps a data_type to benchmark-rank keys stored on each shipped
# detector's `benchmark_rank` metadata. Used when the benchmark's
# top-level ranking lists paper method names that do not match the
# PyOD detector names (e.g., TSB-AD lists "POLY", "KShapeAD", which
# do not match the shipped `KShape`, `MatrixProfile`, etc.). Lower
# rank value = better. When a detector carries multiple matching
# keys, the minimum (best) rank wins. Consulted by
# `_benchmark_ranked_detectors` only when the data_type has no entry
# in `_COMPARE_BENCHMARK_RANKINGS`.
_COMPARE_BENCHMARK_RANK_KEYS: dict[str, tuple[str, ...]] = {
'time_series': ('TSB_AD_overall', 'TSB_AD_overall_iforest'),
}
def _benchmark_ranked_detectors(self, data_type: str,
                                top_k: int) -> list[str] | None:
    """Rank the detectors listed for `data_type` by benchmark results.

    Two ranking sources are tried in order:

    1. If `_COMPARE_BENCHMARK_RANKINGS` has an entry for `data_type`,
       the benchmark's top-level ranking list is filtered down to the
       names returned by `list_detectors(data_type=...)`.
    2. Otherwise, if `_COMPARE_BENCHMARK_RANK_KEYS` has an entry, each
       listed detector's `benchmark_rank` metadata is read and the
       detectors are sorted ascending by their best (minimum) rank.

    In both modes detectors without an applicable rank are appended
    in catalog order, and the result is cut to `top_k` names.

    Parameters
    ----------
    data_type : str
        Modality to rank detectors for.
    top_k : int
        Maximum number of names to return.

    Returns
    -------
    names : list of str or None
        Ranked detector names, or `None` when no applicable ranking
        exists, in which case the caller should fall back to catalog
        order.
    """
    overall = self._COMPARE_BENCHMARK_RANKINGS.get(data_type)
    if overall is not None:
        bench_name, ranking_key = overall
        bench = self.kb.benchmarks.get(bench_name)
        if not bench:
            return None
        ordering = bench.get('rankings', {}).get(ranking_key, [])
        if not ordering:
            return None
        catalog = [entry['name']
                   for entry in self.list_detectors(data_type=data_type)]
        available = set(catalog)
        leaders = [name for name in ordering if name in available]
        if not leaders:
            return None
        # Fill the remaining slots in catalog order.
        taken = set(leaders)
        trailing = [name for name in catalog if name not in taken]
        return (leaders + trailing)[:top_k]

    rank_keys = self._COMPARE_BENCHMARK_RANK_KEYS.get(data_type)
    if rank_keys is None:
        return None

    scored: list[tuple[int, str]] = []
    unscored: list[str] = []
    for entry in self.list_detectors(data_type=data_type):
        ranks = entry.get('benchmark_rank', {})
        found = [ranks[key] for key in rank_keys if key in ranks]
        if found:
            # Several matching keys: the best (lowest) rank counts.
            scored.append((min(found), entry['name']))
        else:
            unscored.append(entry['name'])
    if not scored:
        return None
    scored.sort()
    return ([name for _, name in scored] + unscored)[:top_k]
[docs]
def compare_detectors(self, names: list[str] | None = None,
                      data_type: str | None = None,
                      top_k: int = 3) -> list[dict]:
    """Compare detectors.

    With explicit `names`, returns `explain_detector` output for each
    name in input order.

    Without `names` but with a `data_type` for which
    `_benchmark_ranked_detectors` yields a ranking, returns
    `explain_detector` output for the ranked names (at most `top_k`).

    In every other case (no `data_type`, or no applicable ranking)
    it falls back to the first `top_k` entries of `list_detectors`
    in catalog order.

    Parameters
    ----------
    names : list of str or None
        Explicit list of detector names to compare.
    data_type : str or None
        Filter by data type.
    top_k : int
        Number of detectors to return when not using explicit names.

    Returns
    -------
    comparison : list of dict
    """
    if names:
        return [self.explain_detector(entry) for entry in names]

    ranking = None
    if data_type:
        ranking = self._benchmark_ranked_detectors(data_type, top_k)
    if ranking is not None:
        return [self.explain_detector(entry) for entry in ranking]

    # No benchmark-backed ranking: plain catalog order.
    return self.list_detectors(data_type=data_type)[:top_k]
[docs]
def get_benchmarks(self, benchmark: str = 'all') -> dict:
    """Return benchmark results from the knowledge base.

    Parameters
    ----------
    benchmark : str
        Benchmark name, or 'all' for everything.

    Returns
    -------
    benchmarks : dict
        The full ``kb.benchmarks`` mapping for 'all'; otherwise a
        one-entry dict keyed by ``benchmark`` whose value is ``None``
        when the name is unknown.
    """
    catalog = self.kb.benchmarks
    if benchmark != 'all':
        return {benchmark: catalog.get(benchmark)}
    return catalog