# Source code for darts.ad.detectors.quantile_detector

"""
Quantile Detector
-----------------

Flags anomalies that are beyond some quantiles of historical data.
This is similar to a threshold-based detector, where the thresholds are
computed as quantiles of historical data when the detector is fitted.
"""

from typing import Optional, Sequence, Union

import numpy as np

from darts.ad.detectors.detectors import FittableDetector, _BoundedDetectorMixin
from darts.ad.detectors.threshold_detector import ThresholdDetector
from darts.logging import get_logger, raise_log
from darts.timeseries import TimeSeries

logger = get_logger(__name__)


class QuantileDetector(FittableDetector, _BoundedDetectorMixin):
    def __init__(
        self,
        low_quantile: Union[Sequence[float], float, None] = None,
        high_quantile: Union[Sequence[float], float, None] = None,
    ) -> None:
        """Quantile Detector

        Flags values that are either below or above the `low_quantile` and
        `high_quantile` quantiles of historical data, respectively.

        If a single value is provided for `low_quantile` or `high_quantile`, this
        same value will be used across all components of the series.

        If sequences of values are given for the parameters `low_quantile` and/or
        `high_quantile`, they must be of the same length, matching the
        dimensionality of the series passed to `fit()`, or have a length of 1.
        In the latter case, this single value will be used across all components
        of the series.

        If either `low_quantile` or `high_quantile` is None, the corresponding
        bound will not be used. However, at least one of the two must be set.

        Parameters
        ----------
        low_quantile
            (Sequence of) quantile of historical data below which a value is
            regarded as anomaly. Must be between 0 and 1. If a sequence, must
            match the dimensionality of the series this detector is applied to.
        high_quantile
            (Sequence of) quantile of historical data above which a value is
            regarded as anomaly. Must be between 0 and 1. If a sequence, must
            match the dimensionality of the series this detector is applied to.

        Raises
        ------
        ValueError
            If any provided quantile lies outside the interval [0, 1].
        """
        super().__init__()

        # Normalisation of the two bounds (scalar vs. sequence, None handling) is
        # delegated to the mixin; both results are iterated and measured with
        # `len()` below, so they are expected to come back as sequences.
        low_quantile, high_quantile = self._prepare_boundaries(
            lower_bound=low_quantile,
            upper_bound=high_quantile,
            lower_bound_name="low_quantile",
            upper_bound_name="high_quantile",
        )

        for quantiles in (low_quantile, high_quantile):
            # `None` entries mean "no bound" and are therefore always acceptable.
            if not all(x is None or 0 <= x <= 1 for x in quantiles):
                raise_log(
                    ValueError("All quantiles must be between 0 and 1, or None."),
                    logger=logger,
                )

        self.low_quantile = low_quantile
        self.high_quantile = high_quantile

        # We'll use an inner Threshold detector once the quantiles are fitted
        self.detector: Optional[ThresholdDetector] = None

    def _fit_core(self, series: Sequence[TimeSeries]) -> None:
        """Compute per-component thresholds from the quantiles of `series`.

        All series are concatenated along the time axis, and all samples of a
        component are pooled, before the quantiles are evaluated. The resulting
        thresholds are stored in an inner `ThresholdDetector`.

        Parameters
        ----------
        series
            The series used to estimate the quantiles. The width of the first
            series is taken as the reference number of components.

        Raises
        ------
        ValueError
            If a quantile sequence holds more than one value and its length does
            not match the number of components of the input.
        """
        width = series[0].width

        # A multi-valued quantile sequence must provide exactly one value per
        # component. Both bounds are validated, as the error message refers to
        # either of them.
        for quantiles in (self.low_quantile, self.high_quantile):
            if len(quantiles) > 1 and len(quantiles) != width:
                raise_log(
                    ValueError(
                        "The number of components of input must be equal to the number "
                        "of values given for `high_quantile` or/and `low_quantile`. Found number of "
                        f"components equal to {width} and expected {len(quantiles)}."
                    ),
                    logger=logger,
                )

        # otherwise, make them the right length
        # NOTE(review): `_expand_threshold` is defined outside this file; it is
        # presumably what broadcasts a single value across all components.
        self.low_quantile = self._expand_threshold(series[0], self.low_quantile)
        self.high_quantile = self._expand_threshold(series[0], self.high_quantile)

        # concatenate everything along the time axis
        np_series = np.concatenate(
            [ts.all_values(copy=False) for ts in series], axis=0
        )

        # move sample dimension to position 1
        np_series = np.moveaxis(np_series, 2, 1)

        # flatten it in order to obtain an array of shape (time * samples, components)
        # where all samples of a given component are concatenated along time
        np_series = np_series.reshape(np_series.shape[0] * np_series.shape[1], -1)

        # Compute 2 thresholds (low, high) for each component:
        # TODO: we could make this more efficient when low_quantile or high_quantile contain a single value
        low_threshold = [
            np.quantile(np_series[:, i], q=lo, axis=0) if lo is not None else None
            for i, lo in enumerate(self.low_quantile)
        ]
        high_threshold = [
            np.quantile(np_series[:, i], q=hi, axis=0) if hi is not None else None
            for i, hi in enumerate(self.high_quantile)
        ]

        self.detector = ThresholdDetector(
            low_threshold=low_threshold, high_threshold=high_threshold
        )

    def _detect_core(self, series: TimeSeries, name: str = "series") -> TimeSeries:
        """Delegate detection on `series` to the inner threshold detector."""
        # `self.detector` is only set by `_fit_core`, so this relies on the
        # detector having been fitted before detection is requested.
        return self.detector.detect(series, name=name)

    @property
    def low_threshold(self):
        """Low thresholds of the inner detector, or None before fitting."""
        return self.detector.low_threshold if self.detector is not None else None

    @property
    def high_threshold(self):
        """High thresholds of the inner detector, or None before fitting."""
        return self.detector.high_threshold if self.detector is not None else None