Source code for darts.ad.detectors.threshold_detector

"""
Threshold Detector
------------------

Detector that detects anomaly based on user-given threshold.
This detector compares time series values with user-given thresholds, and
identifies time points as anomalous when values are beyond the thresholds.
"""

from collections.abc import Sequence
from typing import Union

import numpy as np

from darts import TimeSeries
from darts.ad.detectors.detectors import Detector, _BoundedDetectorMixin
from darts.logging import get_logger, raise_log

logger = get_logger(__name__)


[docs]class ThresholdDetector(Detector, _BoundedDetectorMixin): def __init__( self, low_threshold: Union[int, float, Sequence[float], None] = None, high_threshold: Union[int, float, Sequence[float], None] = None, ) -> None: """Threshold Detector Flags values that are either below or above the `low_threshold` and `high_threshold`, respectively. If a single value is provided for `low_threshold` or `high_threshold`, this same value will be used across all components of the series. If sequences of values are given for the parameters `low_threshold` and/or `high_threshold`, they must be of the same length, matching the dimensionality of the series passed to `detect()`, or have a length of 1. In the latter case, this single value will be used across all components of the series. If either `low_threshold` or `high_threshold` is None, the corresponding bound will not be used. However, at least one of the two must be set. Parameters ---------- low_threshold (Sequence of) lower bounds. If a sequence, must match the dimensionality of the series this detector is applied to. high_threshold (Sequence of) upper bounds. If a sequence, must match the dimensionality of the series this detector is applied to. """ super().__init__() low_threshold, high_threshold = self._prepare_boundaries( lower_bound=low_threshold, upper_bound=high_threshold, lower_bound_name="low_threshold", upper_bound_name="high_threshold", ) self._low_threshold = low_threshold self._high_threshold = high_threshold def _detect_core(self, series: TimeSeries, name: str = "series") -> TimeSeries: if len(self.low_threshold) > 1 and len(self.low_threshold) != series.width: raise_log( ValueError( f"The number of components for each series in `{name}` must be " f"equal to the number of threshold values. Found number of " f"components equal to {series.width} and expected {len(self.low_threshold)}." ), logger=logger, ) # if length is 1, tile it to series width: low_threshold = self._expand_threshold(series[0], self.low_threshold) high_threshold = self._expand_threshold(series[0], self.high_threshold) # (time, components) np_series = series.values(copy=False) def _detect_fn(x, lo, hi): # x of shape (time,) for 1 component return (x < (-np.inf if lo is None else lo)) | ( x > (np.inf if hi is None else hi) ) detected = np.zeros_like(np_series, dtype=int) for component_idx in range(series.width): detected[:, component_idx] = _detect_fn( np_series[:, component_idx], low_threshold[component_idx], high_threshold[component_idx], ) return TimeSeries( times=series.time_index, values=np.expand_dims(detected, -1).astype(series.dtype), components=series.components, copy=False, **series._attrs, ) @property def low_threshold(self): return self._low_threshold @property def high_threshold(self): return self._high_threshold