Source code for darts.utils.data.horizon_based_dataset

"""
Horizon-Based Training Dataset
------------------------------
"""

from typing import Optional, Sequence, Tuple, Union

import numpy as np

from darts import TimeSeries
from darts.logging import get_logger, raise_log
from darts.utils.data.training_dataset import PastCovariatesTrainingDataset
from darts.utils.data.utils import CovariateType, _process_sample_weight
from darts.utils.ts_utils import series2seq

logger = get_logger(__name__)


[docs]class HorizonBasedDataset(PastCovariatesTrainingDataset): def __init__( self, target_series: Union[TimeSeries, Sequence[TimeSeries]], covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None, output_chunk_length: int = 12, lh: Tuple[int, int] = (1, 3), lookback: int = 3, use_static_covariates: bool = True, sample_weight: Optional[Union[TimeSeries, Sequence[TimeSeries], str]] = None, ) -> None: """ A time series dataset containing tuples of (past_target, past_covariates, static_covariates, sample weights, future_target) arrays, in a way inspired by the N-BEATS way of training on the M4 dataset: https://arxiv.org/abs/1905.10437. The "past" series have length `lookback * output_chunk_length`, and the "future" series has length `output_chunk_length`. Given the horizon `output_chunk_length` of a model, this dataset will compute some "past/future" splits as follows: First a "forecast point" is selected in the the range of the last `(min_lh * output_chunk_length, max_lh * output_chunk_length)` points before the end of the time series. The "future" then consists in the following `output_chunk_length` points, and the "past" will be the preceding `lookback * output_chunk_length` points. All the series in the provided sequence must be long enough; i.e. have length at least `(lookback + max_lh) * output_chunk_length`, and `min_lh` must be at least 1 (to have targets of length exactly `1 * output_chunk_length`). The target and covariates time series are sliced together using their time indexes for alignment. The sampling is uniform both over the number of time series and the number of samples per series; i.e. the i-th sample of this dataset has 1/(N*M) chance of coming from any of the M samples in any of the N time series in the sequence. Parameters ---------- target_series One or a sequence of target `TimeSeries`. covariates Optionally, one or a sequence of `TimeSeries` containing past-observed covariates. If this parameter is set, the provided sequence must have the same length as that of `target_series`. Moreover, all covariates in the sequence must have a time span large enough to contain all the required slices. The joint slicing of the target and covariates is relying on the time axes of both series. output_chunk_length The length of the "output" series emitted by the model lh A `(min_lh, max_lh)` interval for the forecast point, starting from the end of the series. For example, `(1, 3)` will select forecast points uniformly between `1*H` and `3*H` points before the end of the series. It is required that `min_lh >= 1`. lookback: A integer interval for the length of the input in the emitted input and output splits, expressed as a multiple of `output_chunk_length`. For instance, `lookback=3` will emit "inputs" of lengths `3 * output_chunk_length`. use_static_covariates Whether to use/include static covariate data from input series. sample_weight Optionally, some sample weights to apply to the target `series` labels. They are applied per observation, per label (each step in `output_chunk_length`), and per component. If a series or sequence of series, then those weights are used. If the weight series only have a single component / column, then the weights are applied globally to all components in `series`. Otherwise, for component-specific weights, the number of components must match those of `series`. If a string, then the weights are generated using built-in weighting functions. The available options are `"linear"` or `"exponential"` decay - the further in the past, the lower the weight. The weights are computed globally based on the length of the longest series in `series`. Then for each series, the weights are extracted from the end of the global weights. This gives a common time weighting across all series. """ super().__init__() self.target_series = series2seq(target_series) self.covariates = series2seq(covariates) self.covariate_type = CovariateType.PAST if covariates is not None and len(self.target_series) != len(self.covariates): raise_log( ValueError( "The provided sequence of target series must have the same length as " "the provided sequence of covariate series." ), logger=logger, ) self.sample_weight = _process_sample_weight(sample_weight, self.target_series) self.output_chunk_length = output_chunk_length self.min_lh, self.max_lh = lh self.lookback = lookback # Checks if not (self.max_lh >= self.min_lh >= 1): raise_log( ValueError( "The lh parameter should be an int tuple (min_lh, max_lh), " "with 1 <= min_lh <= max_lh" ), logger=logger, ) self.nr_samples_per_ts = (self.max_lh - self.min_lh) * self.output_chunk_length self.total_nr_samples = len(self.target_series) * self.nr_samples_per_ts self.use_static_covariates = use_static_covariates def __len__(self): """ Returns the total number of possible (input, target) splits. """ return self.total_nr_samples def __getitem__( self, idx: int ) -> Tuple[ np.ndarray, Optional[np.ndarray], Optional[np.ndarray], Optional[np.ndarray], np.ndarray, ]: # determine the index of the time series. target_idx = idx // self.nr_samples_per_ts target_series = self.target_series[target_idx] target_vals = target_series.random_component_values(copy=False) if len(target_vals) < (self.lookback + self.max_lh) * self.output_chunk_length: raise_log( ValueError( "The dataset contains some input/target series that are shorter than " f"`(lookback + max_lh) * H` ({target_idx}-th series)" ), logger=logger, ) # determine the index lh_idx of the forecasting point (the last point of the input series, before the target) # lh_idx should be in [0, self.nr_samples_per_ts) lh_idx = idx - (target_idx * self.nr_samples_per_ts) # determine the index at the end of the output chunk end_of_output_idx = len(target_series) - ( (self.min_lh - 1) * self.output_chunk_length + lh_idx ) # optionally, load covariates covariate_series = ( self.covariates[target_idx] if self.covariates is not None else None ) main_covariate_type = ( CovariateType.NONE if self.covariates is None else CovariateType.PAST ) # optionally, load sample weight sample_weight_series = ( self.sample_weight[target_idx] if self.sample_weight is not None else None ) shift = self.lookback * self.output_chunk_length input_chunk_length = shift # get all indices for the current sample ( past_start, past_end, future_start, future_end, cov_start, cov_end, sample_weight_start, sample_weight_end, ) = self._memory_indexer( target_idx=target_idx, target_series=target_series, shift=shift, input_chunk_length=input_chunk_length, output_chunk_length=self.output_chunk_length, end_of_output_idx=end_of_output_idx, covariate_series=covariate_series, covariate_type=main_covariate_type, sample_weight_series=sample_weight_series, ) # extract sample target future_target = target_vals[future_start:future_end] past_target = target_vals[past_start:past_end] # extract sample covariates covariate = None if self.covariates is not None: if cov_end > len(covariate_series): raise_log( ValueError( f"The dataset contains past covariates that don't extend far enough into the future. " f"({idx}-th sample)" ), logger=logger, ) covariate = covariate_series.random_component_values(copy=False)[ cov_start:cov_end ] if len(covariate) != len(past_target): raise_log( ValueError( "The dataset contains past covariates whose time axis doesn't allow to obtain the " "input (or output) chunk relative to the target series." ), logger=logger, ) # extract sample weights sample_weight = None if self.sample_weight is not None: if sample_weight_end > len(sample_weight_series): raise_log( ValueError( f"The dataset contains sample weights " f"that don't extend far enough into the future. ({idx}-th sample)" ), logger=logger, ) sample_weight = sample_weight_series.random_component_values(copy=False)[ sample_weight_start:sample_weight_end ] if len(sample_weight) != self.output_chunk_length: raise_log( ValueError( "The dataset contains sample weights whose time axis doesn't allow to obtain " "the input (or output) chunk relative to the target series." ), logger=logger, ) # extract sample static covariates if self.use_static_covariates: static_covariate = target_series.static_covariates_values(copy=False) else: static_covariate = None return past_target, covariate, static_covariate, sample_weight, future_target