Source code for darts.utils.data.torch_datasets.inference_dataset

"""
Inference Datasets
------------------

- :class:`~darts.utils.data.inference_dataset.TorchInferenceDataset`
- :class:`~darts.utils.data.inference_dataset.SequentialTorchInferenceDataset`
"""

import bisect
from abc import ABC, abstractmethod
from collections.abc import Sequence
from typing import Optional, Union

import numpy as np

from darts import TimeSeries
from darts.logging import get_logger, raise_log
from darts.utils.data.torch_datasets.dataset import TorchDataset
from darts.utils.data.torch_datasets.utils import TorchInferenceDatasetOutput
from darts.utils.data.utils import FeatureType
from darts.utils.historical_forecasts.utils import _process_predict_start_points_bounds
from darts.utils.ts_utils import series2seq

logger = get_logger(__name__)


[docs] class TorchInferenceDataset(TorchDataset, ABC): def __init__(self): """ Abstract class for all inference datasets that can be used with Darts' `TorchForecastingModel`. Provides samples to compute forecasts using a `TorchForecastingModel`. Each sample drawn from this dataset is an eight-element tuple extracted from a specific time window and set of single input `TimeSeries`. The elements are: - past_target: target `series` values in the input chunk - past_covariates: Optional `past_covariates` values in the input chunk - future_past_covariates: Optional `past_covariates` values in the forecast horizon (for auto-regression with `n>output_chunk_length`) - historic_future_covariates: Optional `future_covariates` values in the input chunk - future_covariates: Optional `future_covariates` values in the output chunk and forecast horizon - static_covariates: Optional `static_covariates` values of the `series` - target_series: the target `TimeSeries` - pred_time: the time of the first point in the forecast horizon Darts `TorchForecastingModel` can predict from instances of `TorchInferenceDataset` using the `predict_from_dataset()` method. `TorchInferenceDataset` inherits from torch `Dataset`; meaning that all subclasses must implement the `__getitem__()` method. All returned elements except `target_series` (`TimeSeries`) and `pred_time` (`pd.Timestamp` or `int`) must be of type `np.ndarray` (or `None` for optional covariates). """ super().__init__() @abstractmethod def __getitem__(self, idx: int) -> TorchInferenceDatasetOutput: """Returns a sample drawn from this dataset."""
[docs] class SequentialTorchInferenceDataset(TorchInferenceDataset): def __init__( self, series: Union[TimeSeries, Sequence[TimeSeries]], past_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None, future_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None, n: int = 1, stride: int = 0, bounds: Optional[np.ndarray] = None, input_chunk_length: int = 12, output_chunk_length: int = 1, output_chunk_shift: int = 0, use_static_covariates: bool = True, ): """Sequential Inference Dataset Each sample drawn from this dataset is an eight-element tuple extracted from a specific time window and set of single input `TimeSeries`. The elements are: - past_target: target `series` values in the input chunk - past_covariates: `past_covariates` values in the input chunk (`None` if `past_covariates=None`) - future_past_covariates: `past_covariates` values in the forecast horizon (`None` if `past_covariates=None` or `n<=output_chunk_length` / non-auto-regressive forecasting) - historic_future_covariates: `future_covariates` values in the input chunk (`None` if `future_covariates=None`) - future_covariates: `future_covariates` values in the forecast horizon (`None` if `future_covariates=None`) - static_covariates: `static_covariates` values of the `series` (`None` if `use_static_covariates=False`) - target_series: the target `TimeSeries` - pred_time: the time of the first point in the forecast horizon The output chunk / forecast horizon starts `output_chunk_length + output_chunk_shift` after the input chunk's start. The sample index determines: - the position / time of the extracted chunks relative to the end of a single target `series` - the index (which series and covariates) to use in case `series` (and covariates) are passed as a sequence of series. With `bounds=None`, all samples will be extracted relative to the end of the target `series` (input chunk's end time is the same as the target series' end time). Otherwise, samples will be extracted from the given boundaries `bounds` with a stride of `stride`. .. note:: "historic_future_covariates" are the values of the future-known covariates that fall into the sample's input chunk (the past window / history in the view of the sample). .. note:: "future_past_covariates" are past covariates that happen to be also known in the future - those are needed for forecasting with `n > output_chunk_length` (auto-regression) by any model relying on past covariates. For this reason, this dataset may also emit the "future past_covariates". Parameters ---------- series One or a sequence of target `TimeSeries` that are to be predicted into the future. past_covariates Optionally, one or a sequence of `TimeSeries` containing past covariates. If past covariates were used during training, they must be supplied at prediction. future_covariates Optionally, one or a sequence of `TimeSeries` containing future-known covariates. If future covariates were used during training, they must be supplied at prediction. n Forecast horizon: The number of time steps to predict after the end of the target series. stride Optionally, the number of time steps between two consecutive predictions. Can only be used together with `bounds`. bounds Optionally, an array of shape `(n series, 2)`, with the left and right prediction start point boundaries per series. The boundaries must represent the positional index of the series (0, len(series)). If provided, `stride` must be `>=1`. input_chunk_length The length of the lookback / past window the model takes as input. output_chunk_length The length of the lookahead / future window that the model emits as output (for the target) and takes as input (for future covariates). output_chunk_shift Optionally, the number of steps to shift the start of the output chunk into the future. use_static_covariates Whether to use/include static covariate data from the target `series`. """ super().__init__() # setup target and sequence series = series2seq(series) past_covariates = series2seq(past_covariates) future_covariates = series2seq(future_covariates) static_covariates = ( series[0].static_covariates if use_static_covariates else None ) for cov, cov_type in zip( [past_covariates, future_covariates], [FeatureType.PAST_COVARIATES, FeatureType.FUTURE_COVARIATES], ): name = cov_type.value if cov is not None and len(series) != len(cov): raise_log( ValueError( f"The sequence of `{name}` must have the same length as " f"the sequence of target `series`." ), logger=logger, ) if (bounds is not None and stride == 0) or (bounds is None and stride > 0): raise_log( ValueError( "Must supply either both `stride` and `bounds`, or none of them." ), logger=logger, ) if output_chunk_shift and n > output_chunk_length: raise_log( ValueError( "Cannot perform auto-regression `(n > output_chunk_length)` with a model that uses a " "shifted output chunk `(output_chunk_shift > 0)`." ), logger=logger, ) self.series = series self.past_covariates = past_covariates self.future_covariates = future_covariates self.uses_past_covariates = past_covariates is not None self.uses_future_covariates = future_covariates is not None self.uses_static_covariates_covariates = static_covariates is not None self.n = n self.input_chunk_length = input_chunk_length self.output_chunk_length = output_chunk_length self.output_chunk_shift = output_chunk_shift self.use_static_covariates = use_static_covariates self.stride = stride if bounds is None: self.bounds = bounds self.cum_lengths = None self.len_preds = len(self.series) else: self.bounds, self.cum_lengths = _process_predict_start_points_bounds( series=series, bounds=bounds, stride=stride, ) self.len_preds = self.cum_lengths[-1] def __len__(self): return self.len_preds @staticmethod def _find_list_index(index, cumulative_lengths, bounds, stride): list_index = bisect.bisect_right(cumulative_lengths, index) bound_left = bounds[list_index, 0] if list_index == 0: stride_idx = index * stride else: stride_idx = (index - cumulative_lengths[list_index - 1]) * stride return list_index, bound_left + stride_idx def __getitem__(self, idx: int) -> TorchInferenceDatasetOutput: # determine the series index, and the index + 1 (exclusive range) of the output chunk end within that series if self.bounds is None: series_idx = idx series_end_idx = len(self.series[idx]) else: series_idx, series_end_idx = self._find_list_index( idx, self.cum_lengths, self.bounds, self.stride, ) end_of_output_idx = ( series_end_idx + self.output_chunk_shift + self.output_chunk_length ) series = self.series[series_idx] if len(series) < self.input_chunk_length: raise_log( ValueError( f"The dataset contains target `series` that are too short to extract " f"the model input for prediction . Expected min length: `{self.input_chunk_length}`, " f"received length `{len(series)}` (at series sequence idx `{series_idx}`)." ), logger=logger, ) # load covariates past_covariates = ( self.past_covariates[series_idx] if self.uses_past_covariates else None ) future_covariates = ( self.future_covariates[series_idx] if self.uses_future_covariates else None ) idx_bounds = self._memory_indexer( series_idx=series_idx, series=series, shift=self.input_chunk_length + self.output_chunk_shift, input_chunk_length=self.input_chunk_length, output_chunk_length=self.output_chunk_length, end_of_output_idx=end_of_output_idx, past_covariates=past_covariates, future_covariates=future_covariates, sample_weight=None, n=self.n, ) series_vals = series.random_component_values(copy=False) # extract past target series start, end = idx_bounds[FeatureType.PAST_TARGET] pt = series_vals[start:end] # extract prediction start start, _ = idx_bounds[FeatureType.FUTURE_TARGET] if start < len(series): pred_start = series._time_index[start] else: pred_start = ( series._time_index[-1] + ((start + 1) - len(series)) * series.freq ) # past cov, future past cov, historic future cov, future cov, static cov pc, fpc, hfc, fc, sc = None, None, None, None, None # extract past covariates if self.uses_past_covariates: # past part of past covariates start, end = idx_bounds[FeatureType.PAST_COVARIATES] vals = past_covariates.random_component_values(copy=False) pc = vals[start:end] # future part of past covariates (`None` if not performing auto-regression) fpc_start, fpc_end = idx_bounds[FeatureType.FUTURE_PAST_COVARIATES] fpc = vals[fpc_start:fpc_end] if fpc_start is not None else None # extract future covariates if self.uses_future_covariates: # future part of future covariates start, end = idx_bounds[FeatureType.FUTURE_COVARIATES] vals = future_covariates.random_component_values(copy=False) fc = vals[start:end] # historic part of future covariates hfc_start, hfc_end = idx_bounds[FeatureType.HISTORIC_FUTURE_COVARIATES] hfc = vals[hfc_start:hfc_end] # extract static covariates if self.uses_static_covariates_covariates: sc = series.static_covariates_values(copy=False) # ( # past target, # past cov, # future past cov, # historic future cov, # future cov, # static cov, # target series schema, # prediction start time, # ) return ( pt, pc, fpc, hfc, fc, sc, series.schema(copy=False), pred_start, )