"""
DTW Windows
-----------
"""
import array
from abc import ABC, abstractmethod
from math import atan, tan
from typing import Tuple
import numpy as np
from darts.logging import raise_if, raise_if_not
[docs]class Window(ABC):
n: int
m: int
[docs] def init_size(self, n: int, m: int):
"""
Called by dtw to initialize the window to a certain size.
Parameters
----------
n
The width of the window, must be equal to the length of `series1` used for DTW
m
The height of the window, must be equal to the length of `series2` used for DTW
"""
self.n = n
self.m = m
@abstractmethod
def __len__(self) -> int:
pass
[docs] @abstractmethod
def column_index(self, elem: Tuple[int, int]) -> int:
"""Gives the number of active grid cells before row element j, in column i.
Parameters
----------
elem
(i,j) index, where i indexes columns and j rows
Returns
-------
int
The number of active grid cells before row element j, in column i,
If (i,j) is not an active grid cell returns -1
"""
def __contains__(self, item):
return self.column_index(item) != -1
[docs] @abstractmethod
def column_length(self, column: int) -> int:
"""Gives the number of active grid cells in a column.
Parameters
----------
column
A column in the window, must be within 0 < column < n+1
Returns
-------
int
The number of active grid cells in a column.
"""
[docs] def column_lengths(self) -> np.ndarray:
"""Gives the number of activate grid cells in each column.
Returns
-------
np.ndarray of shape (n+1,)
The number of active grid cells in each column.
"""
return np.array(self.column_length(i) for i in range(0, self.n + 1))
@abstractmethod
def __iter__(self):
"""
Returns
-------
Iterator
Iterate over all active cells in the window, yielding (i,j) tuple.
Expected to start with index (1,1) and end with index (n+1, m+1).
"""
pass
[docs]class NoWindow(Window):
"""
Window covers the entire grid,
meaning every possible alignment between series1 and series2 is considered.
"""
def __len__(self):
return self.n * self.m + 1 # include (0,0) element
[docs] def column_index(self, elem: Tuple[int, int]):
return elem[1] - 1
[docs] def column_length(self, column: int) -> int:
return self.m
[docs] def column_lengths(self) -> np.ndarray:
result = np.empty(self.n + 1)
result.fill(self.m)
result[0] = 1
return result
def __iter__(self):
for i in range(1, self.n + 1):
for j in range(1, self.m + 1):
yield i, j
[docs]def gtz(value): # greater than zero
return value if value > 0 else 0
[docs]class CRWindow(Window):
"""
Compressed row representation window.
Stores the range of active grid cells in each column.
Any window with contiguous columns can be expressed as a CRWindow.
Supports efficient iterative construction and updates.
"""
length: int
column_ranges: array.array
def __init__(self, n: int, m: int, ranges: np.ndarray = None):
"""
Parameters
----------
n
The width of the window, must be equal to the length of series1
m
The height of the window, must be equal to the length of series2
ranges
The ranges of active cells within a column [[start_column0, end_column0], ...]
with shape (n, 2) and where start >= 0 and end <= m.
"""
self.n = n
self.m = m
if ranges is not None:
raise_if_not(
ranges.shape == (n, 2),
f"Expects a 2d array with [start, end] for each column and shape = ({n}, 2)",
)
ranges = np.insert(ranges, 0, [0, 1], axis=0)
start = ranges[:, 0]
end = ranges[:, 1]
raise_if(np.any(start < 0), "Start must be >=0")
raise_if(np.any(end > m), "End must be <m")
diff = np.maximum(end - start, 0)
self.length = np.sum(diff)
ranges[1:] += 1
ranges = ranges.flatten()
else:
ranges = np.zeros((n + 1) * 2, dtype=int)
ranges[0::2] = self.m # start
ranges[1::2] = 0 # end
ranges = array.array("i", ranges)
ranges[0] = 0
ranges[1] = 1
self.length = 1
self.column_ranges = array.array("i", ranges)
[docs] def add_range(self, column: int, start: int, end: int):
"""Extends the active cells in the column by the range (start,end).
Ranges smaller than the current one are ignored.
Note (1, m+1), not (0,m) corresponds to an entire column.
Parameters
----------
column
Column int index
start
Row element int index where start >= 1 and start <= end
end
Row element int index where end >= 1 and end <= m+1
"""
if start < 1 or start > self.m:
raise IndexError(f"Start must be >=1 and <=m, got {start}")
if end < 1 or end > self.m + 1:
raise IndexError(f"End must be >=1 and <=m+1, got {end}")
start_idx = column * 2 + 0
end_idx = column * 2 + 1
orig_start = self.column_ranges[start_idx]
orig_end = self.column_ranges[end_idx]
start, end = min(orig_start, start), max(orig_end, end)
orig_row_length = gtz(orig_end - orig_start)
row_length = gtz(end - start)
self.length += row_length - orig_row_length
self.column_ranges[start_idx] = start
self.column_ranges[end_idx] = end
[docs] def add(self, elem: Tuple[int, int]):
"""Marks a grid cell as active.
Parameters
----------
elem
Tuple of grid cell index (column, row)
"""
self.add_range(elem[0], elem[1], elem[1] + 1)
[docs] def column_length(self, column: int) -> int:
start, end = self.column_ranges[column]
return gtz(end - start)
[docs] def column_index(self, elem: Tuple[int, int]) -> int:
i, j = elem
start, end = self.column_ranges[i]
if j < start or j >= end:
return -1
else:
return j - start
def __contains__(self, elem: Tuple[int, int]) -> bool:
i, j = elem
start, end = self.column_ranges[i]
return start <= j < end
def __iter__(self):
for i in range(1, self.n + 1):
start = self.column_ranges[i * 2 + 0]
end = self.column_ranges[i * 2 + 1]
for j in range(start, end):
yield i, j
[docs] def column_lengths(self) -> np.ndarray:
ranges = self.column_ranges
start = ranges[::2]
end = ranges[1::2]
return np.maximum(np.subtract(end, start), 0)
def __len__(self):
return self.length
[docs]class Itakura(CRWindow):
"""
Forms the Itakura parallelogram, where max_slope determines the slope of the steeper side.
Examples
--------
>>> x
>>> xxxx
>>> B xxxxxx
>>> xxxxxxxxxxx
>>> xxxxxxxxxxxxxx
>>> xxxxxxxxxxxxxxxxx
>>> xxxxxxxxxxxxxxxxxxxx C
>>> xxxxxxxxxxxxxxxxxxxxx
>>> xxxxxxxxxxxxxxxxxxxxx
>>> xxxxxxxxxxxxxxxxxxxxx
>>> A xxxxxxxxxxxxxxxxxxxxx
>>> xxxxxxxxxxxxxxxxxxxxx
>>> xxxxxxxxxxxxxxxxxxxxx
>>> xxxxxxxxxxxxxxxxxxxx
>>> xxxxxxxxxxxxxxxx D
>>> xxxxxxxxxxxx
>>> xxxxxxxxx
>>> xxxxxx
>>> xxx
>>> x
"""
def __init__(self, max_slope: float):
"""
Parameters
----------
max_slope
The slope of the steeper parallelogram side.
"""
self.max_slope = max_slope
[docs] def init_size(self, n: int, m: int):
self.n = n
self.m = m
max_slope = self.max_slope
diagonal_slope = m / n # rise over run
raise_if_not(
max_slope > diagonal_slope,
f"Itakura slope {max_slope} must be greater than {diagonal_slope} to form valid parallelogram.",
)
max_slope_angle = atan(max_slope)
diagonal_slope_angle = atan(diagonal_slope)
diff_slope_angle = max_slope_angle - diagonal_slope_angle
min_slope = tan(diagonal_slope_angle - diff_slope_angle)
# Derivation for determining how wide the steep top sides (A) and shallow bottom (D) are
# max_slope*x + (n-x)*min_slope = m
# max_slope*x + n*min_slope - min_slope*x = m
# (max_slope - min_slope)*x = m - n*min_slope
# x = (m - n*min_slope) / (max_slope - min_slope)
ranges = np.zeros((self.n, 2), dtype=float)
shallow_bottom = int(
np.round((m - n * max_slope) / (min_slope - max_slope)) + 1
)
ranges[:shallow_bottom, 0] = np.arange(shallow_bottom)
ranges[shallow_bottom:, 0] = np.arange(n - shallow_bottom) + 1
ranges[:shallow_bottom, 0] *= min_slope
ranges[shallow_bottom:, 0] *= max_slope
ranges[shallow_bottom:, 0] += ranges[shallow_bottom - 1, 0]
steep_top = int(np.round((m - n * min_slope) / (max_slope - min_slope)))
ranges[:steep_top, 1] = np.arange(steep_top) + 1
ranges[steep_top:, 1] = np.arange(n - steep_top) + 1
ranges[:steep_top:, 1] *= max_slope
ranges[steep_top:, 1] *= min_slope
ranges[steep_top:, 1] += ranges[steep_top - 1, 1]
np.floor(ranges[:, 0], out=ranges[:, 0])
np.ceil(ranges[:, 1], out=ranges[:, 1])
ranges = np.maximum([0, 1], ranges)
ranges = np.minimum([self.m - 1, self.m], ranges)
ranges = ranges.astype(int)
ranges[0][0] = 0
super().__init__(n, m, ranges)
[docs]class SakoeChiba(CRWindow):
"""
Forms a diagonal window where window_size controls the maximum allowed shift between the two series.
If both time-series have the same time axis, window_size corresponds to the maximum number of time periods
"""
def __init__(self, window_size: int):
"""
Parameters
----------
window_size
The maximum allowed shift between the two series used in DTW.
"""
self.window_size = window_size
[docs] def init_size(self, n: int, m: int):
self.n = n
self.m = m
diff = abs(n - m)
raise_if_not(
diff < self.window_size,
f"Window size must at least cover size difference ({diff})",
)
ranges = np.repeat(np.arange(n), 2)
ranges[0::2] -= (self.window_size,)
ranges[1::2] += self.window_size
ranges[0::2] = np.maximum(0, ranges[0::2])
ranges[1::2] = np.minimum(self.m, ranges[1::2] + 1)
ranges = np.reshape(ranges, (-1, 2))
super().__init__(n, m, ranges)