Source code for darts.ad.detectors.detectors

"""
Detector Base Classes
"""

# TODO:
#     - check error message and add name of variable in the message error
#     - add possibility to input a list of param rather than only one number
#     - add more complex detectors
#         - create an ensemble fittable detector

import sys
from abc import ABC, abstractmethod
from collections.abc import Sequence
from typing import Any, Literal, Optional, Union

if sys.version_info >= (3, 11):
    from typing import Self
else:
    from typing_extensions import Self

import numpy as np

from darts import TimeSeries
from darts.ad.utils import (
    _assert_fit_called,
    _check_input,
    eval_metric_from_binary_prediction,
)
from darts.logging import get_logger, raise_log
from darts.utils.ts_utils import series2seq

logger = get_logger(__name__)


[docs]class Detector(ABC): """Base class for all detectors""" def __init__(self, *args: Any, **kwargs: Any) -> None: self.width_trained_on: Optional[int] = None
[docs] def detect( self, series: Union[TimeSeries, Sequence[TimeSeries]], name: str = "series", ) -> Union[TimeSeries, Sequence[TimeSeries]]: """Detect anomalies on given time series. Parameters ---------- series The (sequence of) series on which to detect anomalies. name The name of `series`. Returns ------- Union[TimeSeries, Sequence[TimeSeries]] binary prediction (1 if considered as an anomaly, 0 if not) """ called_with_single_series = isinstance(series, TimeSeries) series = _check_input( series, name=name, width_expected=self.width_trained_on, check_deterministic=True, ) detected_series = [] for s in series: detected_series.append(self._detect_core(s, name=name)) return detected_series[0] if called_with_single_series else detected_series
[docs] def eval_metric( self, anomalies: Union[TimeSeries, Sequence[TimeSeries]], pred_scores: Union[TimeSeries, Sequence[TimeSeries]], window: int = 1, metric: Literal["recall", "precision", "f1", "accuracy"] = "recall", ) -> Union[float, Sequence[float], Sequence[Sequence[float]]]: """Score the results against true anomalies. Parameters ---------- anomalies The (sequence of) ground truth binary anomaly series (`1` if it is an anomaly and `0` if not). pred_scores The (sequence of) of estimated anomaly score series indicating how anomalous each window of size w is. window Integer value indicating the number of past samples each point represents in the `pred_scores`. metric The name of the metric function to use. Must be one of "recall", "precision", "f1", and "accuracy". Default: "recall". Returns ------- Union[float, Sequence[float], Sequence[Sequence[float]]] Metric results for each anomaly score """ return eval_metric_from_binary_prediction( anomalies=anomalies, pred_anomalies=self.detect(pred_scores), window=window, metric=metric, )
@abstractmethod def _detect_core(self, series: TimeSeries, name: str = "series") -> TimeSeries: pass
[docs]class FittableDetector(Detector): """Base class of Detectors that require training.""" def __init__(self, *args: Any, **kwargs: Any) -> None: super().__init__(*args, **kwargs) self._fit_called = False
[docs] def detect( self, series: Union[TimeSeries, Sequence[TimeSeries]], name: str = "series", ) -> Union[TimeSeries, Sequence[TimeSeries]]: _assert_fit_called(self._fit_called, name="Detector") return super().detect(series, name=name)
[docs] def fit(self, series: Union[TimeSeries, Sequence[TimeSeries]]) -> Self: """Trains the detector on the given time series. Parameters ---------- series Time (sequence of) series to be used to train the detector. Returns ------- self Fitted Detector. """ width = series2seq(series)[0].width series = _check_input( series, name="series", width_expected=width, check_deterministic=True, check_binary=False, check_multivariate=False, ) self.width_trained_on = width self._fit_core(series) self._fit_called = True return self
[docs] def fit_detect( self, series: Union[TimeSeries, Sequence[TimeSeries]] ) -> Union[TimeSeries, Sequence[TimeSeries]]: """Trains the detector and detects anomalies on the same series. Parameters ---------- series Time series to be used for training and be detected for anomalies. Returns ------- Union[TimeSeries, Sequence[TimeSeries]] Binary prediction (1 if considered as an anomaly, 0 if not) """ self.fit(series) return self.detect(series, name="series")
@abstractmethod def _fit_core(self, series: Sequence[TimeSeries]) -> None: pass
class _BoundedDetectorMixin(ABC): """ A class containing functions supporting bounds-based detection, to be used as a mixin for some `Detector` subclasses. """ @staticmethod def _prepare_boundaries( lower_bound_name: str, upper_bound_name: str, lower_bound: Optional[Union[Sequence[float], float]] = None, upper_bound: Optional[Union[Sequence[float], float]] = None, ) -> tuple[list[Optional[float]], list[Optional[float]]]: """ Process the boundaries argument and perform some sanity checks Parameters ---------- lower_bound_name Name of the lower bound upper_bound_name Name of the upper bound lower_bound (Sequence of) numerical bound below which a value is regarded as anomaly. If a sequence, must match the dimensionality of the series this detector is applied to. upper_bound (Sequence of) numerical bound above which a value is regarded as anomaly. If a sequence, must match the dimensionality of the series this detector is applied to. Returns ------- lower_bound Lower bounds, as a list of values (at least one not None value) upper_bound Upper bounds, as a list of values (at least one not None value) """ if lower_bound is None and upper_bound is None: raise_log( ValueError( f"`{lower_bound_name} and `{upper_bound_name}` cannot both be `None`." ), logger=logger, ) def _prep_boundaries(boundaries) -> list[Optional[float]]: """Convert boundaries to List""" return ( boundaries.tolist() if isinstance(boundaries, np.ndarray) else ( [boundaries] if not isinstance(boundaries, Sequence) else boundaries ) ) # convert to list lower_bound = _prep_boundaries(lower_bound) upper_bound = _prep_boundaries(upper_bound) if all([lo is None for lo in lower_bound]) and all([ hi is None for hi in upper_bound ]): raise_log( ValueError("All provided upper and lower bounds values are None."), logger=logger, ) # match the lengths of the boundaries lower_bound = ( lower_bound * len(upper_bound) if len(lower_bound) == 1 else lower_bound ) upper_bound = ( upper_bound * len(lower_bound) if len(upper_bound) == 1 else upper_bound ) if not len(lower_bound) == len(upper_bound): raise_log( ValueError( f"Parameters `{lower_bound_name}` and `{upper_bound_name}` " f"must be of the same length `n`, found " f"`{lower_bound_name}`: n={len(lower_bound)} and " f"`{upper_bound_name}`: n={len(upper_bound)}." ), logger=logger, ) if not all([ lb is None or ub is None or lb <= ub for (lb, ub) in zip(lower_bound, upper_bound) ]): raise_log( ValueError( f"All values in `{lower_bound_name}` must be lower or equal" f"to their corresponding value in `{upper_bound_name}`." ), logger=logger, ) return lower_bound, upper_bound @staticmethod def _expand_threshold(series: TimeSeries, threshold: list[float]) -> list[float]: return threshold * series[0].width if len(threshold) == 1 else threshold @property @abstractmethod def low_threshold(self): pass @property @abstractmethod def high_threshold(self): pass