Source code for darts.ad.anomaly_model.filtering_am

"""
Filtering Anomaly Model
-----------------------

A ``FilteringAnomalyModel`` wraps around a Darts filtering model and one or
several anomaly scorer(s) to compute anomaly scores
by comparing how actuals deviate from the model's predictions (filtered series).
"""

from typing import Dict, Sequence, Union

from darts.ad.anomaly_model.anomaly_model import AnomalyModel
from darts.ad.scorers.scorers import AnomalyScorer
from darts.ad.utils import _assert_same_length, _to_list
from darts.logging import get_logger, raise_if_not
from darts.models.filtering.filtering_model import FilteringModel
from darts.timeseries import TimeSeries

logger = get_logger(__name__)


[docs]class FilteringAnomalyModel(AnomalyModel): def __init__( self, model: FilteringModel, scorer: Union[AnomalyScorer, Sequence[AnomalyScorer]], ): """Filtering-based Anomaly Detection Model The filtering model may or may not be already fitted. The underlying assumption is that this model should be able to adequately filter the series in the absence of anomalies. For this reason, it is recommended to either provide a model that has already been fitted and evaluated to work appropriately on a series without anomalies, or to ensure that a simple call to the :func:`fit()` function of the model will be sufficient to train it to satisfactory performance on series without anomalies. Calling :func:`fit()` on the anomaly model will fit the underlying filtering model only if ``allow_model_training`` is set to ``True`` upon calling ``fit()``. In addition, calling :func:`fit()` will also fit the fittable scorers, if any. Parameters ---------- filter A filtering model from Darts that will be used to filter the actual time series scorer One or multiple scorer(s) that will be used to compare the actual and predicted time series in order to obtain an anomaly score ``TimeSeries``. If a list of `N` scorer is given, the anomaly model will call each one of the scorers and output a list of `N` anomaly scores ``TimeSeries``. """ raise_if_not( isinstance(model, FilteringModel), f"`model` must be a darts.models.filtering not a {type(model)}.", ) self.filter = model super().__init__(model=model, scorer=scorer)
[docs] def fit( self, series: Union[TimeSeries, Sequence[TimeSeries]], allow_model_training: bool = False, **filter_fit_kwargs, ): """Fit the underlying filtering model (if applicable) and the fittable scorers, if any. Train the filter (if not already fitted and `allow_filter_training` is set to True) and the scorer(s) on the given time series. The filter model will be applied to the given series, and the results will be used to train the scorer(s). Parameters ---------- series The (sequence of) series to be trained on. allow_model_training Boolean value that indicates if the filtering model needs to be fitted on the given series. If set to False, the model needs to be already fitted. Default: False filter_fit_kwargs Parameters to be passed on to the filtering model ``fit()`` method. Returns ------- self Fitted model """ # TODO: add support for covariates (see eg. Kalman Filter) raise_if_not( type(allow_model_training) is bool, # noqa: E721 f"`allow_filter_training` must be Boolean, found type: {type(allow_model_training)}.", ) # checks if model does not need training and all scorer(s) are not fittable if not allow_model_training and not self.scorers_are_trainable: logger.warning( f"The filtering model {self.model.__class__.__name__} is not required to be trained" + " because the parameter `allow_filter_training` is set to False, and no scorer" + " fittable. The ``.fit()`` function has no effect." ) return list_series = _to_list(series) raise_if_not( all([isinstance(s, TimeSeries) for s in list_series]), "all input `series` must be of type Timeseries.", ) if allow_model_training: # fit filtering model if hasattr(self.filter, "fit"): # TODO: check if filter is already fitted (for now fit it regardless -> only Kalman) raise_if_not( len(list_series) == 1, f"Filter model {self.model.__class__.__name__} can only be fitted on a" + " single time series, but multiple are provided.", ) self.filter.fit(list_series[0], **filter_fit_kwargs) else: raise ValueError( "`allow_filter_training` was set to True, but the filter" + f" {self.model.__class__.__name__} has no fit() method." ) else: # TODO: check if Kalman is fitted or not # if not raise error "fit filter before, or set `allow_filter_training` to TRUE" pass if self.scorers_are_trainable: list_pred = [self.filter.filter(series) for series in list_series] # fit the scorers for scorer in self.scorers: if hasattr(scorer, "fit"): scorer.fit_from_prediction(list_series, list_pred) return self
[docs] def show_anomalies( self, series: TimeSeries, actual_anomalies: TimeSeries = None, names_of_scorers: Union[str, Sequence[str]] = None, title: str = None, metric: str = None, **score_kwargs, ): """Plot the results of the anomaly model. Computes the score on the given series input and shows the different anomaly scores with respect to time. The plot will be composed of the following: - the series itself with the output of the filtering model - the anomaly score of each scorer. The scorer with different windows will be separated. - the actual anomalies, if given. It is possible to: - add a title to the figure with the parameter `title` - give personalized names for the scorers with `names_of_scorers` - show the results of a metric for each anomaly score (AUC_ROC or AUC_PR), if the actual anomalies are given Parameters ---------- series The series to visualize anomalies from. actual_anomalies The ground truth of the anomalies (1 if it is an anomaly and 0 if not) names_of_scorers Name of the scorers. Must be a list of length equal to the number of scorers in the anomaly_model. title Title of the figure metric Optionally, Scoring function to use. Must be one of "AUC_ROC" and "AUC_PR". Default: "AUC_ROC" score_kwargs parameters for the `.score()` function """ if isinstance(series, Sequence): raise_if_not( len(series) == 1, f"`show_anomalies` expects one series, found a sequence of length {len(series)} as input.", ) series = series[0] anomaly_scores, model_output = self.score( series, return_model_prediction=True, **score_kwargs ) return self._show_anomalies( series, model_output=model_output, anomaly_scores=anomaly_scores, names_of_scorers=names_of_scorers, actual_anomalies=actual_anomalies, title=title, metric=metric, )
[docs] def score( self, series: Union[TimeSeries, Sequence[TimeSeries]], return_model_prediction: bool = False, **filter_kwargs, ): """Compute the anomaly score(s) for the given series. Predicts the given target time series with the filtering model, and applies the scorer(s) to compare the predicted (filtered) series and the provided series. Outputs the anomaly score(s) of the provided time series. Parameters ---------- series The (sequence of) series to score. return_model_prediction Boolean value indicating if the prediction of the model should be returned along the anomaly score Default: False filter_kwargs parameters of the Darts `.filter()` filtering model Returns ------- Union[TimeSeries, Sequence[TimeSeries], Sequence[Sequence[TimeSeries]]] Anomaly scores series generated by the anomaly model scorers - ``TimeSeries`` if `series` is a series, and the anomaly model contains one scorer. - ``Sequence[TimeSeries]`` * If `series` is a series, and the anomaly model contains multiple scorers, returns one series per scorer. * If `series` is a sequence, and the anomaly model contains one scorer, returns one series per series in the sequence. - ``Sequence[Sequence[TimeSeries]]`` if `series` is a sequence, and the anomaly model contains multiple scorers. The outer sequence is over the series, and inner sequence is over the scorers. """ raise_if_not( type(return_model_prediction) is bool, # noqa: E721 f"`return_model_prediction` must be Boolean, found type: {type(return_model_prediction)}.", ) list_series = _to_list(series) # TODO: vectorize this call later on if we have any filtering models allowing this list_pred = [self.filter.filter(s, **filter_kwargs) for s in list_series] scores = list( zip( *[ sc.score_from_prediction(list_series, list_pred) for sc in self.scorers ] ) ) if len(scores) == 1 and not isinstance(series, Sequence): # there's only one series scores = scores[0] if len(scores) == 1: # there's only one scorer scores = scores[0] if len(list_pred) == 1: list_pred = list_pred[0] if return_model_prediction: return scores, list_pred else: return scores
[docs] def eval_accuracy( self, actual_anomalies: Union[TimeSeries, Sequence[TimeSeries]], series: Union[TimeSeries, Sequence[TimeSeries]], metric: str = "AUC_ROC", **filter_kwargs, ) -> Union[ Dict[str, float], Dict[str, Sequence[float]], Sequence[Dict[str, float]], Sequence[Dict[str, Sequence[float]]], ]: """Compute the accuracy of the anomaly scores computed by the model. Predicts the `series` with the filtering model, and applies the scorer(s) on the filtered time series and the given target time series. Returns the score(s) of an agnostic threshold metric, based on the anomaly score given by the scorer(s). Parameters ---------- actual_anomalies The (sequence of) ground truth of the anomalies (1 if it is an anomaly and 0 if not) series The (sequence of) series to predict anomalies on. metric Optionally, Scoring function to use. Must be one of "AUC_ROC" and "AUC_PR". Default: "AUC_ROC" filter_kwargs parameters of the Darts `.filter()` filtering model Returns ------- Union[Dict[str, float], Dict[str, Sequence[float]], Sequence[Dict[str, float]], Sequence[Dict[str, Sequence[float]]]] Score for the time series. A (sequence of) dictionary with the keys being the name of the scorers, and the values being the metric results on the (sequence of) `series`. If the scorer treats every dimension independently (by nature of the scorer or if its component_wise is set to True), the values of the dictionary will be a Sequence containing the score for each dimension. """ list_series, list_actual_anomalies = _to_list(series), _to_list( actual_anomalies ) raise_if_not( all([isinstance(s, TimeSeries) for s in list_series]), "all input `series` must be of type Timeseries.", ) raise_if_not( all([isinstance(s, TimeSeries) for s in list_actual_anomalies]), "all input `actual_anomalies` must be of type Timeseries.", ) _assert_same_length(list_series, list_actual_anomalies) self._check_univariate(list_actual_anomalies) list_anomaly_scores = self.score(series=list_series, **filter_kwargs) acc_anomaly_scores = self._eval_accuracy_from_scores( list_actual_anomalies=list_actual_anomalies, list_anomaly_scores=list_anomaly_scores, metric=metric, ) if len(acc_anomaly_scores) == 1 and not isinstance(series, Sequence): return acc_anomaly_scores[0] else: return acc_anomaly_scores