Source code for darts.dataprocessing.dtw.dtw

"""
Dynamic Time Warping (DTW)
--------------------------
"""

import copy
from typing import Callable, Optional, Union

import numpy as np
import pandas as pd
import xarray as xr

from darts import TimeSeries
from darts.dataprocessing.dtw.cost_matrix import CostMatrix
from darts.dataprocessing.dtw.window import CRWindow, NoWindow, Window
from darts.logging import get_logger, raise_if, raise_if_not
from darts.timeseries import DIMS

logger = get_logger(__name__)

SeriesValue = Union[np.ndarray, np.floating]
DistanceFunc = Callable[[SeriesValue, SeriesValue], float]


# CORE ALGORITHM
def _dtw_cost_matrix(
    x: np.ndarray, y: np.ndarray, dist: DistanceFunc, window: Window
) -> CostMatrix:
    dtw = CostMatrix._from_window(window)

    dtw.fill(np.inf)
    dtw[0, 0] = 0

    for i, j in window:
        cost = dist(x[i - 1], y[j - 1])
        min_cost_prev = min(dtw[i - 1, j], dtw[i, j - 1], dtw[i - 1, j - 1])
        dtw[i, j] = cost + min_cost_prev

    return dtw


def _dtw_path(dtw: CostMatrix) -> np.ndarray:
    i = dtw.n
    j = dtw.m

    path = []

    while i > 0 or j > 0:
        path.append((i - 1, j - 1))

        stencil = [
            (i - 1, j - 1),  # diagonal
            (i - 1, j),  # left
            (i, j - 1),  # down
        ]

        costs = [dtw[i, j] for i, j in stencil]
        index_min = costs.index(min(costs))

        i, j = stencil[index_min]

    path.reverse()
    return np.array(path)


# MULTI-SCALE
def _down_sample(high_res: np.ndarray):
    needs_padding = len(high_res) & 1
    if needs_padding:
        high_res = np.append(high_res, high_res[-1])

    low_res = np.reshape(high_res, (-1, 2))
    low_res = np.mean(low_res, axis=1)

    return low_res


def _expand_window(low_res_path: np.ndarray, n: int, m: int, radius: int) -> CRWindow:
    high_res_grid = CRWindow(n, m)

    def is_valid(cell):
        valid_x = 1 <= cell[0] <= n
        valid_y = 1 <= cell[1] <= m

        return valid_x and valid_y

    # up-sample path, 1 grid point computed at half-resolution becomes at least 4
    # (1,2) and (2,1) allows for better DWT solution, as it isn't diagonally constrained
    #
    # 4-sample         6-sample
    #     X X             X X
    #     X X    vs.    X X X
    # X X             X X X
    # X X             X X

    pattern = [(0, 0, 2), (1, 0, 3), (2, 1, 2)]

    for i, j in low_res_path:
        for column, start, end in pattern:
            # +1 offset since x[0], y[0] maps to DTW[1][1]

            column += i * 2 + 1

            # ensure no out of bounds
            # enlarge vertically by radius is O(1), due to compressed row representation
            start = max(1, min(m + 1, start + j * 2 - radius))
            end = max(1, min(m + 1, end + j * 2 + 1 + radius))

            # to create an nxn block, add (0-n) range n times
            # ensure no out of bounds
            #   if column is too large m-column -> 0
            #   if column-k is too small, column < radius+1
            for k in range(0, min(radius + 1, column, n - column + 1)):
                high_res_grid.add_range(column - k, start, end)

    return high_res_grid


def _fast_dtw(
    x: np.ndarray, y: np.ndarray, dist: DistanceFunc, radius: int, depth: int = 0
) -> CostMatrix:
    n = len(x)
    m = len(y)
    min_size = radius + 2

    if n < min_size or m < min_size or radius == -1:
        window = NoWindow()
        window.init_size(n, m)
        cost = _dtw_cost_matrix(x, y, dist, window)

        return cost

    half_x = _down_sample(x)
    half_y = _down_sample(y)

    low_res_cost = _fast_dtw(half_x, half_y, dist, radius, depth + 1)
    low_res_path = _dtw_path(low_res_cost)
    window = _expand_window(low_res_path, len(x), len(y), radius)
    cost = _dtw_cost_matrix(x, y, dist, window)

    return cost


def _default_distance_multi(x_values: np.ndarray, y_values: np.ndarray):
    return np.sum(np.abs(x_values - y_values))


def _default_distance_uni(x_value: float, y_value: float):
    return abs(x_value - y_value)


# Public API Functions
[docs]class DTWAlignment: """ Dynamic Time Warping (DTW) Alignment. Attributes ---------- n The length of `series1` m The length of `series2` series1 A `TimeSeries` to align with `series2`. series2 A `TimeSeries` to align with `series1`. cost The `CostMatrix` for DTW. """ n: int m: int series1: TimeSeries series2: TimeSeries cost: CostMatrix def __init__(self, series1: TimeSeries, series2: TimeSeries, cost: CostMatrix): self.n = len(series1) self.m = len(series2) self.series1 = series1 self.series2 = series2 self.cost = cost from ._plot import plot, plot_alignment
[docs] def path(self) -> np.ndarray: """Gives the index paths from `series1` to `series2`. Returns ------- np.ndarray of shape `(len(path), 2)` An array of indices [[i0,j0], [i1,j1], [i2,j2], ...], where i indexes into series1 and j indexes into series2. Indices are in monotonic order, path[n] >= path[n-1] """ if hasattr(self, "_path"): return self._path self._path = _dtw_path(self.cost) return self._path
[docs] def distance(self) -> float: """Gives the total distance between pair-wise elements in the two series after warping. Returns ------- float The total distance between pair-wise elements in the two series after warping. """ return self.cost[(self.n, self.m)]
[docs] def mean_distance(self) -> float: """Gives the mean distance between pair-wise elements in the two series after warping. Returns ------- float The mean distance between pair-wise elements in the two series after warping. """ if hasattr(self, "_mean_distance"): return self._mean_distance path = self.path() self._mean_distance = self.distance() / len(path) return self._mean_distance
[docs] def warped(self) -> (TimeSeries, TimeSeries): """Warps the two time series according to the warp path returned by `DTWAlignment.path()`, which minimizes the pair-wise distance. This will bring two time series that are out-of-phase back into phase. Returns ------- (TimeSeries, TimeSeries) Two new TimeSeries instances of the same length, indexed by pd.RangeIndex. """ xa1 = self.series1.data_array(copy=False) xa2 = self.series2.data_array(copy=False) path = self.path() values1, values2 = xa1.values[path[:, 0]], xa2.values[path[:, 1]] # We set a RangeIndex for both series: warped_series1 = xr.DataArray( data=values1, dims=xa1.dims, coords={ self.series1._time_dim: pd.RangeIndex(values1.shape[0]), DIMS[1]: xa1.coords[DIMS[1]], }, attrs=xa1.attrs, ) warped_series2 = xr.DataArray( data=values2, dims=xa2.dims, coords={ self.series2._time_dim: pd.RangeIndex(values2.shape[0]), DIMS[1]: xa2.coords[DIMS[1]], }, attrs=xa2.attrs, ) time_dim1, time_dim2 = self.series1._time_dim, self.series2._time_dim # todo: prevent time information being lost after warping # Applying time index from series1 to series2 (take_dates = True) is disabled for consistency reasons # Applying the warp path to the dates directly will result in duplicate dates # and hence values being lost when converting back to a TimeSeries. # As a result series1 will not be warped, whereas series2 will be. # It could also cause the two series to have different lengths, if len(series1) < len(series2) # One could generate intermediate dates, but then the series would not have a consistent frequency take_dates = False if take_dates: time_index = warped_series1[time_dim1] time_index = time_index.rename({time_dim1: time_dim2}) warped_series2[time_dim2] = time_index return TimeSeries.from_xarray(warped_series1), TimeSeries.from_xarray( warped_series2 )
[docs]def dtw( series1: TimeSeries, series2: TimeSeries, window: Optional[Window] = None, distance: Union[DistanceFunc, None] = None, multi_grid_radius: int = -1, ) -> DTWAlignment: """ Determines the optimal alignment between two time series `series1` and `series2`, according to the Dynamic Time Warping algorithm. The alignment minimizes the distance between pair-wise elements after warping. All elements in the two series are matched and are in strictly monotonically increasing order. Considers only the values in the series, ignoring the time axis. Dynamic Time Warping can be applied to determine how closely two time series correspond, irrespective of phase, length or speed differences. Parameters ---------- series1 A `TimeSeries` to align with `series2`. series2 A `TimeSeries` to align with `series1`. window Optionally, a `Window` used to constrain the search for the optimal alignment: see `SakoeChiba` and `Itakura`. Default considers all possible alignments (`NoWindow`). distance Function taking as input either two `floats` for univariate series or two `np.ndarray`, and returning the distance between them. Defaults to the abs difference for univariate-data and the sum of the abs difference for multi-variate series. multi_grid_radius Default radius of `-1` results in an exact evaluation of the dynamic time warping algorithm. Without constraints DTW runs in O(nxm) time where n,m are the size of the series. Exact evaluation with no constraints, will result in a performance warning on large datasets. Setting `multi_grid_radius` to a value other than `-1`, will enable the approximate multi-grid solver, which executes in linear time, vs quadratic time for exact evaluation. Increasing radius trades solution accuracy for performance. Returns ------- DTWAlignment Helper object for getting warp path, mean_distance, distance and warped time series """ if window is None: window = NoWindow() if ( multi_grid_radius == -1 and type(window) is NoWindow and len(series1) * len(series2) > 10**6 ): logger.warning( "Exact evaluation will result in poor performance on large datasets." " Consider enabling multi-grid or using a window." ) both_univariate = series1.is_univariate and series2.is_univariate if distance is None: raise_if_not( series1.n_components == series2.n_components, "Expected series to have same number of components, or to supply custom distance function", logger, ) distance = _default_distance_uni if both_univariate else _default_distance_multi if both_univariate: values_x = series1.univariate_values(copy=False) values_y = series2.univariate_values(copy=False) else: values_x = series1.values(copy=False) values_y = series2.values(copy=False) raise_if( np.any(np.isnan(values_x)), "Dynamic Time Warping does not support nan values. " "You can use the module darts.utils.missing_values to fill them, " "before passing them to dtw.", logger, ) raise_if( np.any(np.isnan(values_y)), "Dynamic Time Warping does not support nan values. " "You can use the module darts.utils.missing_values to fill them," "before passing it into dtw", logger, ) window = copy.deepcopy(window) window.init_size(len(values_x), len(values_y)) raise_if(multi_grid_radius < -1, "Expected multi-grid radius to be positive or -1") if multi_grid_radius >= 0: raise_if_not( isinstance(window, NoWindow), "Multi-grid solver does not currently support windows", logger, ) cost_matrix = _fast_dtw(values_x, values_y, distance, multi_grid_radius) else: cost_matrix = _dtw_cost_matrix(values_x, values_y, distance, window) return DTWAlignment(series1, series2, cost_matrix)