Source code for darts.ad.scorers.scorers

"""
Scorers Base Classes
"""

# TODO:
#     - add stride for Scorers like Kmeans and Wasserstein
#     - add option to normalize the windows for kmeans? capture only the form and not the values.


from abc import ABC, abstractmethod
from typing import Any, Optional, Sequence, Union

import numpy as np

from darts import TimeSeries
from darts.ad.utils import (
    _assert_same_length,
    _assert_timeseries,
    _intersect,
    _sanity_check_two_series,
    _to_list,
    eval_accuracy_from_scores,
    show_anomalies_from_scores,
)
from darts.logging import get_logger, raise_if_not

logger = get_logger(__name__)


class AnomalyScorer(ABC):
    """Base class for all anomaly scorers.

    Parameters
    ----------
    univariate_scorer
        Whether the scorer returns a univariate anomaly score regardless of the
        number of components of its input (see ``_check_univariate_scorer``).
    window
        Window size of the scorer. Must be of exact type ``int`` and strictly
        greater than 0.
    """

    def __init__(self, univariate_scorer: bool, window: int) -> None:
        # Exact type check (not ``isinstance``): subclasses of int such as
        # ``bool`` are rejected on purpose, hence the noqa.
        raise_if_not(
            type(window) is int,  # noqa: E721
            f"Parameter `window` must be an integer, found type {type(window)}.",
        )

        raise_if_not(
            window > 0,
            f"Parameter `window` must be stricly greater than 0, found size {window}.",
        )

        self.window = window
        self.univariate_scorer = univariate_scorer

    def _check_univariate_scorer(self, actual_anomalies: Sequence[TimeSeries]):
        """Checks if `actual_anomalies` contains only univariate series when the scorer has the
        parameter 'univariate_scorer' set to True.

        'univariate_scorer' is:
            True -> when the function of the scorer ``score(series)`` (or, if applicable,
                ``score_from_prediction(actual_series, pred_series)``) returns a univariate
                anomaly score regardless of the input `series` (or, if applicable, `actual_series`
                and `pred_series`).

            False -> when the scorer will return a series that has the
                same number of components as the input (can be univariate or multivariate).

        Nothing is checked when 'univariate_scorer' is False.
        """
        if self.univariate_scorer:
            # The type check must pass before `.width` is accessed below.
            raise_if_not(
                all(isinstance(s, TimeSeries) for s in actual_anomalies),
                "all series in `actual_anomalies` must be of type TimeSeries.",
            )

            raise_if_not(
                all(s.width == 1 for s in actual_anomalies),
                f"Scorer {self.__str__()} will return a univariate anomaly score series (width=1)."
                + " Found a multivariate `actual_anomalies`."
                + " The evaluation of the accuracy cannot be computed between the two series.",
            )

    def _check_window_size(self, series: TimeSeries):
        """Checks if the parameter window is less or equal than the length of the given series"""

        raise_if_not(
            self.window <= len(series),
            f"Window size {self.window} is greater than the targeted series length {len(series)}, "
            + "must be lower or equal. Decrease the window size or increase the length series input"
            + " to score on.",
        )

    @property
    def is_probabilistic(self) -> bool:
        """Whether the scorer expects a probabilistic prediction for its first input."""
        return False

    def _assert_stochastic(self, series: TimeSeries, name_series: str):
        """Checks if the series is stochastic (number of samples is higher than one)."""

        raise_if_not(
            series.is_stochastic,
            f"Scorer {self.__str__()} is expecting `{name_series}` to be a stochastic timeseries"
            + f" (number of samples must be higher than 1, found: {series.n_samples}).",
        )

    def _assert_deterministic(self, series: TimeSeries, name_series: str) -> TimeSeries:
        """Checks if the series is deterministic (number of samples is equal to one).

        A non-deterministic series does not raise: a warning is logged and the
        series is replaced by its 0.5-quantile series.

        Returns
        -------
        TimeSeries
            `series` itself if it is deterministic, otherwise the result of
            ``series.quantile_timeseries(quantile=0.5)``. Callers must use the
            returned value to benefit from the conversion.
        """

        if not series.is_deterministic:
            logger.warning(
                f"Scorer {self.__str__()} is expecting `{name_series}` to be a (sequence of) deterministic"
                + f" timeseries (number of samples must be equal to 1, found: {series.n_samples}). The "
                + "series will be converted to a deterministic series by taking the median of the samples.",
            )
            series = series.quantile_timeseries(quantile=0.5)

        return series

    @abstractmethod
    def __str__(self):
        """returns the name of the scorer"""
        pass

    def eval_accuracy_from_prediction(
        self,
        actual_anomalies: Union[TimeSeries, Sequence[TimeSeries]],
        actual_series: Union[TimeSeries, Sequence[TimeSeries]],
        pred_series: Union[TimeSeries, Sequence[TimeSeries]],
        metric: str = "AUC_ROC",
    ) -> Union[float, Sequence[float], Sequence[Sequence[float]]]:
        """Computes the anomaly score between `actual_series` and `pred_series`, and returns the score
        of an agnostic threshold metric.

        Parameters
        ----------
        actual_anomalies
            The (sequence of) ground truth of the anomalies (1 if it is an anomaly and 0 if not)
        actual_series
            The (sequence of) actual series.
        pred_series
            The (sequence of) predicted series.
        metric
            Optionally, metric function to use. Must be one of "AUC_ROC" and "AUC_PR".
            Default: "AUC_ROC"

        Returns
        -------
        Union[float, Sequence[float], Sequence[Sequence[float]]]
            Score of an agnostic threshold metric for the computed anomaly score
                - ``float`` if `actual_series` and `pred_series` are univariate series (dimension=1).
                - ``Sequence[float]``

                    * if `actual_series` and `pred_series` are multivariate series (dimension>1),
                      returns one value per dimension, or
                    * if `actual_series` and `pred_series` are sequences of univariate series,
                      returns one value per series
                - ``Sequence[Sequence[float]]`` if `actual_series` and `pred_series` are sequences
                  of multivariate series. Outer Sequence is over the sequence input and the inner
                  Sequence is over the dimensions of each element in the sequence input.
        """
        actual_anomalies = _to_list(actual_anomalies)
        self._check_univariate_scorer(actual_anomalies)
        anomaly_score = self.score_from_prediction(actual_series, pred_series)

        return eval_accuracy_from_scores(
            actual_anomalies, anomaly_score, self.window, metric
        )

    @abstractmethod
    def score_from_prediction(self, actual_series: Any, pred_series: Any) -> Any:
        """Computes the anomaly score from `actual_series` and `pred_series` (implemented by subclasses)."""
        pass

    def show_anomalies_from_prediction(
        self,
        actual_series: TimeSeries,
        pred_series: TimeSeries,
        scorer_name: Optional[str] = None,
        actual_anomalies: Optional[TimeSeries] = None,
        title: Optional[str] = None,
        metric: Optional[str] = None,
    ):
        """Plot the results of the scorer.

        Computes the anomaly score on the two series. And plots the results.

        The plot will be composed of the following:
            - the actual_series and the pred_series.
            - the anomaly score of the scorer.
            - the actual anomalies, if given.

        It is possible to:
            - add a title to the figure with the parameter `title`
            - give personalized name to the scorer with `scorer_name`
            - show the results of a metric for the anomaly score (AUC_ROC or AUC_PR),
              if the actual anomalies is provided.

        Parameters
        ----------
        actual_series
            The actual series to visualize anomalies from. A sequence containing exactly
            one series is also accepted and unwrapped.
        pred_series
            The predicted series of `actual_series`. A sequence containing exactly
            one series is also accepted and unwrapped.
        actual_anomalies
            The ground truth of the anomalies (1 if it is an anomaly and 0 if not)
        scorer_name
            Name of the scorer. Default: None, in which case a name built from
            ``str(self)`` is used.
        title
            Title of the figure. Default: None, in which case a title built from
            ``str(self)`` is used.
        metric
            Optionally, Scoring function to use. Must be one of "AUC_ROC" and "AUC_PR".
            Default: None (passed unchanged to ``show_anomalies_from_scores``).
        """
        if isinstance(actual_series, Sequence):
            raise_if_not(
                len(actual_series) == 1,
                "``show_anomalies_from_prediction`` expects only one series for `actual_series`,"
                + f" found a list of length {len(actual_series)} as input.",
            )

            actual_series = actual_series[0]

        raise_if_not(
            isinstance(actual_series, TimeSeries),
            "``show_anomalies_from_prediction`` expects an input of type TimeSeries,"
            + f" found type {type(actual_series)} for `actual_series`.",
        )

        if isinstance(pred_series, Sequence):
            raise_if_not(
                len(pred_series) == 1,
                "``show_anomalies_from_prediction`` expects one series for `pred_series`,"
                + f" found a list of length {len(pred_series)} as input.",
            )

            pred_series = pred_series[0]

        raise_if_not(
            isinstance(pred_series, TimeSeries),
            "``show_anomalies_from_prediction`` expects an input of type TimeSeries,"
            + f" found type: {type(pred_series)} for `pred_series`.",
        )

        anomaly_score = self.score_from_prediction(actual_series, pred_series)

        if title is None:
            title = f"Anomaly results by scorer {self.__str__()}"

        if scorer_name is None:
            scorer_name = [f"anomaly score by {self.__str__()}"]

        return show_anomalies_from_scores(
            actual_series,
            model_output=pred_series,
            anomaly_scores=anomaly_score,
            window=self.window,
            names_of_scorers=scorer_name,
            actual_anomalies=actual_anomalies,
            title=title,
            metric=metric,
        )
class NonFittableAnomalyScorer(AnomalyScorer):
    """Base class of anomaly scorers that do not need training."""

    def __init__(self, univariate_scorer: bool, window: int) -> None:
        super().__init__(univariate_scorer=univariate_scorer, window=window)

        # indicates if the scorer is trainable or not
        self.trainable = False

    @abstractmethod
    def _score_core_from_prediction(self, actual_series: Any, pred_series: Any) -> Any:
        """Computes the anomaly score of one (actual, predicted) pair of series.

        Called by ``score_from_prediction`` with two positional arguments: the two
        series after they have been restricted to their common time span.
        """
        pass

    def score_from_prediction(
        self,
        actual_series: Union[TimeSeries, Sequence[TimeSeries]],
        pred_series: Union[TimeSeries, Sequence[TimeSeries]],
    ) -> Union[TimeSeries, Sequence[TimeSeries]]:
        """Computes the anomaly score on the two (sequence of) series.

        If a pair of sequences is given, they must contain the same number
        of series. The scorer will score each pair of series independently
        and return an anomaly score for each pair.

        Parameters
        ----------
        actual_series
            The (sequence of) actual series.
        pred_series
            The (sequence of) predicted series.

        Returns
        -------
        Union[TimeSeries, Sequence[TimeSeries]]
            (Sequence of) anomaly score time series. A single series is returned only
            when neither input was passed as a sequence; otherwise a list is returned.
        """
        list_actual_series, list_pred_series = _to_list(actual_series), _to_list(
            pred_series
        )
        _assert_same_length(list_actual_series, list_pred_series)

        anomaly_scores = []

        for s1, s2 in zip(list_actual_series, list_pred_series):
            _sanity_check_two_series(s1, s2)
            # score only on the time span shared by both series
            s1, s2 = _intersect(s1, s2)
            self._check_window_size(s1)
            self._check_window_size(s2)
            anomaly_scores.append(self._score_core_from_prediction(s1, s2))

        if (
            len(anomaly_scores) == 1
            and not isinstance(pred_series, Sequence)
            and not isinstance(actual_series, Sequence)
        ):
            return anomaly_scores[0]
        else:
            return anomaly_scores
class FittableAnomalyScorer(AnomalyScorer):
    """Base class of scorers that do need training.

    Parameters
    ----------
    univariate_scorer
        Forwarded to ``AnomalyScorer.__init__``.
    window
        Forwarded to ``AnomalyScorer.__init__``.
    diff_fn
        How two series are reduced to one in ``_diff_series``. Must be "abs_diff"
        or "diff", otherwise a ``ValueError`` is raised. Default: "abs_diff"
    """

    def __init__(
        self, univariate_scorer: bool, window: int, diff_fn: str = "abs_diff"
    ) -> None:
        super().__init__(univariate_scorer=univariate_scorer, window=window)

        # indicates if the scorer is trainable or not
        self.trainable = True

        # indicates if the scorer has been trained yet
        self._fit_called = False

        # function used in ._diff_series() to convert 2 time series into 1
        if diff_fn in {"abs_diff", "diff"}:
            self.diff_fn = diff_fn
        else:
            raise ValueError(f"Metric should be 'diff' or 'abs_diff', found {diff_fn}")

    def check_if_fit_called(self):
        """Checks if the scorer has been fitted before calling its `score()` function."""

        raise_if_not(
            self._fit_called,
            f"The Scorer {self.__str__()} has not been fitted yet. Call ``fit()`` first.",
        )

    def eval_accuracy(
        self,
        actual_anomalies: Union[TimeSeries, Sequence[TimeSeries]],
        series: Union[TimeSeries, Sequence[TimeSeries]],
        metric: str = "AUC_ROC",
    ) -> Union[float, Sequence[float], Sequence[Sequence[float]]]:
        """Computes the anomaly score of the given time series, and returns the score
        of an agnostic threshold metric.

        Parameters
        ----------
        actual_anomalies
            The ground truth of the anomalies (1 if it is an anomaly and 0 if not)
        series
            The (sequence of) series to detect anomalies from.
        metric
            Optionally, metric function to use. Must be one of "AUC_ROC" and "AUC_PR".
            Default: "AUC_ROC"

        Returns
        -------
        Union[float, Sequence[float], Sequence[Sequence[float]]]
            Score of an agnostic threshold metric for the computed anomaly score
                - ``float`` if `series` is a univariate series (dimension=1).
                - ``Sequence[float]``

                    * if `series` is a multivariate series (dimension>1), returns one
                      value per dimension, or
                    * if `series` is a sequence of univariate series, returns one value
                      per series
                - ``Sequence[Sequence[float]]`` if `series` is a sequence of multivariate
                  series. Outer Sequence is over the sequence input and the inner Sequence
                  is over the dimensions of each element in the sequence input.
        """
        actual_anomalies = _to_list(actual_anomalies)
        self._check_univariate_scorer(actual_anomalies)
        anomaly_score = self.score(series)

        return eval_accuracy_from_scores(
            actual_anomalies, anomaly_score, self.window, metric
        )

    def score(
        self,
        series: Union[TimeSeries, Sequence[TimeSeries]],
    ) -> Union[TimeSeries, Sequence[TimeSeries]]:
        """Computes the anomaly score on the given series.

        If a sequence of series is given, the scorer will score each series independently
        and return an anomaly score for each series in the sequence.

        Parameters
        ----------
        series
            The (sequence of) series to detect anomalies from.

        Returns
        -------
        Union[TimeSeries, Sequence[TimeSeries]]
            (Sequence of) anomaly score time series
        """

        self.check_if_fit_called()

        list_series = _to_list(series)

        anomaly_scores = []
        for s in list_series:
            _assert_timeseries(s)
            self._check_window_size(s)
            # stochastic series are scored on their median (see _assert_deterministic)
            anomaly_scores.append(
                self._score_core(self._assert_deterministic(s, "series"))
            )

        if len(anomaly_scores) == 1 and not isinstance(series, Sequence):
            return anomaly_scores[0]
        else:
            return anomaly_scores

    def show_anomalies(
        self,
        series: TimeSeries,
        actual_anomalies: Optional[TimeSeries] = None,
        scorer_name: Optional[str] = None,
        title: Optional[str] = None,
        metric: Optional[str] = None,
    ):
        """Plot the results of the scorer.

        Computes the score on the given series input. And plots the results.

        The plot will be composed of the following:
            - the series itself.
            - the anomaly score of the score.
            - the actual anomalies, if given.

        It is possible to:
            - add a title to the figure with the parameter `title`
            - give personalized name to the scorer with `scorer_name`
            - show the results of a metric for the anomaly score (AUC_ROC or AUC_PR),
              if the actual anomalies is provided.

        Parameters
        ----------
        series
            The series to visualize anomalies from. A sequence containing exactly
            one series is also accepted and unwrapped.
        actual_anomalies
            The ground truth of the anomalies (1 if it is an anomaly and 0 if not)
        scorer_name
            Name of the scorer. Default: None, in which case a name built from
            ``str(self)`` is used.
        title
            Title of the figure. Default: None, in which case a title built from
            ``str(self)`` is used.
        metric
            Optionally, Scoring function to use. Must be one of "AUC_ROC" and "AUC_PR".
            Default: None (passed unchanged to ``show_anomalies_from_scores``).
        """

        if isinstance(series, Sequence):
            raise_if_not(
                len(series) == 1,
                "``show_anomalies`` expects one series for `series`,"
                + f" found a list of length {len(series)} as input.",
            )

            series = series[0]

        raise_if_not(
            isinstance(series, TimeSeries),
            "``show_anomalies`` expects an input of type TimeSeries,"
            + f" found type {type(series)} for `series`.",
        )

        anomaly_score = self.score(series)

        if title is None:
            title = f"Anomaly results by scorer {self.__str__()}"

        if scorer_name is None:
            scorer_name = f"anomaly score by {self.__str__()}"

        return show_anomalies_from_scores(
            series,
            anomaly_scores=anomaly_score,
            window=self.window,
            names_of_scorers=scorer_name,
            actual_anomalies=actual_anomalies,
            title=title,
            metric=metric,
        )

    def score_from_prediction(
        self,
        actual_series: Union[TimeSeries, Sequence[TimeSeries]],
        pred_series: Union[TimeSeries, Sequence[TimeSeries]],
    ) -> Union[TimeSeries, Sequence[TimeSeries]]:
        """Computes the anomaly score on the two (sequence of) series.

        The function ``diff_fn`` passed as a parameter to the scorer, will transform `pred_series` and
        `actual_series` into one "difference" series. By default, ``diff_fn`` will compute the absolute
        difference (Default: "abs_diff"). If actual_series and pred_series are sequences, ``diff_fn``
        will be applied to all pairwise elements of the sequences.

        The scorer will then transform this series into an anomaly score. If a sequence of series is
        given, the scorer will score each series independently and return an anomaly score for each
        series in the sequence.

        Parameters
        ----------
        actual_series
            The (sequence of) actual series.
        pred_series
            The (sequence of) predicted series.

        Returns
        -------
        Union[TimeSeries, Sequence[TimeSeries]]
            (Sequence of) anomaly score time series
        """

        self.check_if_fit_called()

        list_actual_series, list_pred_series = _to_list(actual_series), _to_list(
            pred_series
        )
        _assert_same_length(list_actual_series, list_pred_series)

        anomaly_scores = []
        for s1, s2 in zip(list_actual_series, list_pred_series):
            _sanity_check_two_series(s1, s2)
            s1 = self._assert_deterministic(s1, "actual_series")
            s2 = self._assert_deterministic(s2, "pred_series")
            diff = self._diff_series(s1, s2)
            self._check_window_size(diff)
            anomaly_scores.append(self.score(diff))

        if (
            len(anomaly_scores) == 1
            and not isinstance(pred_series, Sequence)
            and not isinstance(actual_series, Sequence)
        ):
            return anomaly_scores[0]
        else:
            return anomaly_scores

    def fit(
        self,
        series: Union[TimeSeries, Sequence[TimeSeries]],
    ):
        """Fits the scorer on the given time series input.

        If sequence of series is given, the scorer will be fitted on the concatenation of the
        sequence.

        The assumption is that the series `series` used for training are generally anomaly-free.

        Stochastic series are replaced by their median (with a warning, see
        ``_assert_deterministic``) before being handed to ``_fit_core``, which is the same
        conversion ``score()`` applies.

        Parameters
        ----------
        series
            The (sequence of) series with no anomalies.

        Returns
        -------
        self
            Fitted Scorer.
        """
        list_series = _to_list(series)

        list_fit_series = []
        for idx, s in enumerate(list_series):
            _assert_timeseries(s)

            if idx == 0:
                # the first series defines the expected number of components
                self.width_trained_on = s.width
            else:
                raise_if_not(
                    s.width == self.width_trained_on,
                    "series in `series` must have the same number of components,"
                    + f" found number of components equal to {self.width_trained_on}"
                    + f" at index 0 and {s.width} at index {idx}.",
                )
            self._check_window_size(s)

            # Keep the returned series: `_assert_deterministic` does not modify `s`
            # in place, so ignoring its result would train on the stochastic input.
            list_fit_series.append(self._assert_deterministic(s, "series"))

        self._fit_core(list_fit_series)
        self._fit_called = True

        return self

    def fit_from_prediction(
        self,
        actual_series: Union[TimeSeries, Sequence[TimeSeries]],
        pred_series: Union[TimeSeries, Sequence[TimeSeries]],
    ):
        """Fits the scorer on the two (sequence of) series.

        The function ``diff_fn`` passed as a parameter to the scorer, will transform `pred_series` and
        `actual_series` into one series. By default, ``diff_fn`` will compute the absolute difference
        (Default: "abs_diff"). If `pred_series` and `actual_series` are sequences, ``diff_fn`` will be
        applied to all pairwise elements of the sequences.

        The scorer will then be fitted on this (sequence of) series. If a sequence of series is given,
        the scorer will be fitted on the concatenation of the sequence.

        The scorer assumes that the (sequence of) actual_series is anomaly-free.

        Parameters
        ----------
        actual_series
            The (sequence of) actual series.
        pred_series
            The (sequence of) predicted series.

        Returns
        -------
        self
            Fitted Scorer.
        """
        list_actual_series, list_pred_series = _to_list(actual_series), _to_list(
            pred_series
        )
        _assert_same_length(list_actual_series, list_pred_series)

        list_fit_series = []
        for s1, s2 in zip(list_actual_series, list_pred_series):
            _sanity_check_two_series(s1, s2)
            s1 = self._assert_deterministic(s1, "actual_series")
            s2 = self._assert_deterministic(s2, "pred_series")
            list_fit_series.append(self._diff_series(s1, s2))

        self.fit(list_fit_series)
        self._fit_called = True

        return self

    @abstractmethod
    def _fit_core(self, series: Any) -> Any:
        """Fits the scorer; called by ``fit`` with a list of series (implemented by subclasses)."""
        pass

    @abstractmethod
    def _score_core(self, series: Any) -> Any:
        """Scores one series; called by ``score`` once per series (implemented by subclasses)."""
        pass

    def _diff_series(self, series_1: TimeSeries, series_2: TimeSeries) -> TimeSeries:
        """Applies the ``diff_fn`` to the two time series. Converts two time series into 1.

        series_1 and series_2 must:
            - have a non empty time intersection
            - be of the same width W

        Parameters
        ----------
        series_1
            1st time series
        series_2
            2nd time series

        Returns
        -------
        TimeSeries
            series of width W

        Raises
        ------
        ValueError
            If ``self.diff_fn`` is neither "abs_diff" nor "diff".
        """
        series_1, series_2 = _intersect(series_1, series_2)

        if self.diff_fn == "abs_diff":
            return (series_1 - series_2).map(lambda x: np.abs(x))
        elif self.diff_fn == "diff":
            return series_1 - series_2
        else:
            # found an non-existent diff_fn
            raise ValueError(
                f"Metric should be 'diff' or 'abs_diff', found {self.diff_fn}"
            )
class NLLScorer(NonFittableAnomalyScorer):
    """Parent class for all LikelihoodScorer"""

    def __init__(self, window) -> None:
        super().__init__(univariate_scorer=False, window=window)

    @property
    def is_probabilistic(self) -> bool:
        return True

    @abstractmethod
    def _score_core_nllikelihood(self, input_1: Any, input_2: Any) -> Any:
        """For each timestamp, the corresponding distribution is fitted on the probabilistic time-series
        input_2, and returns the negative log-likelihood of the deterministic time-series input_1
        given the distribution.
        """
        pass

    def _score_core_from_prediction(
        self,
        actual_series: TimeSeries,
        pred_series: TimeSeries,
    ) -> TimeSeries:
        """Scores a deterministic series against a probabilistic prediction.

        For each timestamp of the inputs:
            - the parameters of the considered distribution are fitted on the samples of the
              probabilistic time series
            - the negative log-likelihood of the deterministic time series values are computed

        If the series is multivariate, the score will be computed on each component independently.

        The per-timestamp scores are then averaged over a rolling window of size ``self.window``
        (past ``self.window`` points, including the current one). A series of length N is thereby
        turned into a series of length N-self.window+1; with a window of 1 the scores are returned
        unchanged.

        Parameters
        ----------
        actual_series
            A deterministic time series (number of samples per timestamp must be equal to 1)
        pred_series
            A probabilistic time series (number of samples per timestamp must be higher than 1)

        Returns
        -------
        TimeSeries
        """
        actual_series = self._assert_deterministic(actual_series, "actual_series")
        self._assert_stochastic(pred_series, "pred_series")

        actual_values = actual_series.all_values(copy=False)
        pred_values = pred_series.all_values(copy=False)

        # one array of scores per component
        #   shape actual: (time_steps, )
        #   shape pred: (time_steps, samples)
        scores_per_component = [
            self._score_core_nllikelihood(
                actual_values[:, idx].squeeze(-1),
                pred_values[:, idx],
            )
            for idx in range(pred_series.width)
        ]

        # transpose the per-component arrays into one row of scores per timestamp
        rows = list(zip(*scores_per_component))
        raw_scores = TimeSeries.from_times_and_values(pred_series.time_index, rows)

        if self.window == 1:
            # a window of 1 would replace every value by itself -> return the series directly
            return raw_scores

        rolling_mean = {
            "window": self.window,
            "function": "mean",
            "mode": "rolling",
            "min_periods": self.window,
        }
        return raw_scores.window_transform(transforms=rolling_mean, treat_na="dropna")