Source code for darts.ad.detectors.detectors

"""
Detector Base Classes
"""

# TODO:
#     - check error message and add name of variable in the message error
#     - rethink the positionning of fun _check_param()
#     - add possibility to input a list of param rather than only one number
#     - add more complex detectors
#         - create an ensemble fittable detector

from abc import ABC, abstractmethod
from typing import Any, Sequence, Union

from darts import TimeSeries
from darts.ad.utils import eval_accuracy_from_binary_prediction
from darts.logging import raise_if_not


[docs]class Detector(ABC): """Base class for all detectors""" def __init__(self, *args: Any, **kwargs: Any) -> None: pass
[docs] def detect( self, series: Union[TimeSeries, Sequence[TimeSeries]], ) -> Union[TimeSeries, Sequence[TimeSeries]]: """Detect anomalies on given time series. Parameters ---------- series series on which to detect anomalies. Returns ------- Union[TimeSeries, Sequence[TimeSeries]] binary prediciton (1 if considered as an anomaly, 0 if not) """ list_series = [series] if not isinstance(series, Sequence) else series raise_if_not( all([isinstance(s, TimeSeries) for s in list_series]), "all series in `series` must be of type TimeSeries.", ) raise_if_not( all([s.is_deterministic for s in list_series]), "all series in `series` must be deterministic (number of samples equal to 1).", ) detected_series = [] for s in list_series: detected_series.append(self._detect_core(s)) if len(detected_series) == 1 and not isinstance(series, Sequence): return detected_series[0] else: return detected_series
@abstractmethod def _detect_core(self, input: Any) -> Any: pass
[docs] def eval_accuracy( self, actual_anomalies: Union[TimeSeries, Sequence[TimeSeries]], anomaly_score: Union[TimeSeries, Sequence[TimeSeries]], window: int = 1, metric: str = "recall", ) -> Union[float, Sequence[float], Sequence[Sequence[float]]]: """Score the results against true anomalies. Parameters ---------- actual_anomalies The ground truth of the anomalies (1 if it is an anomaly and 0 if not). anomaly_score Series indicating how anomoulous each window of size w is. window Integer value indicating the number of past samples each point represents in the anomaly_score. metric Metric function to use. Must be one of "recall", "precision", "f1", and "accuracy". Default: "recall" Returns ------- Union[float, Sequence[float], Sequence[Sequence[float]]] Metric results for each anomaly score """ if isinstance(anomaly_score, Sequence): raise_if_not( all([isinstance(s, TimeSeries) for s in anomaly_score]), "all series in `anomaly_score` must be of type TimeSeries.", ) raise_if_not( all([s.is_deterministic for s in anomaly_score]), "all series in `anomaly_score` must be deterministic (number of samples equal to 1).", ) else: raise_if_not( isinstance(anomaly_score, TimeSeries), f"Input `anomaly_score` must be of type TimeSeries, found {type(anomaly_score)}.", ) raise_if_not( anomaly_score.is_deterministic, "Input `anomaly_score` must be deterministic (number of samples equal to 1).", ) return eval_accuracy_from_binary_prediction( actual_anomalies, self.detect(anomaly_score), window, metric )
[docs]class FittableDetector(Detector): """Base class of Detectors that need training.""" def __init__(self, *args: Any, **kwargs: Any) -> None: super().__init__(*args, **kwargs) self._fit_called = False
[docs] def detect( self, series: Union[TimeSeries, Sequence[TimeSeries]], ) -> Union[TimeSeries, Sequence[TimeSeries]]: """Detect anomalies on given time series. Parameters ---------- series series on which to detect anomalies. Returns ------- Union[TimeSeries, Sequence[TimeSeries]] binary prediciton (1 if considered as an anomaly, 0 if not) """ list_series = [series] if not isinstance(series, Sequence) else series raise_if_not( self._fit_called, "The Detector has not been fitted yet. Call `fit()` first.", ) raise_if_not( all([self.width_trained_on == s.width for s in list_series]), "all series in `series` must have the same number of components as the data " + "used for training the detector model, number of components in training: " + f" {self.width_trained_on}.", ) return super().detect(series)
@abstractmethod def _fit_core(self, input: Any) -> Any: pass
[docs] def fit(self, series: Union[TimeSeries, Sequence[TimeSeries]]) -> None: """Trains the detector on the given time series. Parameters ---------- series Time series to be used to train the detector. Returns ------- self Fitted Detector. """ list_series = [series] if not isinstance(series, Sequence) else series raise_if_not( all([isinstance(s, TimeSeries) for s in list_series]), "all series in `series` must be of type TimeSeries.", ) raise_if_not( all([s.is_deterministic for s in list_series]), "all series in `series` must be deterministic (number of samples equal to 1).", ) self.width_trained_on = list_series[0].width raise_if_not( all([s.width == self.width_trained_on for s in list_series]), "all series in `series` must have the same number of components.", ) self._fit_called = True return self._fit_core(list_series)
[docs] def fit_detect( self, series: Union[TimeSeries, Sequence[TimeSeries]] ) -> Union[TimeSeries, Sequence[TimeSeries]]: """Trains the detector and detects anomalies on the same series. Parameters ---------- series Time series to be used for training and be detected for anomalies. Returns ------- Union[TimeSeries, Sequence[TimeSeries]] Binary prediciton (1 if considered as an anomaly, 0 if not) """ self.fit(series) return self.detect(series)