"""Source code for the Horizon-Based Training Dataset."""

from typing import Optional, Sequence, Tuple, Union

import numpy as np

from darts import TimeSeries
from darts.logging import get_logger, raise_if_not

from .training_dataset import PastCovariatesTrainingDataset
from .utils import CovariateType

logger = get_logger(__name__)

class HorizonBasedDataset(PastCovariatesTrainingDataset):
    def __init__(
        self,
        target_series: Union[TimeSeries, Sequence[TimeSeries]],
        covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None,
        output_chunk_length: int = 12,
        lh: Tuple[int, int] = (1, 3),
        lookback: int = 3,
        use_static_covariates: bool = True,
    ) -> None:
        """
        A time series dataset yielding tuples of
        (past_target, past_covariates, static_covariates, future_target) arrays,
        inspired by the way N-BEATS is trained on the M4 dataset.

        Writing `H = output_chunk_length`, each sample is built as follows: a
        "forecast point" is picked among the last `(min_lh * H, max_lh * H)`
        points before the end of a series. The "future" part is made of the `H`
        points that follow, and the "past" part is made of the `lookback * H`
        points that precede it.

        Every series must therefore contain at least `(lookback + max_lh) * H`
        points, and `min_lh` must be at least 1 (so that targets have a length of
        exactly `H`). Note that the series lengths are verified lazily, when a
        sample is accessed, and not when the dataset is created.

        Target and covariates series are sliced together, using their time indexes
        for alignment.

        Each series contributes `(max_lh - min_lh) * H` samples, so sampling is
        uniform both over the series and over the samples of each series: the i-th
        sample has a 1/(N*M) chance of being any of the M samples of any of the N
        series.

        Parameters
        ----------
        target_series
            One or a sequence of target `TimeSeries`.
        covariates
            Optionally, one or a sequence of `TimeSeries` containing past-observed
            covariates. When provided, the sequence must have the same length as
            `target_series`, and each covariate series must span a time range
            large enough to contain all the required slices. The joint slicing of
            targets and covariates relies on the time axes of both series.
        output_chunk_length
            The length of the "output" series emitted by the model.
        lh
            A `(min_lh, max_lh)` interval for the forecast point, counted from the
            end of the series. For example, `(1, 3)` selects forecast points
            uniformly between `1*H` and `3*H` points before the end of the series.
            It is required that `1 <= min_lh <= max_lh`.
        lookback
            The length of the emitted inputs, expressed as a multiple of
            `output_chunk_length`. For instance, `lookback=3` emits "inputs" of
            length `3 * output_chunk_length`.
        use_static_covariates
            Whether to use/include static covariate data from input series.
        """
        super().__init__()

        # Normalize single series to one-element sequences.
        if isinstance(target_series, TimeSeries):
            target_series = [target_series]
        if isinstance(covariates, TimeSeries):
            covariates = [covariates]

        self.target_series = target_series
        self.covariates = covariates
        self.covariate_type = CovariateType.PAST

        min_lh, max_lh = lh
        self.output_chunk_length = output_chunk_length
        self.min_lh = min_lh
        self.max_lh = max_lh
        self.lookback = lookback

        # Checks
        raise_if_not(
            1 <= min_lh <= max_lh,
            "The lh parameter should be an int tuple (min_lh, max_lh), "
            "with 1 <= min_lh <= max_lh",
        )
        raise_if_not(
            self.covariates is None
            or len(self.target_series) == len(self.covariates),
            "The provided sequence of target series must have the same length as "
            "the provided sequence of covariate series.",
        )

        # Every series yields one sample per admissible forecast point.
        samples_per_series = (max_lh - min_lh) * output_chunk_length
        self.nr_samples_per_ts = samples_per_series
        self.total_nr_samples = len(self.target_series) * samples_per_series
        self.use_static_covariates = use_static_covariates

    def __len__(self):
        """
        Returns the total number of (input, target) splits over all series.
        """
        return self.total_nr_samples

    def __getitem__(
        self, idx: int
    ) -> Tuple[np.ndarray, Optional[np.ndarray], Optional[np.ndarray], np.ndarray]:
        """
        Returns the `idx`-th sample as a tuple
        (past_target, past_covariates, static_covariates, future_target).

        `past_covariates` is None when the dataset has no covariates, and
        `static_covariates` is None when `use_static_covariates` is False.
        An error is raised through `raise_if_not` when the selected target series
        is shorter than `(lookback + max_lh) * output_chunk_length`, or when the
        covariates cannot provide a slice matching the past target.
        """
        horizon = self.output_chunk_length
        per_series = self.nr_samples_per_ts

        # Samples are laid out series after series: locate the series first.
        series_idx = idx // per_series
        series = self.target_series[series_idx]
        series_values = series.random_component_values(copy=False)

        required_length = (self.lookback + self.max_lh) * horizon
        raise_if_not(
            len(series_values) >= required_length,
            "The dataset contains some input/target series that are shorter than "
            "`(lookback + max_lh) * H` ({}-th series)".format(series_idx),
        )

        # Position of the sample within its series, in [0, nr_samples_per_ts).
        lh_idx = idx - series_idx * per_series

        # The output chunk ends this many points before the end of the series.
        distance_from_end = (self.min_lh - 1) * horizon + lh_idx
        end_of_output_idx = len(series) - distance_from_end

        # Optionally, pick the covariates matching the selected target series.
        if self.covariates is None:
            covariate_series = None
            main_covariate_type = CovariateType.NONE
        else:
            covariate_series = self.covariates[series_idx]
            main_covariate_type = CovariateType.PAST

        # The input spans `lookback` horizons; the same value is used as shift.
        input_chunk_length = self.lookback * horizon

        # `_memory_indexer` is not defined in this class (presumably inherited);
        # it provides all the slice boundaries for the current sample.
        indices = self._memory_indexer(
            target_idx=series_idx,
            target_series=series,
            shift=input_chunk_length,
            input_chunk_length=input_chunk_length,
            output_chunk_length=self.output_chunk_length,
            end_of_output_idx=end_of_output_idx,
            covariate_series=covariate_series,
            covariate_type=main_covariate_type,
        )
        past_start, past_end, future_start, future_end, cov_start, cov_end = indices

        # Extract the target chunks.
        future_target = series_values[future_start:future_end]
        past_target = series_values[past_start:past_end]

        # Optionally, extract the covariates chunk aligned with the past target.
        covariate = None
        if self.covariates is not None:
            raise_if_not(
                cov_end <= len(covariate_series),
                f"The dataset contains 'past' covariates that don't extend far enough into the future. "
                f"({idx}-th sample)",
            )

            covariate_values = covariate_series.random_component_values(copy=False)
            covariate = covariate_values[cov_start:cov_end]

            raise_if_not(
                len(covariate) == len(past_target),
                "The dataset contains 'past' covariates whose time axis doesn't allow to obtain the "
                "input (or output) chunk relative to the target series.",
            )

        static_covariate = None
        if self.use_static_covariates:
            static_covariate = series.static_covariates_values(copy=False)

        return past_target, covariate, static_covariate, future_target