Source code for darts.dataprocessing.dtw.cost_matrix

import array
from abc import ABC, abstractmethod
from itertools import repeat
from typing import Tuple

import numpy as np

from .window import CRWindow, Window

Elem = Tuple[int, int]


[docs]class CostMatrix(ABC): """ (n+1) x (m+1) Matrix Cell (i,j) corresponds to minimum total cost/distance of matching elements (i-1, j-1) in series1 and series2. Row 0 and column 0, are typically set to infinity, to prevent matching before the first element """ n: int m: int
[docs] @abstractmethod def fill(self, value: float): pass
@abstractmethod def __getitem__(self, item): pass @abstractmethod def __setitem__(self, key, value): pass @abstractmethod def __iter__(self): pass
[docs] @abstractmethod def to_dense(self) -> np.ndarray: """ Returns ------- Dense n x m numpy array, where empty cells are set to np.inf """ pass
@staticmethod def _from_window(window: Window): """ Creates a cost matrix from a window. Depending on the density of the active cells in the window, will select either a dense or sparse storage representation. Parameters ---------- window Takes a `Window` defining which cells are active and which are empty Returns ------- CostMatrix """ density = len(window) / ((window.n + 1) * (window.m + 1)) # In the future it might be worth implementing # a sparse cost matrix based on column_index/column_length # that will work even for non-contiguous windows if isinstance(window, CRWindow) and density < 0.5: return SparseCostMatrix(window) else: return DenseCostMatrix(window.n, window.m)
[docs]class DenseCostMatrix(np.ndarray, CostMatrix): def __new__(self, n, m): self.n = n self.m = m return super().__new__(self, (n + 1, m + 1), float)
[docs] def to_dense(self) -> np.ndarray: return self[1:, 1:]
def __iter__(self): for n in range(1, self.n): for m in range(1, self.m): yield n, m
[docs]class SparseCostMatrix(CostMatrix): def __init__(self, window: CRWindow): self.n = window.n self.m = window.m self.window = window self.offsets = np.empty(self.n + 2, dtype=int) self.column_ranges = window.column_ranges self.offsets[0] = 0 np.cumsum(window.column_lengths(), out=self.offsets[1:]) len = self.offsets[-1] self.offsets = array.array("i", self.offsets) self.dense = array.array("f", repeat(np.inf, len))
[docs] def fill(self, value): if value != np.inf: # should already be cleared to np.inf for i in range(len(self.dense)): self.dense[i] = value
[docs] def to_dense(self) -> np.ndarray: matrix = np.empty((self.n, self.m)) matrix.fill(np.inf) if isinstance(self.window, CRWindow): ranges = self.window.column_ranges lengths = self.window.column_lengths() # TODO express only in terms of numpy operations for i in range(1, self.n + 1): start = ranges[i * 2 + 0] - 1 end = ranges[i * 2 + 1] - 1 len = lengths[i] offset = self.offsets[i] matrix[i - 1][start:end] = self.dense[offset : offset + len] else: for i in range(1, self.n + 1): column_start = self.offsets[i] for j in range(1, self.m + 1): column_idx = self.window.column_index(i) if column_idx == -1: continue matrix[i - 1, j - 1] = self.dense[column_start + column_idx] return matrix
# PERFORMANCE: hot functions, avoid calling functions/excessive checking def __getitem__(self, elem: Elem): i, j = elem start = self.column_ranges[i * 2 + 0] end = self.column_ranges[i * 2 + 1] if start <= j < end: return self.dense[self.offsets[i] + j - start] return np.inf def __setitem__(self, elem, value): i, j = elem start = self.column_ranges[i * 2 + 0] self.dense[self.offsets[i] + j - start] = value def __iter__(self): return self.window.__iter__()