Source code for

"""
Quantile Detector
-----------------

Flags anomalies that are beyond some quantiles of historical data.
This is similar to a threshold-based detector, where the thresholds are
computed as quantiles of historical data when the detector is fitted.
"""

from typing import Optional, Sequence, Union

import numpy as np

from import FittableDetector, _BoundedDetectorMixin
from import ThresholdDetector
from darts.logging import get_logger, raise_log
from darts.timeseries import TimeSeries

logger = get_logger(__name__)

class QuantileDetector(FittableDetector, _BoundedDetectorMixin):
    def __init__(
        self,
        low_quantile: Union[Sequence[float], float, None] = None,
        high_quantile: Union[Sequence[float], float, None] = None,
    ) -> None:
        """Quantile Detector

        Marks a value as anomalous when it lies below the `low_quantile`
        quantile or above the `high_quantile` quantile of the historical data
        seen during `fit()`.

        A scalar `low_quantile` / `high_quantile` is applied to every component
        of the series. When sequences are passed for `low_quantile` and/or
        `high_quantile`, they must have the same length; that length must either
        equal the number of components of the series given to `fit()`, or be 1,
        in which case the single value is reused for every component.

        Passing None for one of the two parameters disables that side of the
        check. At least one of the two must be provided.

        Parameters
        ----------
        low_quantile
            (Sequence of) quantile of historical data below which a value is
            regarded as anomaly. Must be between 0 and 1. If a sequence, must
            match the dimensionality of the series this detector is applied to.
        high_quantile
            (Sequence of) quantile of historical data above which a value is
            regarded as anomaly. Must be between 0 and 1. If a sequence, must
            match the dimensionality of the series this detector is applied to.
        """
        super().__init__()

        # Normalisation of the two bounds is delegated to the mixin helper; the
        # code below iterates over both results, so they come back as iterables.
        low_quantile, high_quantile = self._prepare_boundaries(
            lower_bound=low_quantile,
            upper_bound=high_quantile,
            lower_bound_name="low_quantile",
            upper_bound_name="high_quantile",
        )

        # Every entry has to be either None (bound disabled) or within [0, 1].
        for bounds in (low_quantile, high_quantile):
            entry_ok = [q is None or 0 <= q <= 1 for q in bounds]
            if not all(entry_ok):
                raise_log(
                    ValueError("All quantiles must be between 0 and 1, or None."),
                    logger=logger,
                )

        self.low_quantile = low_quantile
        self.high_quantile = high_quantile

        # The actual detection is done by a ThresholdDetector that only exists
        # once the quantiles have been turned into thresholds by fitting.
        self.detector: Optional[ThresholdDetector] = None

    def _fit_core(self, series: Sequence[TimeSeries]) -> None:
        """Compute per-component thresholds from `series` and build the inner detector.

        Raises
        ------
        ValueError
            If more than one low quantile was given and their count differs
            from the width of the first series.
        """
        # Only `low_quantile` is measured here.
        # NOTE(review): this relies on `_prepare_boundaries` having made both
        # lists the same length - confirm against the mixin.
        n_quantiles = len(self.low_quantile)
        if n_quantiles > 1 and n_quantiles != series[0].width:
            raise_log(
                ValueError(
                    "The number of components of input must be equal to the number "
                    "of values given for `high_quantile` or/and `low_quantile`. Found number of "
                    f"components equal to {series[0].width} and expected {n_quantiles}."
                ),
                logger=logger,
            )

        # Bring both quantile lists to the width of the first series.
        reference = series[0]
        self.low_quantile = self._expand_threshold(reference, self.low_quantile)
        self.high_quantile = self._expand_threshold(reference, self.high_quantile)

        # Stack the values of all series along the time axis (axis 0).
        stacked = np.concatenate(
            [ts.all_values(copy=False) for ts in series], axis=0
        )

        # Put the sample axis right after the time axis, then merge those two
        # axes so that the result has shape (time * samples, components).
        stacked = np.moveaxis(stacked, 2, 1)
        n_time, n_samples = stacked.shape[:2]
        flat = stacked.reshape(n_time * n_samples, -1)

        def _thresholds_for(quantiles):
            # One threshold per component; a None quantile stays None.
            # TODO: this could be made more efficient when all the quantiles
            # of one side share a single value
            thresholds = []
            for idx, q in enumerate(quantiles):
                if q is None:
                    thresholds.append(None)
                else:
                    thresholds.append(np.quantile(flat[:, idx], q=q, axis=0))
            return thresholds

        self.detector = ThresholdDetector(
            low_threshold=_thresholds_for(self.low_quantile),
            high_threshold=_thresholds_for(self.high_quantile),
        )

    def _detect_core(self, series: TimeSeries, name: str = "series") -> TimeSeries:
        """Delegate detection on `series` to the fitted inner ThresholdDetector."""
        return self.detector.detect(series, name=name)

    @property
    def low_threshold(self):
        """Low thresholds of the inner detector, or None before fitting."""
        if self.detector is None:
            return None
        return self.detector.low_threshold

    @property
    def high_threshold(self):
        """High thresholds of the inner detector, or None before fitting."""
        if self.detector is None:
            return None
        return self.detector.high_threshold