# Source code for darts.ad.anomaly_model.forecasting_am

"""
Forecasting Anomaly Model
-------------------------

A ``ForecastingAnomalyModel`` wraps around a Darts forecasting model and one or several anomaly
scorer(s) to compute anomaly scores by comparing how actuals deviate from the model's forecasts.
"""

# TODO:
#     - set the default value of `start` to its minimal valid value
#       (waiting for the corresponding release of `historical_forecasts()`)

import inspect
from typing import Dict, Optional, Sequence, Union

import pandas as pd

from darts.ad.anomaly_model.anomaly_model import AnomalyModel
from darts.ad.scorers.scorers import AnomalyScorer
from darts.ad.utils import _assert_same_length, _assert_timeseries, _to_list
from darts.logging import get_logger, raise_if_not
from darts.models.forecasting.forecasting_model import ForecastingModel
from darts.timeseries import TimeSeries

logger = get_logger(__name__)


class ForecastingAnomalyModel(AnomalyModel):
    def __init__(
        self,
        model: ForecastingModel,
        scorer: Union[AnomalyScorer, Sequence[AnomalyScorer]],
    ):
        """Forecasting-based Anomaly Detection Model

        The forecasting model may or may not be already fitted. The underlying assumption is that `model`
        should be able to accurately forecast the series in the absence of anomalies. For this reason, it is
        recommended to either provide a model that has already been fitted and evaluated to work appropriately
        on a series without anomalies, or to ensure that a simple call to the :func:`fit()` method of the model
        will be sufficient to train it to satisfactory performance on a series without anomalies.

        Calling :func:`fit()` on the anomaly model will fit the underlying forecasting model only
        if ``allow_model_training`` is set to ``True`` upon calling ``fit()``.
        In addition, calling :func:`fit()` will also fit the fittable scorers, if any.

        Parameters
        ----------
        model
            An instance of a Darts forecasting model.
        scorer
            One or multiple scorer(s) that will be used to compare the actual and predicted time series in order
            to obtain an anomaly score ``TimeSeries``.
            If a list of `N` scorers is given, the anomaly model will call each
            one of the scorers and output a list of `N` anomaly scores ``TimeSeries``.

        Raises
        ------
        ValueError
            Raised through ``raise_if_not`` if `model` is not a ``ForecastingModel``
            (exception type as raised by ``darts.logging.raise_if_not``).
        """
        raise_if_not(
            isinstance(model, ForecastingModel),
            f"Model must be a darts ForecastingModel not a {type(model)}.",
        )
        self.model = model
        super().__init__(model=model, scorer=scorer)

    def fit(
        self,
        series: Union[TimeSeries, Sequence[TimeSeries]],
        past_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None,
        future_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None,
        allow_model_training: bool = False,
        forecast_horizon: int = 1,
        start: Union[pd.Timestamp, float, int] = 0.5,
        num_samples: int = 1,
        **model_fit_kwargs,
    ):
        """Fit the underlying forecasting model (if applicable) and the fittable scorers, if any.

        Train the model (if ``allow_model_training`` is set to True) and the
        scorer(s) (if fittable) on the given time series.

        Once the model is fitted, the series historical forecasts are computed,
        representing what would have been forecasted by this model on the series.

        The prediction and the series are then used to train the scorer(s).

        Parameters
        ----------
        series
            One or multiple (if the model supports it) target series to be
            trained on (generally assumed to be anomaly-free).
        past_covariates
            Optional past-observed covariate series or sequence of series. This applies only if the model
            supports past covariates.
        future_covariates
            Optional future-known covariate series or sequence of series. This applies only if the model
            supports future covariates.
        allow_model_training
            Boolean value that indicates if the forecasting model needs to be fitted on the given series.
            If set to False, the model needs to be already fitted.
            Default: False
        forecast_horizon
            The forecast horizon for the predictions.
        start
            The first point of time at which a prediction is computed for a future time.
            This parameter supports 3 different data types: ``float``, ``int`` and ``pandas.Timestamp``.
            In the case of ``float``, the parameter will be treated as the proportion of the time series
            that should lie before the first prediction point.
            In the case of ``int``, the parameter will be treated as an integer index to the time index of
            `series` that will be used as first prediction time.
            In case of ``pandas.Timestamp``, this time stamp will be used to determine the first prediction time
            directly. Default: 0.5
        num_samples
            Number of times a prediction is sampled from a probabilistic model. Should be left set to 1 for
            deterministic models.
        model_fit_kwargs
            Parameters to be passed on to the forecast model ``fit()`` method.
            Entries whose value is ``None`` are not forwarded.

        Returns
        -------
        self
            Fitted model. ``self`` is also returned when there is nothing to fit.
        """
        raise_if_not(
            type(allow_model_training) is bool,  # noqa: E721
            f"`allow_model_training` must be Boolean, found type: {type(allow_model_training)}.",
        )

        # nothing to do: the model must not be trained and no scorer is fittable
        if not allow_model_training and not self.scorers_are_trainable:
            logger.warning(
                f"The forecasting model {self.model.__class__.__name__} won't be trained"
                " because the parameter `allow_model_training` is set to False, and no scorer"
                " is fittable. ``.fit()`` method has no effect."
            )
            # keep the documented contract (fluent API) even on the no-op path
            return self

        list_series = _to_list(series)
        raise_if_not(
            all(isinstance(s, TimeSeries) for s in list_series),
            "all input `series` must be of type Timeseries.",
        )

        list_past_covariates = self._prepare_covariates(
            past_covariates, list_series, "past"
        )
        list_future_covariates = self._prepare_covariates(
            future_covariates, list_series, "future"
        )

        model_fit_kwargs["past_covariates"] = list_past_covariates
        model_fit_kwargs["future_covariates"] = list_future_covariates

        # Drop only the entries that are None. Testing truthiness here would
        # also discard legitimate falsy values such as `verbose=False` or `0`.
        model_fit_kwargs = {
            key: value for key, value in model_fit_kwargs.items() if value is not None
        }

        if allow_model_training:
            fit_signature_series = (
                inspect.signature(self.model.fit).parameters["series"].annotation
            )

            # checks if model can be trained on multiple time series or only on a time series
            # TODO: check if model can accept multivariate timeseries, raise error if given and model cannot
            if "Sequence[darts.timeseries.TimeSeries]" in str(fit_signature_series):
                self.model.fit(series=list_series, **model_fit_kwargs)
            else:
                raise_if_not(
                    len(list_series) == 1,
                    f"Forecasting model {self.model.__class__.__name__} only accepts a single time series"
                    " for the training phase and not a sequence of multiple of time series.",
                )
                # The target is handed over as a single series, so the
                # covariates (validated above to have the same length, i.e. 1)
                # are unwrapped as well instead of being passed as lists.
                single_fit_kwargs = dict(model_fit_kwargs)
                for name in ("past_covariates", "future_covariates"):
                    if name in single_fit_kwargs:
                        single_fit_kwargs[name] = single_fit_kwargs[name][0]
                self.model.fit(series=list_series[0], **single_fit_kwargs)
        else:
            raise_if_not(
                self.model._fit_called,
                f"Model {self.model.__class__.__name__} needs to be trained, consider training "
                "it beforehand or setting "
                "`allow_model_training` to True (default: False). "
                "The model will then be trained on the provided series.",
            )

        # generate the historical_forecast() prediction of the model on the train set
        if self.scorers_are_trainable:
            # check if the window size of the scorers are lower than the max size allowed
            self._check_window_size(list_series, start)

            list_pred = self._predict_series_list(
                list_series,
                list_past_covariates,
                list_future_covariates,
                forecast_horizon=forecast_horizon,
                start=start,
                num_samples=num_samples,
            )

            # fit the scorers
            for scorer in self.scorers:
                if hasattr(scorer, "fit"):
                    scorer.fit_from_prediction(list_series, list_pred)

        return self

    def _prepare_covariates(
        self,
        covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]],
        series: Sequence[TimeSeries],
        name_covariates: str,
    ) -> Optional[Sequence[TimeSeries]]:
        """Convert `covariates` into Sequence, if not already, and checks if their length is equal to the one of `series`.

        Parameters
        ----------
        covariates
            Covariate ("future" or "past") of `series`, or None.
        series
            The series to be trained on.
        name_covariates
            Internal parameter for error message, a string indicating if it is a "future" or "past" covariates.

        Returns
        -------
        Optional[Sequence[TimeSeries]]
            Covariate time series, or None if `covariates` is None.
        """
        if covariates is None:
            return None

        list_covariates = _to_list(covariates)

        for covariate in list_covariates:
            _assert_timeseries(
                covariate, name_covariates + "_covariates input series"
            )

        raise_if_not(
            len(list_covariates) == len(series),
            f"Number of {name_covariates}_covariates must match the number of given "
            f"series, found length {len(list_covariates)} and expected {len(series)}.",
        )

        return list_covariates

    def _predict_series_list(
        self,
        list_series: Sequence[TimeSeries],
        list_past_covariates: Optional[Sequence[TimeSeries]],
        list_future_covariates: Optional[Sequence[TimeSeries]],
        forecast_horizon: int,
        start: Union[pd.Timestamp, float, int],
        num_samples: int,
    ) -> Sequence[TimeSeries]:
        """Compute the historical forecasts of every series in `list_series`.

        The covariates at position ``i`` (if given) are used for the series at position ``i``.

        Parameters
        ----------
        list_series
            The target series to predict.
        list_past_covariates
            Past covariates, one per series, or None.
        list_future_covariates
            Future covariates, one per series, or None.
        forecast_horizon
            The forecast horizon for the predictions.
        start
            The first point of time at which a prediction is computed.
        num_samples
            Number of times a prediction is sampled from a probabilistic model.

        Returns
        -------
        Sequence[TimeSeries]
            A list with one prediction per series, in the order of `list_series`.
        """
        predictions = []
        for idx, target in enumerate(list_series):
            past = None if list_past_covariates is None else list_past_covariates[idx]
            future = (
                None if list_future_covariates is None else list_future_covariates[idx]
            )
            predictions.append(
                self._predict_with_forecasting(
                    target,
                    past_covariates=past,
                    future_covariates=future,
                    forecast_horizon=forecast_horizon,
                    start=start,
                    num_samples=num_samples,
                )
            )
        return predictions

    def show_anomalies(
        self,
        series: TimeSeries,
        past_covariates: Optional[TimeSeries] = None,
        future_covariates: Optional[TimeSeries] = None,
        forecast_horizon: int = 1,
        start: Union[pd.Timestamp, float, int] = 0.5,
        num_samples: int = 1,
        actual_anomalies: Optional[TimeSeries] = None,
        names_of_scorers: Optional[Union[str, Sequence[str]]] = None,
        title: Optional[str] = None,
        metric: Optional[str] = None,
    ):
        """Plot the results of the anomaly model.

        Computes the score on the given series input and shows the different anomaly scores with respect to time.

        The plot will be composed of the following:

        - the series itself with the output of the forecasting model.
        - the anomaly score for each scorer. The scorers with different windows will be separated.
        - the actual anomalies, if given.

        It is possible to:

        - add a title to the figure with the parameter `title`
        - give personalized names for the scorers with `names_of_scorers`
        - show the results of a metric for each anomaly score (AUC_ROC or AUC_PR),
          if the actual anomalies are provided.

        Parameters
        ----------
        series
            The series to visualize anomalies from. A sequence containing exactly one
            series is accepted as well.
        past_covariates
            An optional past-observed covariate series or sequence of series. This applies only if the model
            supports past covariates.
        future_covariates
            An optional future-known covariate series or sequence of series. This applies only if the model
            supports future covariates.
        forecast_horizon
            The forecast horizon for the predictions.
        start
            The first point of time at which a prediction is computed for a future time.
            This parameter supports 3 different data types: ``float``, ``int`` and ``pandas.Timestamp``.
            In the case of ``float``, the parameter will be treated as the proportion of the time series
            that should lie before the first prediction point.
            In the case of ``int``, the parameter will be treated as an integer index to the time index of
            `series` that will be used as first prediction time.
            In case of ``pandas.Timestamp``, this time stamp will be used to determine the first prediction time
            directly.
        num_samples
            Number of times a prediction is sampled from a probabilistic model. Should be left set to 1 for
            deterministic models.
        actual_anomalies
            The ground truth of the anomalies (1 if it is an anomaly and 0 if not)
        names_of_scorers
            Name of the scores. Must be a list of length equal to the number of scorers in the anomaly_model.
        title
            Title of the figure
        metric
            Optionally, Scoring function to use. Must be one of "AUC_ROC" and "AUC_PR".
            Default: None (the value is forwarded unchanged to the plotting routine).
        """
        if isinstance(series, Sequence):
            raise_if_not(
                len(series) == 1,
                f"`show_anomalies` expects one series, found a list of length {len(series)} as input.",
            )
            series = series[0]

        raise_if_not(
            isinstance(series, TimeSeries),
            f"`show_anomalies` expects an input of type TimeSeries, found type: {type(series)}.",
        )

        anomaly_scores, model_output = self.score(
            series,
            past_covariates=past_covariates,
            future_covariates=future_covariates,
            forecast_horizon=forecast_horizon,
            start=start,
            num_samples=num_samples,
            return_model_prediction=True,
        )

        return self._show_anomalies(
            series,
            model_output=model_output,
            anomaly_scores=anomaly_scores,
            names_of_scorers=names_of_scorers,
            actual_anomalies=actual_anomalies,
            title=title,
            metric=metric,
        )

    def score(
        self,
        series: Union[TimeSeries, Sequence[TimeSeries]],
        past_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None,
        future_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None,
        forecast_horizon: int = 1,
        start: Union[pd.Timestamp, float, int] = 0.5,
        num_samples: int = 1,
        return_model_prediction: bool = False,
    ) -> Union[TimeSeries, Sequence[TimeSeries]]:
        """Compute anomaly score(s) for the given series.

        Predicts the given target time series with the forecasting model, and applies the scorer(s)
        on the prediction and the target input time series. Outputs the anomaly score of the given
        input time series.

        Parameters
        ----------
        series
            The (sequence of) series to score on.
        past_covariates
            An optional past-observed covariate series or sequence of series. This applies only if the model
            supports past covariates.
        future_covariates
            An optional future-known covariate series or sequence of series. This applies only if the model
            supports future covariates.
        forecast_horizon
            The forecast horizon for the predictions.
        start
            The first point of time at which a prediction is computed for a future time.
            This parameter supports 3 different data types: ``float``, ``int`` and ``pandas.Timestamp``.
            In the case of ``float``, the parameter will be treated as the proportion of the time series
            that should lie before the first prediction point.
            In the case of ``int``, the parameter will be treated as an integer index to the time index of
            `series` that will be used as first prediction time.
            In case of ``pandas.Timestamp``, this time stamp will be used to determine the first prediction time
            directly. Default: 0.5
        num_samples
            Number of times a prediction is sampled from a probabilistic model. Should be left set to 1 for
            deterministic models.
        return_model_prediction
            Boolean value indicating if the prediction of the model should be returned along the anomaly score
            Default: False

        Returns
        -------
        Union[TimeSeries, Sequence[TimeSeries], Sequence[Sequence[TimeSeries]]]
            Anomaly scores series generated by the anomaly model scorers

            - if `series` is a single series (not a sequence): the score(s) of that series. With
              exactly one scorer the single score is returned directly, otherwise a sequence with one
              score per scorer.
            - if `series` is a sequence: a list with one entry per series, each entry being a
              sequence over the scorers. Entries are not unwrapped in this case.

            If `return_model_prediction` is True, a tuple ``(scores, predictions)`` is returned instead,
            where ``predictions`` is a single prediction whenever exactly one series was predicted,
            and the list of predictions otherwise.
        """
        raise_if_not(
            type(return_model_prediction) is bool,  # noqa: E721
            f"`return_model_prediction` must be Boolean, found type: {type(return_model_prediction)}.",
        )

        raise_if_not(
            self.model._fit_called,
            f"Model {self.model} has not been trained. Please call ``.fit()``.",
        )

        list_series = _to_list(series)

        list_past_covariates = self._prepare_covariates(
            past_covariates, list_series, "past"
        )
        list_future_covariates = self._prepare_covariates(
            future_covariates, list_series, "future"
        )

        # check if the window size of the scorers are lower than the max size allowed
        self._check_window_size(list_series, start)

        list_pred = self._predict_series_list(
            list_series,
            list_past_covariates,
            list_future_covariates,
            forecast_horizon=forecast_horizon,
            start=start,
            num_samples=num_samples,
        )

        # each scorer yields its scores over all series; transpose so that the
        # outer level runs over the series and the inner one over the scorers
        per_scorer = [
            scorer.score_from_prediction(list_series, list_pred)
            for scorer in self.scorers
        ]
        scores = list(zip(*per_scorer))

        if len(scores) == 1 and not isinstance(series, Sequence):
            # there's only one series
            scores = scores[0]
            if len(scores) == 1:
                # there's only one scorer
                scores = scores[0]

        if len(list_pred) == 1:
            list_pred = list_pred[0]

        if return_model_prediction:
            return scores, list_pred
        return scores

    def _check_window_size(
        self, series: Sequence[TimeSeries], start: Union[pd.Timestamp, float, int]
    ):
        """Checks if the parameters `window` of the scorers are smaller than the maximum window size allowed.

        The maximum size allowed is equal to the output length of the .historical_forecast() applied on `series`.
        It is defined by the parameter `start` and the series' length.

        Parameters
        ----------
        series
            The series given to the .historical_forecast()
        start
            Parameter of the .historical_forecast(): first point of time at which a prediction is computed
            for a future time.
        """
        # biggest window of the anomaly_model scorers
        max_window = max(scorer.window for scorer in self.scorers)

        for s in series:
            first_prediction_time = s.get_timestamp_at_point(start)
            max_possible_window = len(s.drop_before(first_prediction_time)) + 1
            raise_if_not(
                max_window <= max_possible_window,
                f"Window size {max_window} is greater than the targeted series length {max_possible_window},"
                f" must be lower or equal. Reduce window size, or reduce start value (start: {start}).",
            )

    def _predict_with_forecasting(
        self,
        series: TimeSeries,
        past_covariates: Optional[TimeSeries] = None,
        future_covariates: Optional[TimeSeries] = None,
        forecast_horizon: int = 1,
        start: Optional[Union[pd.Timestamp, float, int]] = None,
        num_samples: int = 1,
    ) -> TimeSeries:
        """Compute the historical forecasts that would have been obtained by this model on the `series`.

        `retrain` is set to False if possible (this is not supported by all models). If set to True,
        the model is re-trained while computing the historical forecasts.

        Parameters
        ----------
        series
            The target time series to use to successively train and evaluate the historical forecasts.
        past_covariates
            An optional past-observed covariate series or sequence of series. This applies only if the model
            supports past covariates.
        future_covariates
            An optional future-known covariate series or sequence of series. This applies only if the model
            supports future covariates.
        forecast_horizon
            The forecast horizon for the predictions
        start
            The first point of time at which a prediction is computed for a future time.
            This parameter supports 3 different data types: ``float``, ``int`` and ``pandas.Timestamp``.
            In the case of ``float``, the parameter will be treated as the proportion of the time series
            that should lie before the first prediction point.
            In the case of ``int``, the parameter will be treated as an integer index to the time index of
            `series` that will be used as first prediction time.
            In case of ``pandas.Timestamp``, this time stamp will be used to determine the first prediction time
            directly.
        num_samples
            Number of times a prediction is sampled from a probabilistic model. Should be left set to 1 for
            deterministic models.

        Returns
        -------
        TimeSeries
            Single ``TimeSeries`` instance created from the last point of each individual forecast.
        """
        # TODO: raise an exception. We only support models that do not need retrain
        # Skip retraining whenever the model allows it: this makes the
        # computation faster.
        retrain = not self.model._supports_non_retrainable_historical_forecasts

        return self.model.historical_forecasts(
            series,
            past_covariates=past_covariates,
            future_covariates=future_covariates,
            forecast_horizon=forecast_horizon,
            start=start,
            retrain=retrain,
            num_samples=num_samples,
            stride=1,
            last_points_only=True,
            verbose=False,
        )

    def eval_accuracy(
        self,
        actual_anomalies: Union[TimeSeries, Sequence[TimeSeries]],
        series: Union[TimeSeries, Sequence[TimeSeries]],
        past_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None,
        future_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None,
        forecast_horizon: int = 1,
        start: Union[pd.Timestamp, float, int] = 0.5,
        num_samples: int = 1,
        metric: str = "AUC_ROC",
    ) -> Union[
        Dict[str, float],
        Dict[str, Sequence[float]],
        Sequence[Dict[str, float]],
        Sequence[Dict[str, Sequence[float]]],
    ]:
        """Compute the accuracy of the anomaly scores computed by the model.

        Predicts the `series` with the forecasting model, and applies the
        scorer(s) on the predicted time series and the given target time series. Returns the
        score(s) of an agnostic threshold metric, based on the anomaly score given by the scorer(s).

        Parameters
        ----------
        actual_anomalies
            The (sequence of) ground truth of the anomalies (1 if it is an anomaly and 0 if not)
        series
            The (sequence of) series to predict anomalies on.
        past_covariates
            An optional past-observed covariate series or sequence of series. This applies only
            if the model supports past covariates.
        future_covariates
            An optional future-known covariate series or sequence of series. This applies only
            if the model supports future covariates.
        forecast_horizon
            The forecast horizon for the predictions.
        start
            The first point of time at which a prediction is computed for a future time.
            This parameter supports 3 different data types: ``float``, ``int`` and ``pandas.Timestamp``.
            In the case of ``float``, the parameter will be treated as the proportion of the time series
            that should lie before the first prediction point.
            In the case of ``int``, the parameter will be treated as an integer index to the time index of
            `series` that will be used as first prediction time.
            In case of ``pandas.Timestamp``, this time stamp will be used to determine the first prediction time
            directly.
        num_samples
            Number of times a prediction is sampled from a probabilistic model. Should be left set to 1 for
            deterministic models.
        metric
            Optionally, Scoring function to use. Must be one of "AUC_ROC" and "AUC_PR".
            Default: "AUC_ROC"

        Returns
        -------
        Union[Dict[str, float], Dict[str, Sequence[float]],
        Sequence[Dict[str, float]], Sequence[Dict[str, Sequence[float]]]]
            Score for the time series.
            A (sequence of) dictionary with the keys being the name of the scorers, and the values being the
            metric results on the (sequence of) `series`. If the scorer treats every dimension independently
            (by nature of the scorer or if its component_wise is set to True), the values of the dictionary
            will be a Sequence containing the score for each dimension.
        """
        list_actual_anomalies = _to_list(actual_anomalies)
        list_series = _to_list(series)

        raise_if_not(
            all(isinstance(s, TimeSeries) for s in list_series),
            "all input `series` must be of type Timeseries.",
        )

        raise_if_not(
            all(isinstance(s, TimeSeries) for s in list_actual_anomalies),
            "all input `actual_anomalies` must be of type Timeseries.",
        )

        _assert_same_length(list_actual_anomalies, list_series)
        self._check_univariate(list_actual_anomalies)

        # `score` is given the list form, so it returns one entry per series
        list_anomaly_scores = self.score(
            series=list_series,
            past_covariates=past_covariates,
            future_covariates=future_covariates,
            forecast_horizon=forecast_horizon,
            start=start,
            num_samples=num_samples,
        )

        acc_anomaly_scores = self._eval_accuracy_from_scores(
            list_actual_anomalies=list_actual_anomalies,
            list_anomaly_scores=list_anomaly_scores,
            metric=metric,
        )

        # mirror the input: a single series in, a single result out
        if len(acc_anomaly_scores) == 1 and not isinstance(series, Sequence):
            return acc_anomaly_scores[0]
        return acc_anomaly_scores