Source code for darts.ad.aggregators.aggregators

"""
Anomaly aggregators base classes
"""

# TODO:
# - add customize aggregators
# - add in trainable aggregators
#     - log regression
#     - decision tree
# - create show_all_combined (info about correlation, and from what path did
#   the anomaly alarm came from)
import sys

import numpy as np

if sys.version_info >= (3, 11):
    from typing import Self
else:
    from typing_extensions import Self
try:
    from typing import Literal
except ImportError:
    from typing_extensions import Literal

from abc import ABC, abstractmethod
from collections.abc import Sequence
from typing import Optional, Union

from darts import TimeSeries
from darts.ad.utils import (
    _assert_fit_called,
    _check_input,
    eval_metric_from_binary_prediction,
    series2seq,
)
from darts.logging import get_logger, raise_log

logger = get_logger(__name__)


[docs]class Aggregator(ABC): """Base class for Aggregators.""" def __init__(self): self.width_trained_on: Optional[int] = None @abstractmethod def __str__(self): """returns the name of the aggregator""" pass @abstractmethod def _predict_core(self, series: Sequence[TimeSeries]) -> Sequence[TimeSeries]: """Aggregates the sequence of multivariate binary series given as input into a sequence of univariate binary series. assuming the input is in the correct shape. Parameters ---------- series The sequence of multivariate binary series to aggregate Returns ------- TimeSeries Sequence of aggregated results """ pass
[docs] def predict( self, series: Union[TimeSeries, Sequence[TimeSeries]], name: str = "series", ) -> Union[TimeSeries, Sequence[TimeSeries]]: """Aggregates the (sequence of) multivariate binary series given as input into a (sequence of) univariate binary series. Parameters ---------- series The (sequence of) multivariate binary series to aggregate. name The name of `series`. Returns ------- TimeSeries (Sequence of) aggregated results. """ called_with_single_series = isinstance(series, TimeSeries) series = _check_input( series, name=name, width_expected=self.width_trained_on, check_deterministic=True, check_binary=True, check_multivariate=True, ) pred = self._predict_core(series) return pred[0] if called_with_single_series else pred
[docs] def eval_metric( self, anomalies: Union[TimeSeries, Sequence[TimeSeries]], series: Union[TimeSeries, Sequence[TimeSeries]], window: int = 1, metric: Literal["recall", "precision", "f1", "accuracy"] = "recall", ) -> Union[float, Sequence[float]]: """Aggregates the (sequence of) multivariate series given as input into one (sequence of) series and evaluates the results against the ground truth anomaly labels. Parameters ---------- anomalies The (sequence of) binary ground truth anomaly labels (1 if it is an anomaly and 0 if not). series The (sequence of) predicted multivariate binary series to aggregate. window (Sequence of) integer value indicating the number of past samples each point represents in the (sequence of) series. The parameter will be used by the function `_window_adjustment_anomalies()` in darts.ad.utils to transform anomalies. metric The name of the metric function to use. Must be one of "recall", "precision", "f1", and "accuracy". Default: "recall". Returns ------- Union[float, Sequence[float]] (Sequence of) score for the (sequence of) series. """ pred_anomalies = self.predict(series) return eval_metric_from_binary_prediction( anomalies=anomalies, pred_anomalies=pred_anomalies, window=window, metric=metric, )
[docs]class FittableAggregator(Aggregator): """Base class for Aggregators that require training.""" def __init__(self): super().__init__() self._fit_called = False @abstractmethod def _fit_core(self, anomalies: Sequence[np.ndarray], series: Sequence[np.ndarray]): """Fits the aggregator, assuming the input is in the correct shape. Parameters ---------- anomalies The (sequence of) binary ground truth anomaly labels (1 if it is an anomaly and 0 if not). series The (sequence of) multivariate binary anomalies (predicted labels) to aggregate. """ pass
[docs] def fit( self, anomalies: Union[TimeSeries, Sequence[TimeSeries]], series: Union[TimeSeries, Sequence[TimeSeries]], ) -> Self: """Fit the aggregators on the (sequence of) multivariate binary anomaly series. If a list of series is given, they must have the same number of components. Parameters ---------- anomalies The (sequence of) binary ground truth anomaly labels (1 if it is an anomaly and 0 if not). series The (sequence of) multivariate binary series (predicted labels) to aggregate. """ pred_width = series2seq(series)[0].width series = _check_input( series, name="series", width_expected=pred_width, check_deterministic=True, check_binary=True, check_multivariate=True, ) self.width_trained_on = pred_width anomalies = _check_input( anomalies, name="anomalies", width_expected=1, check_deterministic=True, check_binary=True, check_multivariate=False, ) if len(anomalies) != len(series): raise_log( ValueError( "`anomalies` and `series` must contain the same number of series." ), logger=logger, ) anomalies_vals, series_vals = [], [] for anom, pred_anom in zip(anomalies, series): anomalies_vals.append(anom.slice_intersect_values(pred_anom)[:, :, 0]) series_vals.append(pred_anom.slice_intersect_values(anom)[:, :, 0]) self._fit_core(anomalies_vals, series_vals) self._fit_called = True return self
[docs] def predict( self, series: Union[TimeSeries, Sequence[TimeSeries]], name: str = "series", ) -> Union[TimeSeries, Sequence[TimeSeries]]: _assert_fit_called(self._fit_called, name="Aggregator") return super().predict(series=series, name=name)