Source code for darts.ad.aggregators.and_aggregator

"""
AND Aggregator
--------------
"""

from typing import Sequence

from darts import TimeSeries
from darts.ad.aggregators.aggregators import Aggregator
from darts.utils.utils import _parallel_apply


[docs]class AndAggregator(Aggregator): def __init__(self, n_jobs: int = 1) -> None: """AND Aggregator Aggregator that identifies a time step as anomalous if all the components are flagged as anomalous (logical AND). Parameters ---------- n_jobs The number of jobs to run in parallel. Defaults to `1` (sequential). Setting the parameter to `-1` means using all the available processors. """ super().__init__() self._n_jobs = n_jobs def __str__(self) -> str: return "AndAggregator" def _predict_core( self, series: Sequence[TimeSeries], *args, **kwargs ) -> Sequence[TimeSeries]: def _compononents_and(s: TimeSeries): return TimeSeries.from_times_and_values( times=s.time_index, values=(s.all_values(copy=False).sum(axis=1) >= s.width).astype( s.dtype ), columns=["components_sum"], ) return _parallel_apply( [(s,) for s in series], _compononents_and, n_jobs=1, fn_args=args, fn_kwargs=kwargs, )