
"""
WassersteinScorer
-----------------

Wasserstein Scorer (distance function defined between probability distributions) [1]_.
The implementation is wrapped around `scipy.stats.wasserstein_distance
<https://docs.scipy.org/doc/scipy/reference/generated/scipy.stats.wasserstein_distance.html>`_.

References
----------
.. [1] https://en.wikipedia.org/wiki/Wasserstein_metric
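
Examples
--------
A quick illustration of the underlying metric (values chosen for demonstration
only): the first Wasserstein distance between the empirical distributions of
two small 1-D samples.

>>> from scipy.stats import wasserstein_distance
>>> wasserstein_distance([0, 1, 3], [5, 6, 8])
5.0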
"""

from typing import Sequence

import numpy as np
from numpy.lib.stride_tricks import sliding_window_view
from scipy.stats import wasserstein_distance

from darts.ad.scorers.scorers import FittableAnomalyScorer
from darts.logging import get_logger, raise_if_not
from darts.timeseries import TimeSeries

logger = get_logger(__name__)


class WassersteinScorer(FittableAnomalyScorer):
    def __init__(
        self,
        window: int = 10,
        component_wise: bool = False,
        diff_fn="abs_diff",
    ) -> None:
        """
        When calling ``fit(series)``, a moving window is applied, which results in a set of vectors of
        size `W`, where `W` is the window size. These vectors are kept in memory, representing the
        training distribution. The ``score(series)`` function will apply the same moving window.
        The Wasserstein distance is computed between the training distribution and each vector,
        resulting in an anomaly score.

        Alternatively, the scorer has the functions ``fit_from_prediction()`` and
        ``score_from_prediction()``. Both require two series (actual and prediction), and compute a
        "difference" series by applying the function ``diff_fn`` (default: absolute difference). The
        resulting series is then passed to the functions ``fit()`` and ``score()``, respectively.

        `component_wise` is a boolean parameter indicating how the model should behave with multivariate
        input series. If set to True, the model will treat each series dimension independently. If set
        to False, the model concatenates the dimensions in each window of length `W` and computes a
        single score for all dimensions.

        **Training with** ``fit()``:

        The input can be a series (univariate or multivariate) or multiple series. The series will be
        partitioned into equal-size subsequences. Each subsequence will be of size `W` * `D`, with:

        * `W` being the size of the window given as a parameter `window`
        * `D` being the dimension of the series (`D` = 1 if univariate or if `component_wise` is set
          to True)

        For a series of length `N`, (`N` - `W` + 1)/`W` subsequences will be generated. If a list of `L`
        series is given, each series will be partitioned into subsequences, and the results will be
        concatenated into an array of length `L` * number of subsequences of each series.

        The arrays will be kept in memory, representing the training data distribution.
        In practice, the series or list of series can for instance represent residuals that can be
        considered independent and identically distributed (iid).

        If `component_wise` is set to True, the algorithm will be applied to each dimension
        independently, and a separate training distribution will be kept for each dimension.

        **Computing score with** ``score()``:

        The input can be a series (univariate or multivariate) or a sequence of series. The given series
        must have the same dimension `D` as the data used to fit the scorer.

        For each series, if the series is multivariate of dimension `D`:

        * if `component_wise` is set to False: it returns a univariate series (dimension=1). It
          represents the anomaly score of the entire series in the considered window at each timestamp.
        * if `component_wise` is set to True: it returns a multivariate series of dimension `D`. Each
          dimension represents the anomaly score of the corresponding component of the input.

        If the series is univariate, it returns a univariate series regardless of the parameter
        `component_wise`.

        A window of size `W` is rolled on the series with a stride equal to 1. It is the same size
        window `W` used during the training phase. Each value in the score series thus represents how
        anomalous the sample of the `W` previous values is.

        Parameters
        ----------
        window
            Size of the sliding window that represents the number of samples in the testing distribution
            to compare with the training distribution in the Wasserstein function.
        diff_fn
            Optionally, a reducing function to use if two series are given. It will transform the two
            series into one. This allows the WassersteinScorer to compute the Wasserstein distance on
            the original series or on its residuals (difference between the prediction and the original
            series). Must be one of "abs_diff" and "diff" (defined in ``_diff_series()``).
            Default: "abs_diff"
        component_wise
            Boolean value indicating if the score needs to be computed for each component independently
            (True) or by concatenating the components in the considered window to compute one score
            (False).
            Default: False
        """

        # TODO:
        #     - understand better the math behind the Wasserstein distance when the test distribution
        #       contains only one sample
        #     - check if there is an equivalent Wasserstein distance for d-D distributions (currently
        #       only accepts 1D)

        if type(window) is int:  # noqa: E721
            if window > 0 and window < 10:
                logger.warning(
                    f"The `window` parameter of the WassersteinScorer is smaller than 10 (w={window})."
                    + " The value represents the window length rolled on the series given as"
                    + " input in the ``score`` function. At each position, the w values will"
                    + " constitute a subset, and the Wasserstein distance between the subset"
                    + " and the train distribution will be computed. To better represent the"
                    + " constituted test distribution, the window parameter should be larger"
                    + " than 10."
                )

        raise_if_not(
            type(component_wise) is bool,  # noqa: E721
            f"Parameter `component_wise` must be Boolean, found type: {type(component_wise)}.",
        )
        self.component_wise = component_wise

        super().__init__(
            univariate_scorer=(not component_wise), window=window, diff_fn=diff_fn
        )

    def __str__(self):
        return "WassersteinScorer"

    def _fit_core(
        self,
        list_series: Sequence[TimeSeries],
    ):
        # shape: (sum of series lengths, n components)
        self.training_data = np.concatenate(
            [s.all_values(copy=False) for s in list_series]
        ).squeeze(-1)

        if not self.component_wise:
            self.training_data = self.training_data.flatten()

    def _score_core(self, series: TimeSeries) -> TimeSeries:
        raise_if_not(
            self.width_trained_on == series.width,
            "Input must have the same number of components as the data used for"
            + " training the Wasserstein model, found number of components equal"
            + f" to {series.width} and expected {self.width_trained_on}.",
        )

        np_series = series.all_values(copy=False)
        np_anomaly_score = []

        if not self.component_wise:
            np_anomaly_score = [
                wasserstein_distance(self.training_data, window_samples)
                for window_samples in sliding_window_view(
                    np_series, window_shape=self.window, axis=0
                )
                .transpose(0, 3, 1, 2)
                .reshape(-1, self.window * series.width)
            ]

            return TimeSeries.from_times_and_values(
                series.time_index[self.window - 1 :], np_anomaly_score
            )
        else:
            for component_idx in range(self.width_trained_on):
                score = [
                    wasserstein_distance(
                        # all training samples of this component
                        # (`training_data` has shape (n samples, n components))
                        self.training_data[:, component_idx],
                        window_samples,
                    )
                    for window_samples in sliding_window_view(
                        np_series[:, component_idx],
                        window_shape=self.window,
                        axis=0,
                    )
                    .transpose(0, 2, 1)
                    .reshape(-1, self.window)
                ]

                np_anomaly_score.append(score)

            return TimeSeries.from_times_and_values(
                series.time_index[self.window - 1 :], list(zip(*np_anomaly_score))
            )
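

# Minimal usage sketch (illustrative only, not part of the library): it shows the
# ``fit()``/``score()`` workflow described in the docstring above -- fit on a
# series considered anomaly-free, then score a new series of the same width.
# All names and values below are made up for demonstration.
if __name__ == "__main__":
    import pandas as pd

    rng = np.random.default_rng(seed=0)

    # 200 "normal" training points and 100 test points, both univariate
    train = TimeSeries.from_times_and_values(
        pd.date_range("2000-01-01", periods=200, freq="D"),
        rng.normal(size=(200, 1)),
    )
    test = TimeSeries.from_times_and_values(
        pd.date_range("2000-07-19", periods=100, freq="D"),
        rng.normal(size=(100, 1)),
    )

    scorer = WassersteinScorer(window=20)
    scorer.fit(train)

    # univariate anomaly score series of length 100 - 20 + 1 = 81; higher values
    # indicate windows whose distribution is farther from the training data
    anomaly_score = scorer.score(test)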