Source code for darts.utils.data.tabularization

import warnings
from functools import reduce
from math import inf
from typing import Dict, List, Optional, Sequence, Tuple, Union

try:
    from typing import Literal
except ImportError:
    from typing_extensions import Literal

from itertools import chain

import numpy as np
import pandas as pd
from numpy.lib.stride_tricks import as_strided

from darts.logging import get_logger, raise_if, raise_if_not, raise_log
from darts.timeseries import TimeSeries
from darts.utils.ts_utils import get_single_series, series2seq
from darts.utils.utils import n_steps_between

# Module-level logger; it is handed to `raise_log` whenever this module reports an error.
logger = get_logger(__name__)

# Type alias for values that are returned either as one array, or as one array per input series.
ArrayOrArraySequence = Union[np.ndarray, Sequence[np.ndarray]]


def create_lagged_data(
    target_series: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None,
    past_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None,
    future_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None,
    lags: Optional[Union[Sequence[int], Dict[str, List[int]]]] = None,
    lags_past_covariates: Optional[Union[Sequence[int], Dict[str, List[int]]]] = None,
    lags_future_covariates: Optional[
        Union[Sequence[int], Dict[str, List[int]]]
    ] = None,
    output_chunk_length: int = 1,
    output_chunk_shift: int = 0,
    uses_static_covariates: bool = True,
    last_static_covariates_shape: Optional[Tuple[int, int]] = None,
    max_samples_per_ts: Optional[int] = None,
    multi_models: bool = True,
    check_inputs: bool = True,
    use_moving_windows: bool = True,
    is_training: bool = True,
    concatenate: bool = True,
) -> Tuple[
    ArrayOrArraySequence,
    Union[None, ArrayOrArraySequence],
    Sequence[pd.Index],
    Optional[Tuple[int, int]],
]:
    """
    Creates the features array `X` and labels array `y` to train a lagged-variables regression model (e.g. an
    `sklearn` model) when `is_training = True`; alternatively, creates the features array `X` to produce a series
    of prediction from an already-trained regression model when `is_training = False`. In both cases, a list of time
    indices corresponding to each generated observation is also returned.

    Notes
    -----
    Instead of calling `create_lagged_data` directly, it is instead recommended that:
        - `create_lagged_training_data` be called if one wishes to create the `X` and `y` arrays
        to train a regression model.
        - `create_lagged_prediction_data` be called if one wishes to create the `X` array required
        to generate a prediction from an already-trained regression model.
    This is because even though both of these functions are merely wrappers around `create_lagged_data`, their
    call signatures are more easily interpreted than `create_lagged_data`. For example,
    `create_lagged_prediction_data` does not accept `output_chunk_length` nor `multi_models` as inputs, since
    these inputs are not used when constructing prediction data. Similarly, `create_lagged_prediction_data`
    returns only `X` and `times` as outputs, as opposed to returning `y` as `None` along with `X` and `times`.

    The `X` array is constructed from the lagged values of up to three separate timeseries:
        1. The `target_series`, which contains the values we're trying to predict. A regression model that
        uses previous values of the target it's predicting is referred to as *autoregressive*; please refer to
        [1]_ for further details about autoregressive timeseries models.
        2. The past covariates series, which contains values that are *not* known into the future. Unlike
        the target series, however, past covariates are *not* to be predicted by the regression model.
        3. The future covariates (AKA 'exogenous' covariates) series, which contains values that are known
        into the future, even beyond the data in `target_series` and `past_covariates`.
    See [2]_ for a more detailed discussion about target, past, and future covariates. Conversely, `y` is
    comprised only of the lagged values of `target_series`.

    The shape of `X` is:
        `X.shape = (n_observations, n_lagged_features, n_samples)`,
    where `n_observations` equals either the number of time points shared between all specified series,
    or `max_samples_per_ts`, whichever is smallest.
    The shape of `y` is:
        `y.shape = (n_observations, output_chunk_length, n_samples)`,
    if `multi_models = True`, otherwise:
        `y.shape = (n_observations, 1, n_samples)`.

    Along the `n_lagged_features` axis, `X` has the following structure (for `*_lags=[-2,-1]` and
    `*_series.n_components = 2`):

        lagged_target | lagged_past_covariates | lagged_future_covariates

    where each `lagged_*` has the following structure:

        lag_-2_comp_1_* | lag_-2_comp_2_* | lag_-1_comp_1_* | lag_-1_comp_2_*

    Along the `n_lagged_labels` axis, `y` has the following structure (for `output_chunk_length=4` and
    `target_series.n_components=2`):

        lag_+0_comp_1_target | lag_+0_comp_2_target | ... | lag_+3_comp_1_target | lag_+3_comp_2_target

    The `lags` and `lags_past_covariates` must contain only values less than or equal to -1. In other words, one
    cannot use the value of either of these series at time `t` to predict the value of the target series at the
    same time `t`; this is because the values of `target_series` and `past_covariates` at time `t` aren't
    available at prediction time, by definition. Conversely, since the values of `future_covariates` are known
    into the future, `lags_future_covariates` can contain negative, positive, and/or zero lag values (i.e. we
    *can* use the values of `future_covariates` at time `t` or beyond to predict the value of `target_series` at
    time `t`).

    The exact method used to construct `X` and `y` depends on whether all specified timeseries are
    of the same frequency or not:
        - If all specified timeseries are of the same frequency (and `use_moving_windows = True`),
        `strided_moving_window` is used to extract contiguous time blocks from each timeseries; the lagged
        variables are then extracted from each window.
        - If all specified timeseries are *not* of the same frequency, then `find_shared_times` is first used
        to find those times common to all three timeseries, after which the lagged features are extracted by
        offsetting the time indices of these common times by the requested lags.
    In cases where it can be validly applied, the 'moving window' method is expected to be faster than the
    'intersecting time' method. However, in exceptional cases where only a small number of lags are being
    extracted, but the difference between the lag values is large (e.g. `lags = [-1, -1000]`), the 'moving
    window' method is expected to consume significantly more memory, since it extracts all series values
    between the maximum and minimum lags as 'windows', before actually extracting the specific requested
    lag values.

    In order for the lagged features of a series to be added to `X`, *both* that series and the corresponding
    lags must be specified; if a series is specified without the corresponding lags, that series will be ignored
    and not added to `X`. `X` and `y` arrays are constructed independently over the samples dimension of each
    series. If the provided series are stochastic (i.e. `series.n_samples > 1`), then an `X` and `y` array will
    be constructed for each sample; the arrays corresponding to each sample are concatenated together along
    axis `2` of `X` and `y`. In other words, `create_lagged_data` is vectorised over the sample axis of the
    `target_series`, `past_covariates`, and `future_covariates` inputs. Importantly, if stochastic series are
    provided, each series must have the same number of samples, otherwise an error will be thrown.

    Each series input (i.e. `target_series`, `past_covariates`, and `future_covariates`) can be specified either
    as a single `TimeSeries`, or as a `Sequence` of `TimeSeries`; the specified series must all be of the same
    type, however (i.e. either all `TimeSeries` or all `Sequence[TimeSeries]`). If `Sequence[TimeSeries]` are
    specified, then a feature matrix `X` and labels array `y` will be constructed using the corresponding
    `TimeSeries` in each `Sequence` (i.e. the first `TimeSeries` in each `Sequence` are used to create an `X`
    and `y`, then the second `TimeSeries` in each `Sequence` are used to create an `X` and `y`, etc.). If
    `concatenate = True`, these `X`'s and `y`'s will be concatenated along the `0`th axis; otherwise, a list of
    `X` and `y` array will be returned. Note that `times` is always returned as a `Sequence[pd.Index]`, however,
    even when `concatenate = True`.

    Parameters
    ----------
    target_series
        Optionally, the series for the regression model to predict. Must be specified if `is_training = True`.
        Can be specified as either a `TimeSeries` or as a `Sequence[TimeSeries]`.
    past_covariates
        Optionally, the past covariates series that the regression model will use as inputs. Unlike the
        `target_series`, `past_covariates` are *not* to be predicted by the regression model. Can be
        specified as either a `TimeSeries` or as a `Sequence[TimeSeries]`.
    future_covariates
        Optionally, the future covariates (i.e. exogenous covariates) series that the regression model will
        use as inputs. Can be specified as either a `TimeSeries` or as a `Sequence[TimeSeries]`.
    lags
        Optionally, the lags of the target series to be used as (autoregressive) features. If not specified,
        autoregressive features will *not* be added to `X`. Each lag value is assumed to be negative (e.g.
        `lags = [-3, -1]` will extract `target_series` values which are 3 time steps and 1 time step away from
        the current value). If the lags are provided as a dictionary, the lags values are specific to each
        component in the target series.
    lags_past_covariates
        Optionally, the lags of `past_covariates` to be used as features. Like `lags`, each lag value is assumed
        to be less than or equal to -1. If the lags are provided as a dictionary, the lags values are specific
        to each component in the past covariates series.
    lags_future_covariates
        Optionally, the lags of `future_covariates` to be used as features. Unlike `lags` and
        `lags_past_covariates`, `lags_future_covariates` values can be positive (i.e. use values *after* time
        `t` to predict target at time `t`), zero (i.e. use values *at* time `t` to predict target at time `t`),
        and/or negative (i.e. use values *before* time `t` to predict target at time `t`). If
        `output_chunk_shift > 0`, the lags are relative to the first time step of the shifted output chunk.
        If the lags are provided as a dictionary, the lags values are specific to each component in the future
        covariates series.
    output_chunk_length
        Optionally, the number of time steps ahead into the future the regression model is to predict. Must
        be specified if `is_training = True`.
    output_chunk_shift
        Optionally, the number of time steps to shift the output chunk ahead into the future.
    uses_static_covariates
        Whether the model uses/expects static covariates. If `True`, it enforces that static covariates must
        have identical shapes across all target series.
    last_static_covariates_shape
        Optionally, the last observed shape of the static covariates. This is ``None`` before fitting, or when
        `uses_static_covariates` is ``False``.
    max_samples_per_ts
        Optionally, the maximum number of samples to be drawn for training/validation; only the most recent
        samples are kept. In theory, specifying a smaller `max_samples_per_ts` should reduce computation time,
        especially in cases where many observations could be generated.
    multi_models
        Optionally, specifies whether the regression model predicts multiple time steps into the future. If
        `True`, then the regression model is assumed to predict all time steps from time `t` to
        `t+output_chunk_length`. If `False`, then the regression model is assumed to predict *only* the time
        step at `t+output_chunk_length`. This input is ignored if `is_training = False`.
    check_inputs
        Optionally, specifies that the `lags_*` and `series_*` inputs should be checked for validity. Should
        be set to `False` if inputs have already been checked for validity (e.g. inside the `__init__` of a
        class), otherwise should be set to `True`.
    use_moving_windows
        Optionally, specifies that the 'moving window' method should be used to construct `X` and `y` if all
        provided series are of the same frequency. If `use_moving_windows = False`, the 'time intersection'
        method will always be used, even when all provided series are of the same frequency. In general,
        setting to `True` results in faster tabularization at the potential cost of higher memory usage. See
        Notes for further details.
    is_training
        Optionally, specifies whether the constructed lagged data are to be used for training a regression
        model (i.e. `is_training = True`), or for generating predictions from an already-trained regression
        model (i.e. `is_training = False`). If `is_training = True`, `target_series` and `output_chunk_length`
        must be specified, the `multi_models` input is utilised, and a label array `y` is returned. Conversely,
        if `is_training = False`, then `target_series` and `output_chunk_length` do not need to be specified,
        the `multi_models` input is ignored, and the returned `y` value is `None`.
    concatenate
        Optionally, specifies that `X` and `y` should both be returned as single `np.ndarray`s, instead of as a
        `Sequence[np.ndarray]`. If each series input is specified as a `Sequence[TimeSeries]` and
        `concatenate = False`, `X` and `y` will be lists whose `i`th element corresponds to the feature matrix
        or label array formed by the `i`th `TimeSeries` in each `Sequence[TimeSeries]` input. Conversely, if
        `concatenate = True` when `Sequence[TimeSeries]` are provided, then `X` and `y` will be arrays created
        by concatenating all feature/label arrays formed by each `TimeSeries` along the `0`th axis. Note that
        `times` is still returned as `Sequence[pd.Index]`, even when `concatenate = True`.

    Returns
    -------
    X
        The constructed features array(s), with shape `(n_observations, n_lagged_features, n_samples)`.
        If `concatenate = False`, then `X` is returned as a `Sequence[np.array]` with one array per series;
        otherwise, `X` is returned as a single `np.array`.
    y
        The constructed labels array. If `multi_models = True`, then `y` is a
        `(n_observations, output_chunk_length, n_samples)`-shaped array; conversely, if
        `multi_models = False`, then `y` is a `(n_observations, 1, n_samples)`-shaped array.
        If `concatenate = False`, then `y` is returned as a `Sequence[np.array]`; otherwise, `y` is returned
        as a single `np.array`. `y` is `None` whenever `is_training = False`.
    times
        The `time_index` of each observation in `X` and `y`, returned as a `Sequence` of `pd.Index`es.
        If the series inputs were specified as `Sequence[TimeSeries]`, then the `i`th list element
        gives the times of those observations formed using the `i`th `TimeSeries` object in each
        `Sequence`. Otherwise, if the series inputs were specified as `TimeSeries`, the only
        element is the times of those observations formed from the lone `TimeSeries` inputs.
    last_static_covariates_shape
        The last observed shape of the static covariates. This is ``None`` when `uses_static_covariates`
        is ``False``.

    Raises
    ------
    ValueError
        If `target_series` is *not* specified when `is_training = True`.
    ValueError
        If none of `target_series`, `past_covariates`, and `future_covariates` is specified.
    ValueError
        If the series inputs do not all contain the same number of `TimeSeries`.
    ValueError
        If `use_moving_windows = False` while any of the lags is provided as a dictionary, or if lags are
        provided as a dictionary while the series of one group do not share the same frequency.
    ValueError
        If the specified time series do not share any times for which features (and labels if
        `is_training = True`) can be constructed.
    ValueError
        If no lags are specified, or if any of the specified lag values are non-negative.
    ValueError
        If any of the series are too short to create features and/or labels for the requested lags and
        `output_chunk_length` values.
    ValueError
        If the provided series do not share the same type of `time_index` (e.g. `target_series` uses a
        pd.RangeIndex, but `future_covariates` uses a `pd.DatetimeIndex`).

    References
    ----------
    .. [1] https://otexts.com/fpp2/AR.html#AR
    .. [2] https://unit8.com/resources/time-series-forecasting-using-past-and-future-external-data-with-darts/

    See Also
    --------
    tabularization.create_lagged_component_names : return the lagged features names as a list of strings.
    """
    raise_if(
        is_training and (target_series is None),
        "Must specify `target_series` if `is_training = True`.",
    )

    # ensure list of TimeSeries format
    target_series = series2seq(target_series)
    past_covariates = series2seq(past_covariates)
    future_covariates = series2seq(future_covariates)

    # number of `TimeSeries` in every specified input; all inputs must agree on it
    seq_ts_lens = {
        len(seq_ts)
        for seq_ts in (target_series, past_covariates, future_covariates)
        if seq_ts is not None
    }
    if len(seq_ts_lens) > 1:
        raise_log(
            ValueError(
                "Must specify the same number of `TimeSeries` for each series input."
            ),
            logger,
        )

    lags_passed_as_dict = any(
        isinstance(lags_, dict)
        for lags_ in [lags, lags_past_covariates, lags_future_covariates]
    )
    if (not use_moving_windows) and lags_passed_as_dict:
        raise_log(
            ValueError(
                "`use_moving_windows=False` is not supported when any of the lags is provided as a dictionary. "
                f"Received: {[lags, lags_past_covariates, lags_future_covariates]}."
            ),
            logger,
        )

    if max_samples_per_ts is None:
        max_samples_per_ts = inf

    # lags are identical for multiple series: pre-compute lagged features and reordered lagged features
    lags_extract, lags_order = _get_lagged_indices(
        lags,
        lags_past_covariates,
        lags_future_covariates,
    )

    # without any series there is nothing to tabularize; fail with an explicit message
    # rather than letting `max()` choke on an empty collection
    if not seq_ts_lens:
        raise_log(
            ValueError(
                "Must specify at least one of `target_series`, `past_covariates`, or `future_covariates`."
            ),
            logger,
        )

    X, y, times = [], [], []
    for i in range(max(seq_ts_lens)):
        target_i = target_series[i] if target_series else None
        past_i = past_covariates[i] if past_covariates else None
        future_i = future_covariates[i] if future_covariates else None
        series_equal_freq = _all_equal_freq(target_i, past_i, future_i)
        # component-wise lags extraction is not supported with times intersection at the moment
        if use_moving_windows and lags_passed_as_dict and (not series_equal_freq):
            raise_log(
                ValueError(
                    f"Cannot create tabularized data for the {i}th series because target and covariates don't have "
                    "the same frequency and some of the lags are provided as a dictionary. Either resample the "
                    "series or change the lags definition."
                ),
                logger,
            )

        if use_moving_windows and series_equal_freq:
            X_i, y_i, times_i = _create_lagged_data_by_moving_window(
                target_i,
                output_chunk_length,
                output_chunk_shift,
                past_i,
                future_i,
                lags,
                lags_past_covariates,
                lags_future_covariates,
                lags_extract,
                lags_order,
                max_samples_per_ts,
                multi_models,
                check_inputs,
                is_training,
            )
        else:
            X_i, y_i, times_i = _create_lagged_data_by_intersecting_times(
                target_i,
                output_chunk_length,
                output_chunk_shift,
                past_i,
                future_i,
                lags,
                lags_past_covariates,
                lags_future_covariates,
                max_samples_per_ts,
                multi_models,
                check_inputs,
                is_training,
            )
        # the returned shape is fed back in, so that it is carried over to the next series
        X_i, last_static_covariates_shape = add_static_covariates_to_lagged_data(
            features=X_i,
            target_series=target_i,
            uses_static_covariates=uses_static_covariates,
            last_shape=last_static_covariates_shape,
        )

        X.append(X_i)
        y.append(y_i)
        times.append(times_i)

    if concatenate:
        X = np.concatenate(X, axis=0)
    # labels only exist in training mode
    if not is_training:
        y = None
    elif concatenate:
        y = np.concatenate(y, axis=0)
    return X, y, times, last_static_covariates_shape
def create_lagged_training_data(
    target_series: Union[TimeSeries, Sequence[TimeSeries]],
    output_chunk_length: int,
    output_chunk_shift: int,
    past_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None,
    future_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None,
    lags: Optional[Union[Sequence[int], Dict[str, List[int]]]] = None,
    lags_past_covariates: Optional[Union[Sequence[int], Dict[str, List[int]]]] = None,
    lags_future_covariates: Optional[
        Union[Sequence[int], Dict[str, List[int]]]
    ] = None,
    uses_static_covariates: bool = True,
    last_static_covariates_shape: Optional[Tuple[int, int]] = None,
    max_samples_per_ts: Optional[int] = None,
    multi_models: bool = True,
    check_inputs: bool = True,
    use_moving_windows: bool = True,
    concatenate: bool = True,
) -> Tuple[
    ArrayOrArraySequence,
    Union[None, ArrayOrArraySequence],
    Sequence[pd.Index],
    Optional[Tuple[int, int]],
]:
    """
    Builds the features array `X` and the labels array `y` used to fit a lagged-variables regression model
    (e.g. an `sklearn` model), together with the time index of every generated observation.

    Notes
    -----
    This is a thin wrapper that forwards all of its arguments to `create_lagged_data` with
    `is_training = True`; see `help(create_lagged_data)` for the layout of `X` and `y` and for a discussion
    of the 'moving window' and 'time intersection' construction methods.

    Parameters
    ----------
    target_series
        The series for the regression model to predict.
    output_chunk_length
        The number of time steps ahead into the future the regression model is to predict.
    output_chunk_shift
        The number of time steps to shift the output chunk ahead into the future.
    past_covariates
        Optionally, the past covariates series that the regression model will use as inputs. Unlike the
        `target_series`, `past_covariates` are *not* to be predicted by the regression model.
    future_covariates
        Optionally, the future covariates (i.e. exogenous covariates) series that the regression model will
        use as inputs.
    lags
        Optionally, the lags of the target series to be used as (autoregressive) features. If not specified,
        autoregressive features will *not* be added to `X`. Each lag value is assumed to be negative (e.g.
        `lags = [-3, -1]` will extract `target_series` values which are 3 time steps and 1 time step away from
        the current value). If the lags are provided as a dictionary, the lags values are specific to each
        component in the target series.
    lags_past_covariates
        Optionally, the lags of `past_covariates` to be used as features. Like `lags`, each lag value is assumed
        to be less than or equal to -1. If the lags are provided as a dictionary, the lags values are specific
        to each component in the past covariates series.
    lags_future_covariates
        Optionally, the lags of `future_covariates` to be used as features. Unlike `lags` and
        `lags_past_covariates`, `lags_future_covariates` values can be positive (i.e. use values *after* time
        `t` to predict target at time `t`), zero (i.e. use values *at* time `t` to predict target at time `t`),
        and/or negative (i.e. use values *before* time `t` to predict target at time `t`). If the lags are
        provided as a dictionary, the lags values are specific to each component in the future covariates
        series.
    uses_static_covariates
        Whether the model uses/expects static covariates. If `True`, it enforces that static covariates must
        have identical shapes across all target series.
    last_static_covariates_shape
        Optionally, the last observed shape of the static covariates. This is ``None`` before fitting, or when
        `uses_static_covariates` is ``False``.
    max_samples_per_ts
        Optionally, the maximum number of samples to be drawn for training/validation; only the most recent
        samples are kept. In theory, specifying a smaller `max_samples_per_ts` should reduce computation time,
        especially in cases where many observations could be generated.
    multi_models
        Optionally, specifies whether the regression model predicts multiple time steps into the future. If
        `True`, then the regression model is assumed to predict all time steps from time `t` to
        `t+output_chunk_length`. If `False`, then the regression model is assumed to predict *only* the time
        step at `t+output_chunk_length`.
    check_inputs
        Optionally, specifies that the `lags_*` and `series_*` inputs should be checked for validity. Should
        be set to `False` if inputs have already been checked for validity (e.g. inside the `__init__` of a
        class), otherwise should be set to `True`.
    use_moving_windows
        Optionally, specifies that the 'moving window' method should be used to construct `X` and `y` if all
        provided series are of the same frequency. If `use_moving_windows = False`, the 'time intersection'
        method will always be used, even when all provided series are of the same frequency. In general,
        setting to `True` results in faster tabularization at the potential cost of higher memory usage.
    concatenate
        Optionally, specifies that `X` and `y` should both be returned as single `np.ndarray`s, instead of as a
        `Sequence[np.ndarray]`. With `concatenate = False`, `X` and `y` are lists whose `i`th element
        corresponds to the feature matrix or label array formed by the `i`th `TimeSeries` in each
        `Sequence[TimeSeries]` input. With `concatenate = True`, the per-series arrays are concatenated along
        the `0`th axis. Note that `times` is still returned as `Sequence[pd.Index]`, even when
        `concatenate = True`.

    Returns
    -------
    X
        The constructed features array(s), with shape `(n_observations, n_lagged_features, n_samples)`.
        Returned as a `Sequence[np.array]` if `concatenate = False`; otherwise returned as a single `np.array`.
    y
        The constructed labels array(s). If `multi_models = True`, then `y` is a
        `(n_observations, output_chunk_length, n_samples)`-shaped array; conversely, if
        `multi_models = False`, then `y` is a `(n_observations, 1, n_samples)`-shaped array. Returned as a
        `Sequence[np.array]` if `concatenate = False`; otherwise returned as a single `np.array`.
    times
        The `time_index` of each observation in `X` and `y`, returned as a `Sequence` of `pd.Index`es.
        If the series inputs were specified as `Sequence[TimeSeries]`, then the `i`th list element
        gives the times of those observations formed using the `i`th `TimeSeries` object in each
        `Sequence`. Otherwise, if the series inputs were specified as `TimeSeries`, the only
        element is the times of those observations formed from the lone `TimeSeries` inputs.
    last_static_covariates_shape
        The last observed shape of the static covariates, exactly as returned by `create_lagged_data`.

    Raises
    ------
    ValueError
        If the specified time series do not share any times for which features and labels can be constructed.
    ValueError
        If no lags are specified, or if any of the specified lag values are non-negative.
    ValueError
        If any of the series are too short to create features and labels for the requested lags and
        `output_chunk_length` values.
    ValueError
        If the provided series do not share the same type of `time_index` (e.g. `target_series` uses a
        pd.RangeIndex, but `future_covariates` uses a `pd.DatetimeIndex`).
    """
    # the series the features (and labels) are read from
    series_inputs = {
        "target_series": target_series,
        "past_covariates": past_covariates,
        "future_covariates": future_covariates,
    }
    # which lagged values of each series become features
    lag_inputs = {
        "lags": lags,
        "lags_past_covariates": lags_past_covariates,
        "lags_future_covariates": lags_future_covariates,
    }
    # everything else is forwarded untouched
    options = {
        "output_chunk_length": output_chunk_length,
        "output_chunk_shift": output_chunk_shift,
        "uses_static_covariates": uses_static_covariates,
        "last_static_covariates_shape": last_static_covariates_shape,
        "max_samples_per_ts": max_samples_per_ts,
        "multi_models": multi_models,
        "check_inputs": check_inputs,
        "use_moving_windows": use_moving_windows,
        "concatenate": concatenate,
    }
    # training mode: labels are always requested
    return create_lagged_data(
        is_training=True, **series_inputs, **lag_inputs, **options
    )
def create_lagged_prediction_data(
    target_series: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None,
    past_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None,
    future_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None,
    lags: Optional[Union[Sequence[int], Dict[str, List[int]]]] = None,
    lags_past_covariates: Optional[Union[Sequence[int], Dict[str, List[int]]]] = None,
    lags_future_covariates: Optional[
        Union[Sequence[int], Dict[str, List[int]]]
    ] = None,
    uses_static_covariates: bool = True,
    last_static_covariates_shape: Optional[Tuple[int, int]] = None,
    max_samples_per_ts: Optional[int] = None,
    check_inputs: bool = True,
    use_moving_windows: bool = True,
    concatenate: bool = True,
) -> Tuple[ArrayOrArraySequence, Sequence[pd.Index]]:
    """
    Builds the features array `X` needed to generate predictions from an already-trained regression model,
    together with the time index of every generated observation.

    Notes
    -----
    This is a thin wrapper that forwards its arguments to `create_lagged_data` with `is_training = False`
    and keeps only the `X` and `times` outputs; the labels and the static covariates shape returned by
    `create_lagged_data` are discarded. See `help(create_lagged_data)` for the layout of `X` and for a
    discussion of the 'moving window' and 'time intersection' construction methods.

    Parameters
    ----------
    target_series
        Optionally, the series for the regression model to predict.
    past_covariates
        Optionally, the past covariates series that the regression model will use as inputs. Unlike the
        `target_series`, `past_covariates` are *not* to be predicted by the regression model.
    future_covariates
        Optionally, the future covariates (i.e. exogenous covariates) series that the regression model will
        use as inputs.
    lags
        Optionally, the lags of the target series to be used as (autoregressive) features. If not specified,
        autoregressive features will *not* be added to `X`. Each lag value is assumed to be negative (e.g.
        `lags = [-3, -1]` will extract `target_series` values which are 3 time steps and 1 time step away from
        the current value). If the lags are provided as a dictionary, the lags values are specific to each
        component in the target series.
    lags_past_covariates
        Optionally, the lags of `past_covariates` to be used as features. Like `lags`, each lag value is assumed
        to be less than or equal to -1. If the lags are provided as a dictionary, the lags values are specific
        to each component in the past covariates series.
    lags_future_covariates
        Optionally, the lags of `future_covariates` to be used as features. Unlike `lags` and
        `lags_past_covariates`, `lags_future_covariates` values can be positive (i.e. use values *after* time
        `t` to predict target at time `t`), zero (i.e. use values *at* time `t` to predict target at time `t`),
        and/or negative (i.e. use values *before* time `t` to predict target at time `t`). If the lags are
        provided as a dictionary, the lags values are specific to each component in the future covariates
        series.
    uses_static_covariates
        Whether the model uses/expects static covariates. If `True`, it enforces that static covariates must
        have identical shapes across all target series.
    last_static_covariates_shape
        Optionally, the last observed shape of the static covariates. This is ``None`` before fitting, or when
        `uses_static_covariates` is ``False``.
    max_samples_per_ts
        Optionally, the maximum number of samples to be drawn for training/validation; only the most recent
        samples are kept. In theory, specifying a smaller `max_samples_per_ts` should reduce computation time,
        especially in cases where many observations could be generated.
    check_inputs
        Optionally, specifies that the `lags_*` and `series_*` inputs should be checked for validity. Should
        be set to `False` if inputs have already been checked for validity (e.g. inside the `__init__` of a
        class), otherwise should be set to `True`.
    use_moving_windows
        Optionally, specifies that the 'moving window' method should be used to construct `X` if all
        provided series are of the same frequency. If `use_moving_windows = False`, the 'time intersection'
        method will always be used, even when all provided series are of the same frequency. In general,
        setting to `True` results in faster tabularization at the potential cost of higher memory usage.
    concatenate
        Optionally, specifies that `X` should be returned as a single `np.ndarray`, instead of as a
        `Sequence[np.ndarray]`. With `concatenate = False`, `X` is a list whose `i`th element corresponds to
        the feature matrix formed by the `i`th `TimeSeries` in each `Sequence[TimeSeries]` input. With
        `concatenate = True`, the per-series feature arrays are concatenated along the `0`th axis. Note that
        `times` is still returned as `Sequence[pd.Index]`, even when `concatenate = True`.

    Returns
    -------
    X
        The constructed features array(s), with shape `(n_observations, n_lagged_features, n_samples)`.
        Returned as a `Sequence[np.array]` if `concatenate = False`; otherwise returned as a single `np.array`.
    times
        The `time_index` of each observation in `X`, returned as a `Sequence` of `pd.Index`es.
        If the series inputs were specified as `Sequence[TimeSeries]`, then the `i`th list element
        gives the times of those observations formed using the `i`th `TimeSeries` object in each
        `Sequence`. Otherwise, if the series inputs were specified as `TimeSeries`, the only
        element is the times of those observations formed from the lone `TimeSeries` inputs.

    Raises
    ------
    ValueError
        If the specified time series do not share any times for which features can be constructed.
    ValueError
        If no lags are specified, or if any of the specified lag values are non-negative.
    ValueError
        If any of the series are too short to create features for the requested lag values.
    ValueError
        If the provided series do not share the same type of `time_index` (e.g. `target_series` uses a
        pd.RangeIndex, but `future_covariates` uses a `pd.DatetimeIndex`).
    """
    # the series the features are read from
    series_inputs = {
        "target_series": target_series,
        "past_covariates": past_covariates,
        "future_covariates": future_covariates,
    }
    # which lagged values of each series become features
    lag_inputs = {
        "lags": lags,
        "lags_past_covariates": lags_past_covariates,
        "lags_future_covariates": lags_future_covariates,
    }
    # everything else is forwarded untouched
    options = {
        "uses_static_covariates": uses_static_covariates,
        "last_static_covariates_shape": last_static_covariates_shape,
        "max_samples_per_ts": max_samples_per_ts,
        "check_inputs": check_inputs,
        "use_moving_windows": use_moving_windows,
        "concatenate": concatenate,
    }
    # prediction mode: no labels are built, and the static covariates shape is not reported back
    features, _labels, times, _static_cov_shape = create_lagged_data(
        is_training=False, **series_inputs, **lag_inputs, **options
    )
    return features, times
def add_static_covariates_to_lagged_data(
    features: Union[np.ndarray, Sequence[np.ndarray]],
    target_series: Union[TimeSeries, Sequence[TimeSeries]],
    uses_static_covariates: bool = True,
    last_shape: Optional[Tuple[int, int]] = None,
) -> Tuple[Union[np.ndarray, Sequence[np.ndarray]], Optional[Tuple[int, int]]]:
    """
    Add static covariates to the features' table for RegressionModels.
    If `uses_static_covariates=True`, all target series used in `fit()` and `predict()` must have static
    covariates with identical dimensionality. Otherwise, will not consider static covariates.

    The static covariates are added to the right of the lagged features following the convention:
    with a 2 component series, and 2 static covariates per component ->
    scov_1_comp_1 | scov_1_comp_2 | scov_2_comp_1 | scov_2_comp_2

    Parameters
    ----------
    features
        The features' numpy array(s) to which the static covariates will be added. Can either be a lone feature
        matrix or a `Sequence` of feature matrices; in the latter case, static covariates will be appended to
        each feature matrix in this `Sequence`. Each matrix is either 2-dimensional
        `(n_observations, n_features)` or 3-dimensional `(n_observations, n_features, n_samples)`.
        The passed sequence itself is not modified.
    target_series
        The target series from which to read the static covariates.
    uses_static_covariates
        Whether the model uses/expects static covariates. If `True`, it enforces that static covariates must
        have identical shapes across all of target series.
    last_shape
        Optionally, the last observed shape of the static covariates. This is ``None`` before fitting, or when
        `uses_static_covariates` is ``False``.

    Returns
    -------
    (features, last_shape)
        The features' array(s) with appended static covariates columns. If the `features` input was passed as a
        `Sequence` of `np.array`s, then a list is returned; if `features` was passed as an `np.array`,
        a `np.array` is returned. `last_shape` is the shape of the static covariates.

    Raises
    ------
    ValueError
        If one of the target series has no static covariates, or if the static covariates' shape of a series
        differs from `last_shape`.
    """
    # uses_static_covariates=True enforces that all series must have static covs of same dimensionality
    if not uses_static_covariates:
        return features, last_shape

    input_not_list = not isinstance(features, Sequence)
    # always work on a fresh list: the caller's sequence is left untouched and
    # immutable sequences (e.g. tuples) are supported as well
    features = [features] if input_not_list else list(features)
    target_series = series2seq(target_series)

    # go through series, check static covariates, and stack them to the right of the lagged features
    # try to abort early in case there is a mismatch in static covariates
    for idx, ts in enumerate(target_series):
        if not ts.has_static_covariates:
            raise_log(
                ValueError(
                    "Static covariates mismatch across the sequence of target series. Some of the series "
                    "contain static covariates and others do not."
                ),
                logger,
            )
        else:
            if last_shape is None:
                last_shape = ts.static_covariates.shape
            if ts.static_covariates.shape != last_shape:
                raise_log(
                    ValueError(
                        "Static covariates dimension mismatch across the sequence of target series. The static "
                        "covariates must have the same number of columns and rows across all target series."
                    ),
                    logger,
                )

            # flatten static covariates along columns -> results in [scov0_comp0, scov0_comp1, scov1_comp0, ...]
            static_covs = ts.static_covariates.values.flatten(order="F")

            # we stack the static covariates to the right of lagged features
            # the broadcasting repeats the static covariates along axis=0 to match the number of feature rows
            feats = features[idx]
            if feats.ndim == 2:
                static_block = np.broadcast_to(
                    static_covs, (len(feats), len(static_covs))
                )
            else:
                # 3-dimensional features: additionally repeat along the trailing (samples) axis so
                # that arrays holding more than one sample can be stacked, too
                static_block = np.broadcast_to(
                    static_covs[np.newaxis, :, np.newaxis],
                    (len(feats), len(static_covs), feats.shape[2]),
                )
            features[idx] = np.hstack([feats, static_block])

    if input_not_list:
        features = features[0]
    return features, last_shape
def create_lagged_component_names(
    target_series: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None,
    past_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None,
    future_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None,
    lags: Optional[Union[Sequence[int], Dict[str, List[int]]]] = None,
    lags_past_covariates: Optional[Union[Sequence[int], Dict[str, List[int]]]] = None,
    lags_future_covariates: Optional[Union[Sequence[int], Dict[str, List[int]]]] = None,
    output_chunk_length: int = 1,
    concatenate: bool = True,
    use_static_covariates: bool = False,
) -> Tuple[List[str], List[str]]:
    """
    Helper function called to retrieve the name of the features and labels arrays created with
    `create_lagged_data()`. The order of the features is the following:

    Along the `n_lagged_features` axis, `X` has the following structure:
        lagged_target | lagged_past_covariates | lagged_future_covariates | static covariates

    For `*_lags=[-2,-1]` and `*_series.n_components = 2` (lags shared across all the components),
    each `lagged_*` has the following structure (grouped by lags):
        comp0_*_lag-2 | comp1_*_lag-2 | comp0_*_lag-1 | comp1_*_lag-1
    For `*_lags={'comp0':[-3, -1], 'comp1':[-5, -3]}` and `*_series.n_components = 2` (component-
    specific lags), each `lagged_*` has the following structure (sorted by lags, then by components):
        comp1_*_lag-5 | comp0_*_lag-3 | comp1_*_lag-3 | comp0_*_lag-1

    and for static covariates (2 static covariates acting on 2 target components):
        cov0_*_target_comp0 | cov0_*_target_comp1 | cov1_*_target_comp0 | cov1_*_target_comp1

    Along the `n_lagged_labels` axis, `y` has the following structure (for `output_chunk_length=4` and
    `target_series.n_components=2`):
        comp0_target_lag0 | comp1_target_lag0 | ... | comp0_target_lag3 | comp1_target_lag3

    Note: will only use the component names of the first series from `target_series`, `past_covariates`,
    `future_covariates`, and static_covariates.

    The naming convention for target, past and future covariates lags is: ``"{name}_{type}_lag{i}"``, where:

    - ``{name}`` the component name of the (first) series
    - ``{type}`` is the feature type, one of "target", "pastcov", and "futcov"
    - ``{i}`` is the lag value

    The naming convention for static covariates is: ``"{name}_statcov_target_{comp}"``, where:

    - ``{name}`` the static covariate name of the (first) series
    - ``{comp}`` the target component name of the (first) that the static covariate act on. If the static
      covariate acts globally on a multivariate target series, will show "global".

    The naming convention for labels is: ``"{name}_target_hrz{i}"``, where:

    - ``{name}`` the component name of the (first) series
    - ``{i}`` is the step in the forecast horizon

    Parameters
    ----------
    target_series
        Optionally, the target series (or sequence of series) whose component names are used.
    past_covariates
        Optionally, the past covariates series (or sequence of series) whose component names are used.
    future_covariates
        Optionally, the future covariates series (or sequence of series) whose component names are used.
    lags
        Optionally, the target lags, either shared by all components (sequence) or component-specific
        (dictionary keyed by component name).
    lags_past_covariates
        Optionally, the past covariates lags, in the same format as `lags`.
    lags_future_covariates
        Optionally, the future covariates lags, in the same format as `lags`.
    output_chunk_length
        The number of forecast horizon steps for which label names are generated.
    concatenate
        If `True`, the label names are appended to the right of the feature names.
    use_static_covariates
        If `True`, the names of the static covariates of the (first) target series are appended to the
        feature names.

    Returns
    -------
    features_cols_name
        The names of the lagged features in the `X` array generated by `create_lagged_data()`
        as a `List[str]`. If `concatenate=True`, also contains the columns names for
        the `y` array (on the right).
    labels_cols_name
        The names of the lagged features in the `y` array generated by `create_lagged_data()`
        as a `List[str]`.

    Raises
    ------
    ValueError
        If a lags dictionary contains the `'default_lags'` key, or if `use_static_covariates=True` but no
        target series with static covariates was provided.

    See Also
    --------
    tabularization.create_lagged_data : generate the lagged features and labels as (list of) Arrays.
    """
    target_series = series2seq(target_series)
    past_covariates = series2seq(past_covariates)
    future_covariates = series2seq(future_covariates)

    lagged_feature_names = []
    label_feature_names = []
    for variate, variate_lags, variate_type in zip(
        [target_series, past_covariates, future_covariates],
        [lags, lags_past_covariates, lags_future_covariates],
        ["target", "pastcov", "futcov"],
    ):
        if variate is None or variate_lags is None:
            continue

        components = get_single_series(variate).components.tolist()
        if isinstance(variate_lags, dict):
            if "default_lags" in variate_lags:
                raise_log(
                    ValueError(
                        "All the lags must be explicitly defined, 'default_lags' is not allowed in the "
                        "lags dictionary."
                    ),
                    logger,
                )

            # combine all the lags and sort them in ascending order across all the components
            comp_lags_reordered = np.concatenate(
                [
                    np.array(variate_lags[comp_name], dtype=int)
                    for comp_name in components
                ]
            ).argsort()
            # names grouped by component first, in the same order as the concatenated lags above
            tmp_lagged_feats_names = [
                f"{name}_{variate_type}_lag{lag}"
                for name in components
                for lag in variate_lags[name]
            ]
            # adding feats names reordered across components
            lagged_feature_names += [
                tmp_lagged_feats_names[idx] for idx in comp_lags_reordered
            ]
        else:
            lagged_feature_names += [
                f"{name}_{variate_type}_lag{lag}"
                for lag in variate_lags
                for name in components
            ]

        if variate_type == "target" and lags:
            label_feature_names = [
                f"{name}_target_hrz{lag}"
                for lag in range(output_chunk_length)
                for name in components
            ]

    # static covariates
    if use_static_covariates:
        static_covs = (
            None
            if target_series is None
            else get_single_series(target_series).static_covariates
        )
        if static_covs is None:
            # fail with an explicit message instead of an `AttributeError` on `None`
            raise_log(
                ValueError(
                    "`use_static_covariates=True` requires a `target_series` that contains "
                    "static covariates."
                ),
                logger,
            )
        # static covariate names
        names = static_covs.columns.tolist()
        # target components that the static covariates reference to
        comps = static_covs.index.tolist()
        lagged_feature_names += [
            f"{name}_statcov_target_{comp}" for name in names for comp in comps
        ]

    if concatenate:
        lagged_feature_names += label_feature_names

    return lagged_feature_names, label_feature_names
def _get_lagged_indices(
    lags,
    lags_past_covariates,
    lags_future_covariates,
):
    """Computes and returns:

    - the lagged feature indices for extraction from windows
    - the reordered indices to apply after the window extraction (in case of component specific lags)

    Each of the three inputs is either `None`, a sequence of lags shared by all components (list, tuple,
    array, ...), or a dictionary of component-specific lags. For `None`, a `None` is stored in both
    returned lists.

    Assumes that all input series share identical component order.
    """
    lags_extract = []
    lags_order = []
    for lags_i in [lags, lags_past_covariates, lags_future_covariates]:
        if lags_i is None:
            lags_extract.append(None)
            lags_order.append(None)
            continue

        if isinstance(lags_i, dict):
            # Assume keys are in the same order as the series components
            # Lags are grouped by component, extracted from the same window
            lags_extract_i = [np.array(c_lags, dtype=int) for c_lags in lags_i.values()]
            # Sort the lags across the components in ascending order
            lags_order_i = np.concatenate(lags_extract_i).argsort()
        else:
            # Any non-dict input is treated as a plain sequence of lags shared by all components
            lags_extract_i = np.array(lags_i, dtype=int)
            # Feats are already grouped by lags and ordered
            lags_order_i = slice(None)
        lags_extract.append(lags_extract_i)
        lags_order.append(lags_order_i)
    return lags_extract, lags_order


def _create_lagged_data_by_moving_window(
    target_series: Optional[TimeSeries],
    output_chunk_length: int,
    output_chunk_shift: int,
    past_covariates: Optional[TimeSeries],
    future_covariates: Optional[TimeSeries],
    lags: Optional[Union[Sequence[int], Dict[str, List[int]]]],
    lags_past_covariates: Optional[Union[Sequence[int], Dict[str, List[int]]]],
    lags_future_covariates: Optional[Union[Sequence[int], Dict[str, List[int]]]],
    lags_extract: List[Optional[np.ndarray]],
    lags_order: List[Optional[np.ndarray]],
    max_samples_per_ts: Optional[int],
    multi_models: bool,
    check_inputs: bool,
    is_training: bool,
) -> Tuple[np.ndarray, Optional[np.ndarray], pd.Index]:
    """
    Helper function called by `create_lagged_data` that computes `X`, `y`, and `times` by
    extracting 'moving windows' from each series using the `strided_moving_window`
    function. More specifically, to extract the features of a particular series for an
    arbitrary time `t`, a 'window' between times `t - max_lag` and `t - min_lag` is
    extracted, where `max_lag` and `min_lag` are the largest and smallest magnitude lags
    requested for that particular series. After extracting this window, the requested lag
    values between these two minimum and maximum lag values can be extracted. Similarly,
    the labels for time `t` are formed simply by extracting a window between times `t` and
    `t + output_chunk_length - 1` from the target series. In both cases, the extracted
    windows can then be reshaped into the correct shape. This approach can only be used if
    we *can* assume that the specified series are all of the same frequency.

    `max_samples_per_ts` may be `None` (or infinite), in which case all samples are kept.
    `y` is `None` when `is_training = False`.

    Assumes that all the lags are sorted in ascending order.
    """
    feature_times, min_lags, max_lags = _get_feature_times(
        target_series,
        past_covariates,
        future_covariates,
        lags,
        lags_past_covariates,
        lags_future_covariates,
        output_chunk_length,
        output_chunk_shift,
        is_training=is_training,
        return_min_and_max_lags=True,
        check_inputs=check_inputs,
    )
    if check_inputs:
        series_and_lags_not_specified = [max_lag is None for max_lag in max_lags]
        raise_if(
            all(series_and_lags_not_specified),
            "Must specify at least one series-lags pair.",
        )
    time_bounds = get_shared_times_bounds(*feature_times)
    raise_if(
        time_bounds is None,
        "Specified series do not share any common times for which features can be created.",
    )
    freq = _get_freqs(target_series, past_covariates, future_covariates)[0]
    if isinstance(time_bounds[0], int):
        # `stop` is exclusive, so need `+ freq` to include end-point:
        times = pd.RangeIndex(
            start=time_bounds[0], stop=time_bounds[1] + freq, step=freq
        )
    else:
        times = pd.date_range(start=time_bounds[0], end=time_bounds[1], freq=freq)
    num_samples = len(times)
    # `None` means 'no limit'; comparing an int against `None` would raise a `TypeError`
    if max_samples_per_ts is not None and num_samples > max_samples_per_ts:
        times = times[-max_samples_per_ts:]
        num_samples = max_samples_per_ts
    # Time index of 'earliest' constructed observation:
    start_time = times[0]
    # Construct features array X:
    X = []
    start_time_idx = None
    target_start_time_idx = None
    for i, (series_i, lags_extract_i, lags_order_i, min_lag_i, max_lag_i) in enumerate(
        zip(
            [target_series, past_covariates, future_covariates],
            lags_extract,
            lags_order,
            min_lags,
            max_lags,
        )
    ):
        series_and_lags_specified = min_lag_i is not None
        is_target_series = is_training and (i == 0)
        if is_target_series or series_and_lags_specified:
            time_index_i = series_i.time_index
            if time_index_i[0] == start_time:
                start_time_idx = 0
            # If lags are sufficiently large, `series_i` may not contain all
            # feature times. For example, if `lags_past_covariates = [-50]`,
            # then we can construct features for time `51` using the value
            # of `past_covariates` at time `1`, but `past_covariates` may
            # only go up to time `30`. This does *not* occur when considering
            # the target series, however, since this series must have values
            # for all feature times - these values will become labels.
            # If `start_time` not included in `time_index_i`, can 'manually' calculate
            # what its index *would* be if `time_index_i` were extended to include that time:
            elif not is_target_series and (time_index_i[-1] < start_time):
                start_time_idx = (
                    len(time_index_i)
                    - 1
                    + n_steps_between(
                        end=start_time, start=time_index_i[-1], freq=series_i.freq
                    )
                )
            # future covariates can start after `start_time` if all lags are > 0
            elif not is_target_series and (time_index_i[0] > start_time):
                start_time_idx = -n_steps_between(
                    end=time_index_i[0], start=start_time, freq=series_i.freq
                )
            # If `start_time` *is* included in `time_index_i`, need to binary search `time_index_i`
            # for its position:
            else:
                start_time_idx = np.searchsorted(time_index_i, start_time)
        if series_and_lags_specified:
            # Windows taken between times `t - max_lag_i` and `t - min_lag_i`
            window_len = max_lag_i - min_lag_i + 1
            first_window_start_idx = start_time_idx - max_lag_i
            first_window_end_idx = first_window_start_idx + window_len
            # Other windows are formed by sequentially shifting first window forward
            # by 1 index position each time; to create `(num_samples - 1)` more windows
            # in addition to the first window, need to take `(num_samples - 1)` values
            # after `first_window_end_idx`:
            vals = series_i.all_values(copy=False)[
                first_window_start_idx : first_window_end_idx + num_samples - 1, :, :
            ]
            windows = strided_moving_window(
                x=vals, window_len=window_len, stride=1, axis=0, check_inputs=False
            )

            # Within each window, the `-1` indexed value (i.e. the value at the very end of
            # the window) corresponds to time `t - min_lag_i`. The negative index of the time
            # `t + lag_i` within this window is, therefore, `-1 + lag_i + min_lag_i`:
            # extract lagged values
            lagged_vals = _extract_lagged_vals_from_windows(
                windows, lags_extract_i, lags_shift=min_lag_i - 1
            )
            # extract and append the reordered lagged values
            X.append(lagged_vals[:, lags_order_i])
        # Cache `start_time_idx` for label creation:
        if is_target_series:
            target_start_time_idx = start_time_idx
    X = np.concatenate(X, axis=1)
    # Construct labels array `y`:
    if is_training:
        # Label windows are taken between times `t + output_chunk_shift` and
        # `t + output_chunk_shift + output_chunk_length - 1`:
        first_window_start_idx = target_start_time_idx + output_chunk_shift
        # No `- 1` here since the end index of a slice is exclusive in Python:
        first_window_end_idx = (
            target_start_time_idx + output_chunk_length + output_chunk_shift
        )
        # To create `(num_samples - 1)` other windows in addition to first window,
        # must take `(num_samples - 1)` values ahead of `first_window_end_idx`
        vals = target_series.all_values(copy=False)[
            first_window_start_idx : first_window_end_idx + num_samples - 1,
            :,
            :,
        ]
        windows = strided_moving_window(
            x=vals,
            window_len=output_chunk_length,
            stride=1,
            axis=0,
            check_inputs=False,
        )
        # `multi_models = True`: all values of each window are labels;
        # otherwise only the last value of each window is used as label:
        lags_to_extract = None if multi_models else -np.ones((1,), dtype=int)
        y = _extract_lagged_vals_from_windows(windows, lags_to_extract)
    else:
        y = None
    return X, y, times


def _extract_lagged_vals_from_windows(
    windows: np.ndarray,
    lags_to_extract: Optional[Union[np.ndarray, List[np.ndarray]]] = None,
    lags_shift: int = 0,
) -> np.ndarray:
    """
    Helper function called by `_create_lagged_data_by_moving_window` that
    reshapes the `windows` formed by `strided_moving_window` from the
    shape `(num_windows, num_components, num_series, window_len)` to the
    shape `(num_windows, num_components * window_len, num_series)`. This reshaping
    is done such that the order of elements along axis 1 matches the pattern
    described in the docstring of `create_lagged_data`.

    If `lags_to_extract` is not specified, all values within each window is extracted.
    If `lags_to_extract` is specified as an np.ndarray, then only those values within each window that
    are indexed by `lags_to_extract` will be returned. In such cases, the shape of the returned
    lagged values is `(num_windows, num_components * lags_to_extract.size, num_series)`. For example,
    if `lags_to_extract = [-2]`, only the second-to-last values within each window will be extracted.
    If `lags_to_extract` is specified as a list of np.ndarray, the values will be extracted using the
    lags provided for each component. In such cases, the shape of the returned lagged values is
    `(num_windows, sum([comp_lags.size for comp_lags in lags_to_extract]), num_series)`. For example,
    if `lags_to_extract = [[-2, -1], [-1]]`, the second-to-last and last values of the first component
    and the last values of the second component within each window will be extracted.

    `lags_shift` is added to every value of `lags_to_extract` before indexing the windows.
    """
    # windows.shape = (num_windows, num_components, num_samples, window_len):
    if isinstance(lags_to_extract, list):
        # iterate over the components-specific lags; mixing the integer index `i` with the
        # lags array moves the (advanced-indexed) lags axis to the front of each result
        comp_windows = [
            windows[:, i, :, comp_lags_to_extract + lags_shift]
            for i, comp_lags_to_extract in enumerate(lags_to_extract)
        ]
        # windows.shape = (sum(lags_len) across components, num_windows, num_samples):
        windows = np.concatenate(comp_windows, axis=0)
        # lagged_vals.shape = (num_windows, sum(lags_len) across components, num_samples):
        lagged_vals = np.moveaxis(windows, (1, 0, 2), (0, 1, 2))
    else:
        if lags_to_extract is not None:
            windows = windows[:, :, :, lags_to_extract + lags_shift]
        # windows.shape = (num_windows, window_len, num_components, num_samples):
        windows = np.moveaxis(windows, (0, 3, 1, 2), (0, 1, 2, 3))
        # lagged_vals.shape = (num_windows, num_components*window_len, num_samples):
        lagged_vals = windows.reshape((windows.shape[0], -1, windows.shape[-1]))
    return lagged_vals


def _create_lagged_data_by_intersecting_times(
    target_series: TimeSeries,
    output_chunk_length: int,
    output_chunk_shift: int,
    past_covariates: Optional[TimeSeries],
    future_covariates: Optional[TimeSeries],
    lags: Optional[Sequence[int]],
    lags_past_covariates: Optional[Sequence[int]],
    lags_future_covariates: Optional[Sequence[int]],
    max_samples_per_ts: Optional[int],
    multi_models: bool,
    check_inputs: bool,
    is_training: bool,
) -> Tuple[np.ndarray, Optional[np.ndarray], Union[pd.RangeIndex, pd.DatetimeIndex]]:
    """
    Helper function called by `_create_lagged_data` that computes `X`, `y`, and `times` by
    first finding the time points in each series that *could* be used to create features/labels,
    and then finding which of these 'available' times is shared by all specified series. The lagged
    values are then extracted by finding the index of each of these 'shared times' in each series,
    and then offsetting this index by the requested lag value (if constructing `X`) or the requested
    `output_chunk_length` (if constructing `y`). This approach is used if we *cannot* assume that the
    specified series are of the same frequency.

    `max_samples_per_ts` may be `None` (or infinite), in which case all samples are kept.
    `y` is `None` when `is_training = False`.
    """
    feature_times, min_lags, _ = _get_feature_times(
        target_series,
        past_covariates,
        future_covariates,
        lags,
        lags_past_covariates,
        lags_future_covariates,
        output_chunk_length,
        output_chunk_shift,
        is_training=is_training,
        return_min_and_max_lags=True,
        check_inputs=check_inputs,
    )
    if check_inputs:
        series_and_lags_not_specified = [min_lag is None for min_lag in min_lags]
        raise_if(
            all(series_and_lags_not_specified),
            "Must specify at least one series-lags pair.",
        )
    shared_times = get_shared_times(*feature_times, sort=True)
    raise_if(
        shared_times is None,
        "Specified series do not share any common times for which features can be created.",
    )
    # `None` means 'no limit'; comparing an int against `None` would raise a `TypeError`
    if max_samples_per_ts is not None and len(shared_times) > max_samples_per_ts:
        shared_times = shared_times[-max_samples_per_ts:]
    X = []
    shared_time_idx = None
    label_shared_time_idx = None
    for i, (series_i, lags_i, min_lag_i) in enumerate(
        zip(
            [target_series, past_covariates, future_covariates],
            [lags, lags_past_covariates, lags_future_covariates],
            min_lags,
        )
    ):
        series_and_lags_specified = min_lag_i is not None
        is_target_series = is_training and (i == 0)
        if series_and_lags_specified or is_target_series:
            time_index_i = series_i.time_index
            # Covariates may not cover all shared times; in that case their time index is
            # extended so that the positions of all shared times can be looked up:
            add_to_start = (not is_target_series) and (
                time_index_i[0] > shared_times[0]
            )
            add_to_end = (not is_target_series) and (
                time_index_i[-1] < shared_times[-1]
            )
            if add_to_start or add_to_end:
                new_start = shared_times[0] if add_to_start else None
                new_end = shared_times[-1] if add_to_end else None
                # Number of steps added in front of the original index; computed with
                # `n_steps_between` (as in the moving window method) so that frequencies
                # which cannot divide a time difference directly are handled as well:
                num_prepended = (
                    n_steps_between(
                        end=time_index_i[0], start=shared_times[0], freq=series_i.freq
                    )
                    if add_to_start
                    else 0
                )
                time_index_i = _extend_time_index(
                    time_index_i, series_i.freq, new_start=new_start, new_end=new_end
                )
            else:
                num_prepended = 0
            # Positions are found in the (possibly extended) index, then shifted back so
            # that they are relative to the original values array of `series_i`:
            shared_time_idx = (
                np.searchsorted(time_index_i, shared_times).reshape(-1, 1)
                - num_prepended
            )
        if series_and_lags_specified:
            idx_to_get = shared_time_idx + np.array(lags_i, dtype=int)
            # Before reshaping: lagged_vals.shape = (n_observations, num_lags, n_components, n_samples)
            lagged_vals = series_i.all_values(copy=False)[idx_to_get, :, :]
            # After reshaping: lagged_vals.shape = (n_observations, num_lags*n_components, n_samples)
            lagged_vals = lagged_vals.reshape(
                lagged_vals.shape[0], -1, lagged_vals.shape[-1]
            )
            X.append(lagged_vals)
        # `target_series` indices required for creating labels:
        if is_target_series:
            label_shared_time_idx = shared_time_idx
    X = np.concatenate(X, axis=1)
    if is_training:
        if multi_models:
            # All points between time `t` and `t + output_chunk_length - 1` are labels:
            idx_to_get = (
                label_shared_time_idx
                + np.arange(output_chunk_length)
                + output_chunk_shift
            )
        else:
            # Only point at time `t + output_chunk_length - 1` is a label:
            idx_to_get = (
                label_shared_time_idx + output_chunk_length + output_chunk_shift - 1
            )
        # Before reshaping: lagged_vals.shape = (n_observations, num_lags, n_components, n_samples)
        lagged_vals = target_series.all_values(copy=False)[idx_to_get, :, :]
        # After reshaping: lagged_vals.shape = (n_observations, num_lags*n_components, n_samples)
        y = lagged_vals.reshape(lagged_vals.shape[0], -1, lagged_vals.shape[-1])
    else:
        y = None
    return X, y, shared_times


def _create_lagged_data_autoregression(
    target_series: Union[TimeSeries, Sequence[TimeSeries]],
    t_pred: int,
    shift: int,
    last_step_shift: int,
    series_matrix: np.ndarray,
    covariate_matrices: Dict[str, np.ndarray],
    lags: Dict[str, List[int]],
    component_lags: Dict[str, Dict[str, List[int]]],
    relative_cov_lags: Dict[str, np.ndarray],
    uses_static_covariates: bool,
    last_static_covariates_shape: Optional[Tuple[int, int]],
    num_samples: int,
) -> np.ndarray:
    """Extract lagged data from target, past covariates and future covariates for auto-regression
    with RegressionModels.

    The lagged values of each series type present in `lags` ("target", "past", "future", in that
    order) are extracted from `series_matrix` / `covariate_matrices`, reshaped to
    `len(target_series) * num_samples` rows and concatenated along the features axis. If
    `uses_static_covariates = True`, the static covariates of `target_series` are appended to
    the right of the lagged features.

    NOTE(review): the number of series is taken as `len(target_series)`, which presumably
    requires `target_series` to be passed as a sequence - confirm against the callers.
    """
    series_length = len(target_series)
    X = []
    for series_type in ["target", "past", "future"]:
        if series_type not in lags:
            continue

        # extract series specific data
        values_matrix = (
            series_matrix
            if series_type == "target"
            else covariate_matrices[series_type]
        )

        if series_type not in component_lags:
            # for global lags over all components, directly extract lagged values from the data
            if series_type == "target":
                relative_lags = [
                    lag - (shift + last_step_shift) for lag in lags[series_type]
                ]
            else:
                relative_lags = relative_cov_lags[series_type] + t_pred

            lagged_data = values_matrix[:, relative_lags].reshape(
                series_length * num_samples, -1
            )
        else:
            # for component-specific lags, sort by lags and components and then extract
            tmp_X = _extract_component_lags_autoregression(
                series_type=series_type,
                values_matrix=values_matrix,
                shift=shift,
                last_step_shift=last_step_shift,
                t_pred=t_pred,
                lags=lags,
                component_lags=component_lags,
            )
            lagged_data = tmp_X.reshape(series_length * num_samples, -1)
        X.append(lagged_data)
    # concatenate retrieved lags
    X = np.concatenate(X, axis=1)

    if not uses_static_covariates:
        return X

    # Need to split up `X` into `series_length` equally-sized sub-blocks,
    # one per timeseries in `target_series`, so that static covariates
    # can be added to each block; valid since each block contains the
    # same number of observations:
    X = np.split(X, series_length, axis=0)
    X, _ = add_static_covariates_to_lagged_data(
        features=X,
        target_series=target_series,
        uses_static_covariates=uses_static_covariates,
        last_shape=last_static_covariates_shape,
    )

    # stack the per-series blocks back into a single features array
    return np.concatenate(X, axis=0)


def _extract_component_lags_autoregression(
    series_type: str,
    values_matrix: np.ndarray,
    shift: int,
    last_step_shift: int,
    t_pred: int,
    lags: Dict[str, List[int]],
    component_lags: Dict[str, Dict[str, List[int]]],
) -> np.ndarray:
    """Extract, concatenate and reorder component-wise lags to obtain a feature order identical
    to tabularization.

    The lags of `component_lags[series_type]` are read per component (in dictionary order, which
    is used as the component index along the last axis of `values_matrix`), concatenated along
    axis 1 and finally reordered by ascending lag value across all the components.
    """
    per_component_lags = list(component_lags[series_type].values())

    # prepare index to reorder features by lags across components
    comp_lags_reordered = np.concatenate(per_component_lags).argsort()

    # convert relative lags to absolute
    if series_type == "target":
        lags_shift = -shift - last_step_shift
    else:
        lags_shift = -lags[series_type][0] + t_pred

    # extract features
    tmp_X = [
        values_matrix[
            :,
            [lag + lags_shift for lag in comp_lags],
            comp_i,
        ]
        for comp_i, comp_lags in enumerate(per_component_lags)
    ]

    # concatenate on features dimension and reorder
    return np.concatenate(tmp_X, axis=1)[:, comp_lags_reordered]


# For convenience, define following types for `_get_feature_times`:
FeatureTimes = Tuple[
    Optional[Union[pd.Index, pd.DatetimeIndex, pd.RangeIndex]],
    Optional[Union[pd.Index, pd.DatetimeIndex, pd.RangeIndex]],
    Optional[Union[pd.Index, pd.DatetimeIndex, pd.RangeIndex]],
]
MinLags = Tuple[Optional[int], Optional[int], Optional[int]]
MaxLags = Tuple[Optional[int], Optional[int], Optional[int]]
Notes ----- For the purposes of extracting feature times from each series, we define the `min_lag` and `max_lag` of each series to be: `min_lag = -max(lags_*)`, `max_lag = -min(lags_*)` where `lags_*` denotes either `lags`, `lags_past_covariates`, or `lags_future_covariates`. For both `lags` and `lags_past_covariates`, `min_lag` and `max_lag` are guaranteed to be positive values, since the values in `lags` and `lags_past_covariates` must all be negative. For these two series then, `min_lag` and `max_lag` represent the smallest and largest magnitude lags requested by the user. For example: `lags = [-3, -2, -1] -> min_lag = 1, max_lag = 3` The values contained in `lags_future_covariates`, on the other hand, can be negative, zero, or positive; this means that there are three cases to consider: 1. Both `min_lag` and `max_lag` are positive, which means that all the values in `lags_future_covariates` are negative. In this case, `min_lag` and `max_lag` correspond to the smallest and largest lag magnitudes respectively. For example: `lags_future_covariates = [-3, -2, -1] -> min_lag = 1, max_lag = 3` 2. `min_lag` is non-positive (i.e. zero or negative), but `max_lag` is positive, which means that `lags_future_covariates` contains both negative and non-negative (i.e. zero or positive) lag values. In this case, `abs(min_lag)` corresponds to the magnitude of the largest *non-negative* lag value in `lags_future_covariates`, whilst `max_lag` corresponds to the largest *negative* lag value in `lags_future_covariates`. For example: `lags_future_covariates = [-2, -1, 0, 1, 3] -> min_lag = -3, max_lag = 2` 3. Both `min_lag` and `max_lag` are non-positive, which means that `lags_future_covariates` contains only non-negative lag values. In this case, `abs(min_lag)` and `abs(max_lag)`, rather confusingly, correspond to the largest and smallest lag magnitudes respectively. 
For example: `lags_future_covariates = [1, 2, 3] -> min_lag = -3, max_lag = -1` In all three cases, we have `min_lag <= max_lag`. As a direct consequence: 1. `min_lag > 0` is a sufficient condition for `min_lag` and `max_lag` both being positive (i.e. Case 1). 2. `max_lag <= 0` is a sufficient condition for `min_lag` and `max_lag` both being non-positive (i.e. Case 2). To extract feature times from a `target_series` when `is_training = True`, the following steps are performed: 1. The first `max_lag` times of the series are excluded; these times have too few preceding values to construct features from. 2. The last `output_chunk_length - output_chunk_shift - 1` times are excluded; these times have too few succeeding times to construct labels from. To extract feature times from a `target_series` when `is_training = False`, the following steps are performed: 1. An additional `min_lag` times are appended to the end of the series; although these times are not contained in the original series, we're able to construct features for them since we only need the values of the series from time `t - max_lag` to `t - min_lag` to construct a feature for time `t`. 2. The first `max_lag` times of the series are then excluded; these times have too few preceding values to construct features from. The exact same procedure is performed to extract the feature times from a `past_covariates` series. To extract feature times from `future_covariates`, we perform the following steps: 1. Depending on the signs of `min_lag` and `max_lag`, additional times are either prepended or appended to the original series. More specifically: a) If `min_lag` and `max_lag` are both positive (i.e. `min_lag > 0`), then an additional `min_lag` times are appended to the end of the series; as previously mentioned, we only need values up to time `t - min_lag` to construct a feature for time `t`. b) If `min_lag` and `max_lag` are both non-positive (i.e. 
`max_lag < 0`), then an additional `abs(max_lag)` times are prepended to the start of the series; this is because we only need to know the values of the series *after* time `t + abs(max_lag)` to construct a feature for time `t` when we're only extracting positive lags from `future_covariates`. c) If `min_lag` is non-positive and `max_lag` is positive, then *no additional times* are added to the series, since constructing a feature for time `t` requires knowing values from time `t - max_lag` to time `t + abs(min_lag)`; in other words, we need to have access to time `t` itself. 2. If `min_lag < 0`, the last `abs(min_lag)` times are excluded, since these values have fewer than `abs(min_lag)` values after them, which means we're unable to construct features for these times. 3. If `max_lag > 0`, the first `max_lag` times are excluded, since these values have fewer than `max_lag` values before them, which means we're unable to construct features for these times. Some additional behaviours to note about the `_get_feature_times` function are: 1. If `return_min_and_max_lags = True`, the smallest and largest lag value for each series is also returned as a pair of tuples. 2. For those series which are either unspecified, a `None` value takes the place of that series' feature time, minimum lag values, and maximum lag value. 3. If `is_training = True`, then `target_series` and `output_chunk_length` must be provided. Parameters ---------- target_series Optionally, the series for the regression model to predict. past_covariates Optionally, the past covariates series that the regression model will use as inputs. Unlike the `target_series`, `past_covariates` are *not* to be predicted by the regression model. future_covariates Optionally, the future covariates (i.e. exogenous covariates) series that the regression model will use as inputs. lags Optionally, the lags of the target series to be used as (autoregressive) features. 
If not specified, autoregressive features will *not* be added to `X`. lags_past_covariates Optionally, the lags of `past_covariates` to be used as features. lags_future_covariates Optionally, the lags of `future_covariates` to be used as features. output_chunk_length Optionally, the number of time steps ahead into the future the regression model is to predict. This is ignored if `is_training = False`. output_chunk_shift Optionally, the number of time steps to shift the output chunk ahead into the future. is_training Optionally, specifies that training data is to be generated from the specified series. If `True`, `target_series`, `output_chunk_length`, and `multi_models` must all be specified. check_inputs Optionally, specifies that the `lags_*` and `series_*` inputs should be checked for validity. Should be set to `False` if inputs have already been checked for validity (e.g. inside the `__init__` of a class), otherwise should be set to `True`. return_min_and_max_lags Optionally, specifies whether the largest magnitude lag value for each series should also be returned along with the 'eligible' feature times Note: if the lags are provided as a dictionary for the target series or any of the covariates series, the component-specific lags are grouped into a single list to compute the corresponding feature time. Returns ------- feature_times A tuple containing all the 'eligible feature times' in `target_series`, in `past_covariates`, and in `future_covariates`, in that order. If a particular series-lag pair isn't fully specified, then a `None` will take the place of that series' eligible times. min_lags Optionally, a tuple containing the smallest lag value in `lags`, `lags_past_covariates`, and `lags_future_covariates`, in that order. If a particular series-lag pair isn't fully specified, then a `None` will take the place of that series' minimum lag values. 
max_lags Optionally, a tuple containing the largest lag value in `lags`, `lags_past_covariates`, and `lags_future_covariates`, in that order. If a particular series-lag pair isn't fully specified, then a `None` will take the place of that series' maximum lag values. Raises ------ ValueError If `target_series` and `output_chunk_length` are not both specified if `is_training = True`. ValueError If any of the `lags` inputs contain non-negative values or if none of the `lags` inputs have been specified. ValueError If any of the series are too short for the requested `lags` and/or `output_chunk_length` values. UserWarning If a `lags_*` input is specified without the accompanying time series or vice versa. The only expection to this is when `lags` isn't specified alongside `target_series` when `is_training = True`, since one may wish to fit a regression model without using autoregressive features. """ raise_if( is_training and (target_series is None), "Must specify `target_series` when `is_training = True`.", ) if check_inputs: raise_if( not isinstance(output_chunk_length, int) or output_chunk_length < 1, "`output_chunk_length` must be a positive `int`.", ) _check_lags(lags, lags_past_covariates, lags_future_covariates) feature_times, min_lags, max_lags = [], [], [] for name_i, series_i, lags_i in zip( ["target_series", "past_covariates", "future_covariates"], [target_series, past_covariates, future_covariates], [lags, lags_past_covariates, lags_future_covariates], ): # union of the component-specific lags, unsorted if isinstance(lags_i, dict): lags_i = list(set(chain(*lags_i.values()))) if check_inputs and (series_i is not None): _check_series_length( series=series_i, lags=lags_i, output_chunk_length=output_chunk_length, output_chunk_shift=output_chunk_shift, is_training=is_training, name=name_i, ) series_specified = series_i is not None lags_specified = lags_i is not None is_label_series = is_training and name_i == "target_series" times_i = series_i.time_index if 
series_specified else None max_lag_i = -min(lags_i) if lags_specified else None min_lag_i = -max(lags_i) if lags_specified else None if is_label_series: # Exclude last `output_chunk_length - 1` times: if not output_chunk_shift: end_idx = -output_chunk_length + 1 if output_chunk_length > 1 else None else: end_idx = -output_chunk_length - output_chunk_shift + 1 times_i = times_i[:end_idx] elif series_specified and lags_specified: # Prepend times to start of series - see Step 1a for extracting # feature times from `future_covariates` in `Notes`: new_start = ( times_i[0] + series_i.freq * max_lag_i if max_lag_i < 0 else None ) # Append times to end of series - see Step 1b for extracting features # times from `future_covariates`, or Step 1 for extracting features # from `target_series`/`past_covariates` in `Notes`: new_end = ( times_i[-1] + series_i.freq * (min_lag_i) if min_lag_i > 0 else None ) times_i = _extend_time_index( times_i, series_i.freq, new_start=new_start, new_end=new_end ) if series_specified and lags_specified: # Exclude last `abs(min_lag)` times - see Step 2 for extracting feature # times from `future_covariates` in `Notes`: if min_lag_i < 0: times_i = times_i[:min_lag_i] # Exclude first `max_lag` times - see Step 3 for extracting feature times # from `future_covariates`, or Step 2 in extracting feature times from # `target_series`/`past_covariates` in `Notes`: if max_lag_i > 0: times_i = times_i[max_lag_i:] elif (not is_label_series) and (series_specified ^ lags_specified): # Warn user that series/lags input will be ignored: times_i = max_lag_i = None lags_name = "lags" if name_i == "target_series" else f"lags_{name_i}" specified = lags_name if lags_specified else name_i unspecified = name_i if lags_specified else lags_name warnings.warn( f"`{specified}` was specified without accompanying `{unspecified}` and, thus, will be ignored." 
) feature_times.append(times_i) # Note `max_lag_i` and `min_lag_i` if requested: if series_specified and lags_specified: min_lags.append(min_lag_i) max_lags.append(max_lag_i) else: min_lags.append(None) max_lags.append(None) return ( (feature_times, min_lags, max_lags) if return_min_and_max_lags else feature_times )
def get_shared_times(
    *series_or_times: Union[TimeSeries, pd.Index, None], sort: bool = True
) -> Optional[pd.Index]:
    """
    Returns the times shared by all specified `TimeSeries` or time indexes (i.e. the intersection of all
    these times). If `sort = True`, then these shared times are sorted from earliest to latest. Any `TimeSeries`
    or time indices in `series_or_times` that aren't specified (i.e. are `None`) are simply ignored.

    Notes
    -----
    If only a single non-`None` input is provided, no intersection is computed: the time index of that
    input is returned unchanged (i.e. `sort` has no effect in this case).

    Parameters
    ----------
    series_or_times
        The `TimeSeries` and/or time indices that should be 'intersected'.
    sort
        Optionally, specifies that the returned shared times should be sorted from earliest to latest.

    Returns
    -------
    shared_times
        The time indices present in all specified `TimeSeries` and/or time indices. `None` is returned
        if no non-`None` inputs were specified, if the only specified input is empty, or if the
        intersection of the specified inputs is empty.

    Raises
    ------
    Error raised by `raise_if_not`
        If the intersection is empty and the specified `TimeSeries` and/or time indices do not all share
        the same type of time index (i.e. must either be all `pd.DatetimeIndex` or all `pd.RangeIndex`).
        NOTE(review): earlier documentation named `TypeError`; the exception type is decided by
        `raise_if_not` and should be confirmed there.

    """

    def _as_time_index(series_or_index):
        # `TimeSeries` inputs contribute their `time_index`; index inputs are used as-is:
        if isinstance(series_or_index, TimeSeries):
            return series_or_index.time_index
        return series_or_index

    # `sort = None` specifies to `pd.Index.intersection` that values should be sorted:
    intersection_sort = None if sort else False

    specified_times = [
        _as_time_index(val) for val in series_or_times if val is not None
    ]

    # Nothing specified -> nothing shared:
    if not specified_times:
        return None

    # Single input -> its own times are trivially 'shared'; empty index is reported as `None`:
    if len(specified_times) == 1:
        only_times = specified_times[0]
        return None if len(only_times) == 0 else only_times

    shared_times = reduce(
        lambda times_1, times_2: times_1.intersection(times_2, sort=intersection_sort),
        specified_times,
    )
    if not shared_times.empty:
        return shared_times

    # Empty intersection may result from intersecting time indices being of different
    # types - throw error if so:
    times_types = {type(times) for times in specified_times}
    raise_if_not(
        len(times_types) == 1,
        (
            "Specified series and/or times must all "
            "have the same type of `time_index` (i.e. all "
            "`pd.RangeIndex` or all `pd.DatetimeIndex`)."
        ),
    )
    return None
def get_shared_times_bounds(
    *series_or_times: Union[TimeSeries, pd.Index, None]
) -> Optional[Tuple[Union[pd.Timestamp, int], Union[pd.Timestamp, int]]]:
    """
    Returns the latest `start_time` and the earliest `end_time` among all non-`None` `series_or_times`;
    these are (non-tight) lower and upper `bounds` on the intersection of all these `series_or_times` respectively.
    If no potential overlap exists between all specified series, `None` is returned instead.

    Notes
    -----
    If all specified `series_or_times` are of the same frequency, then `get_shared_times_bounds`
    returns tight `bounds` (i.e. the earliest and latest time within the intersection of all the timeseries
    is returned). To see this, suppose we have three equal-frequency series with observations made at
    different times:

        Series 1: ------
        Series 2:    ------
        Series 3:  ------

    Here, each `-` denotes an observation at a specific time. In this example, `get_shared_times_bounds` will
    return the times at `LB` and `UB`:

                     LB
        Series 1: ---|---|
        Series 2:    |---|---
        Series 3:  --|---|-
                         UB

    If the specified timeseries are *not* of the same frequency, then the returned `bounds` is potentially non-tight
    (i.e. `LB <= intersection.start_time() < intersection.end_time() <= UB`, where `intersection` are the times shared
    by all specified timeseries)

    Parameters
    ----------
    series_or_times
        The `TimeSeries` and/or `pd.Index` values to compute intersection `bounds` for; any provided `None` values
        are ignored, as are empty series/indices.

    Returns
    -------
    bounds
        Tuple containing the latest `start_time` and earliest `end time` among all specified `timeseries`, in that
        order. If no potential overlap exists between the specified series, then `None` is returned instead. Similarly,
        if no non-`None` (and non-empty) `series_or_times` were specified, `None` is returned.

    Raises
    ------
    Error raised by `raise_if_not`
        If the start times of the series and/or times in `series_or_times` don't all share the same type
        (i.e. the inputs mix `pd.DatetimeIndex`-like and `pd.RangeIndex`-like time indices).
        NOTE(review): earlier documentation named `TypeError`; the exception type is decided by
        `raise_if_not` and should be confirmed there.

    """
    start_times, end_times = [], []
    for val in series_or_times:
        # Unspecified (`None`) and empty inputs place no constraint on the bounds:
        if val is None or len(val) == 0:
            continue
        if isinstance(val, TimeSeries):
            start_times.append(val.start_time())
            end_times.append(val.end_time())
        else:
            start_times.append(val[0])
            end_times.append(val[-1])

    # No usable inputs -> no bounds:
    if not start_times:
        return None

    # Comparing e.g. integer and timestamp bounds is meaningless, so insist on a single type:
    raise_if_not(
        len({type(time) for time in start_times}) == 1,
        (
            "Specified series and/or times must all "
            "have the same type of `time_index` "
            "(i.e. all `pd.RangeIndex` or all `pd.DatetimeIndex`)."
        ),
    )

    latest_start, earliest_end = max(start_times), min(end_times)
    # Specified timeseries share no overlapping periods:
    if earliest_end < latest_start:
        return None
    return latest_start, earliest_end
def strided_moving_window(
    x: np.ndarray,
    window_len: int,
    stride: int = 1,
    axis: int = 0,
    check_inputs: bool = True,
) -> np.ndarray:
    """
    Extracts moving window views of an `x` array along a specified `axis`, where each window is of length
    `window_len` and consecutive windows are separated by `stride` indices. The total number of extracted windows
    equals `num_windows = (x.shape[axis] - window_len)//stride + 1`.

    Notes
    -----
    This function is similar to `sliding_window_view` in `np.lib.stride_tricks`, except that:
        1. `strided_moving_window` allows for consecutive windows to be separated by a specified `stride`,
        whilst `sliding_window_view` does not.
        2. `strided_moving_window` can only operate along a single axis, whereas `sliding_window_view` can
        operate along multiple axes.
    Additionally, unlike `sliding_window_view`, using `strided_moving_window` doesn't require `numpy >= 1.20.0`.

    The result is produced by `as_strided`, i.e. it is a view onto the memory of `x` rather than a copy.

    Parameters
    ----------
    x
        The array from which to extract moving windows.
    window_len
        The size of the extracted moving windows.
    stride
        Optionally, the separation between consecutive windows.
    axis
        Optionally, the axis along which the moving windows should be extracted. Negative values count
        from the last axis (e.g. `axis = -1` is the last axis of `x`).
    check_inputs
        Optionally, specifies whether inputs should be checked for validity. Should be set
        to `False` if inputs have already been checked for validity (e.g. inside the `__init__`
        of a class), otherwise should be set to `True`. See [1]_ for further details.

    Returns
    -------
    windows
        The moving windows extracted from `x`. The extracted windows are stacked along the last axis, and the
        `axis` along which the windows were extracted is 'trimmed' such that its length equals the number of
        extracted windows. More specifically, `windows.shape = x_trimmed_shape + (window_len,)`, where
        `x_trimmed_shape` equals `x.shape`, except that `x_trimmed_shape[axis] = num_windows`.

    Raises
    ------
    ValueError
        If `check_inputs = True` and `window_len` is not positive.
    ValueError
        If `check_inputs = True` and `stride` is not positive.
    ValueError
        If `check_inputs = True` and `axis` is not a valid axis of `x` (i.e. not in `[-x.ndim, x.ndim - 1]`).
    ValueError
        If `check_inputs = True` and `window_len` is larger than `x.shape[axis]`.

    References
    ----------
    .. [1] https://numpy.org/doc/stable/reference/generated/numpy.lib.stride_tricks.as_strided.html

    """
    if check_inputs:
        raise_if(
            not isinstance(stride, int) or stride < 1,
            "`stride` must be a positive `int`.",
        )
        raise_if(
            not isinstance(window_len, int) or window_len < 1,
            "`window_len` must be a positive `int`.",
        )
        raise_if(
            not isinstance(axis, int) or axis > x.ndim - 1 or axis < -x.ndim,
            "`axis` must be an `int` that is less than `x.ndim`.",
        )
        raise_if(
            window_len > x.shape[axis],
            "`window_len` must be less than or equal to x.shape[axis].",
        )
    # Read the axis length *before* normalising `axis`, so that an out-of-range axis
    # still fails with an `IndexError` when `check_inputs = False`:
    axis_len = x.shape[axis]
    # Convert a negative `axis` to its non-negative equivalent. Without this, indexing the
    # strides *after* appending the window dimension would address the wrong entry (e.g.
    # `axis = -1` would hit the appended window stride instead of the last axis of `x`),
    # yielding wrong windows and potentially out-of-bounds reads when `stride > 1`:
    if axis < 0:
        axis += x.ndim
    num_windows = (axis_len - window_len) // stride + 1
    # `axis` is trimmed to the number of windows; the window contents form a new last axis:
    new_shape = (
        x.shape[:axis] + (num_windows,) + x.shape[axis + 1 :] + (window_len,)
    )
    # Moving to the next window skips `stride` elements along `axis`, whilst moving
    # within a window advances one element at a time along that same axis:
    axis_stride = x.strides[axis]
    out_strides = (
        x.strides[:axis]
        + (stride * axis_stride,)
        + x.strides[axis + 1 :]
        + (axis_stride,)
    )
    return as_strided(x, shape=new_shape, strides=out_strides)
#
#   Private Functions
#


def _extend_time_index(
    time_index: pd.Index,
    freq: Union[int, str],
    new_start: Optional[Union[pd.Timestamp, int]] = None,
    new_end: Optional[Union[pd.Timestamp, int]] = None,
) -> pd.Index:
    """
    Extends a `time_index` of frequency `freq` such that it now starts at time `new_start` and ends at
    time `new_end`; the fastest way to do this is actually to create a new time index from scratch.
    If `new_start` (`new_end`) is `None`, the first (last) value of `time_index` is kept as the start (end).
    An integer `freq` produces a `pd.RangeIndex` (with `new_end` included); any other `freq` is passed to
    `pd.date_range`.
    """
    if new_start is None:
        new_start = time_index[0]
    if new_end is None:
        new_end = time_index[-1]
    if isinstance(freq, int):
        # `stop` is exclusive, so step one `freq` past `new_end` to keep `new_end` in the index:
        return pd.RangeIndex(start=new_start, stop=new_end + freq, step=freq)
    return pd.date_range(start=new_start, end=new_end, freq=freq)


def _get_freqs(*series: Union[TimeSeries, None]):
    """
    Returns list with the frequency of all specified (i.e. non-`None`) `series`.
    """
    return [ts.freq for ts in series if ts is not None]


def _all_equal_freq(*series: Union[TimeSeries, None]) -> bool:
    """
    Returns `True` if all specified (i.e. non-`None`) `series` have the same frequency.
    Note that `False` is returned if no `series` are specified at all.
    """
    freqs = _get_freqs(*series)
    return len(set(freqs)) == 1


def _check_lags(
    lags: Optional[Union[Sequence[int], Dict[str, List[int]]]],
    lags_past_covariates: Optional[Union[Sequence[int], Dict[str, List[int]]]],
    lags_future_covariates: Optional[Union[Sequence[int], Dict[str, List[int]]]],
) -> None:
    """
    Throws `ValueError` if:
        - `lags` or `lags_past_covariates` contain values that aren't negative `int`s,
        - `lags_future_covariates` contains values that aren't `int`s (any sign is allowed), or
        - no lags have been specified at all.
    Lags provided as a dictionary are validated on the union of all the component-specific lags.
    """
    all_lags = [lags, lags_past_covariates, lags_future_covariates]
    suffixes = ["", "_past_covariates", "_future_covariates"]
    lags_is_none = []
    for i, (suffix, lags_i) in enumerate(zip(suffixes, all_lags)):
        lags_is_none.append(lags_i is None)
        if lags_i is None:
            continue
        # Target and past covariates lags (first two entries) must be strictly negative;
        # future covariates lags are unbounded:
        max_lag = -1 if i < 2 else inf
        if isinstance(lags_i, dict):
            lags_i = list(set(chain(*lags_i.values())))
        raise_if(
            # Type check comes first so that non-numeric values are reported through the
            # message below rather than failing inside the `>` comparison:
            any((not isinstance(lag, int) or lag > max_lag) for lag in lags_i),
            f"`lags{suffix}` must be a `Sequence` or `Dict` containing only `int` values less than {max_lag + 1}.",
        )
    raise_if(
        all(lags_is_none),
        "Must specify at least one of: `lags`, `lags_past_covariates`, `lags_future_covariates`.",
    )
    return None


def _check_series_length(
    series: TimeSeries,
    lags: Union[None, Sequence[int]],
    output_chunk_length: int,
    output_chunk_shift: int,
    is_training: bool,
    name: Literal["target_series", "past_covariates", "future_covariates"],
) -> None:
    """
    Throws `ValueError` if `series` is too short for specified `lags` and, when `is_training`,
    `output_chunk_length` and `output_chunk_shift`.

    The minimum length is:
        - `-min(lags) + output_chunk_length + output_chunk_shift` for the `target_series` when
          `is_training = True` (the `-min(lags)` term is dropped if `lags` is `None`);
        - `-min(lags) + max(lags) + 1` for any other series with specified `lags`.
    Nothing is checked for a non-label series without `lags`.
    """
    is_label_series = is_training and name == "target_series"
    lags_specified = lags is not None
    if is_label_series:
        minimum_len_str = (
            "-min(lags) + output_chunk_length"
            if lags_specified
            else "output_chunk_length"
        ) + " + output_chunk_shift"
        minimum_len = (
            output_chunk_length
            + output_chunk_shift
            + (-min(lags) if lags_specified else 0)
        )
    elif lags_specified:
        lags_name = "lags" if name == "target_series" else f"lags_{name}"
        minimum_len_str = f"-min({lags_name}) + max({lags_name}) + 1"
        minimum_len = -min(lags) + max(lags) + 1
    else:
        # Neither labels nor features are extracted from this series -> no length requirement:
        return None
    # The label series must be long enough even when no `lags` were specified, so the
    # check runs whenever a minimum length was computed:
    raise_if(
        series.n_timesteps < minimum_len,
        (
            f"`{name}` must have at least "
            f"`{minimum_len_str}` = {minimum_len} time steps; "
            f"instead, it only has {series.n_timesteps}."
        ),
    )
    return None