"""
Encoder Base Classes
--------------------
"""
from abc import ABC, abstractmethod
from typing import List, Optional, Sequence, Tuple, Union
import numpy as np
import pandas as pd
from darts import TimeSeries
from darts.dataprocessing.transformers import FittableDataTransformer
from darts.logging import get_logger, raise_if, raise_log
from darts.utils.utils import generate_index
try:
from typing import Literal
except ImportError:
from typing_extensions import Literal
SupportedIndex = Union[pd.DatetimeIndex, pd.RangeIndex]
EncoderOutputType = Optional[Union[Sequence[TimeSeries], List[TimeSeries]]]
logger = get_logger(__name__)
class _EncoderMethod:
"""Connects the encoder stage to the corresponding methods"""
def __init__(self, stage: Literal["train", "inference", "train_inference"]):
self.method = None
if stage == "train":
self.method = "encode_train"
elif stage == "inference":
self.method = "encode_inference"
elif stage == "train_inference":
self.method = "encode_train_inference"
else:
raise_log(
ValueError(
f"Unknown encoder `stage={stage}`. Must be on of `('train', 'inference', 'train_inference')`"
),
logger,
)
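# illustrative use (a sketch, not part of this module's public API): the resolved name can be used to dispatch to
# the matching encoder method, e.g. `getattr(encoder, _EncoderMethod("train").method)(target=target)`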
class CovariatesIndexGenerator(ABC):
def __init__(
self,
input_chunk_length: Optional[int] = None,
output_chunk_length: Optional[int] = None,
lags_covariates: Optional[List[int]] = None,
):
""":class:`CovariatesIndexGenerator` generates a time index for covariates at training and inference /
prediction time with methods :func:`generate_train_idx()`, and :func:`generate_inference_idx()`.
Without user `covariates`, it generates the minimum required covariate time spans for the corresponding
scenarios described below. With user `covariates`, it simply copies and returns the `covariates` time index.
It can be used:
A in combination with :class:`LocalForecastingModel`, or in a model agnostic scenario:
All parameters can be ignored. This scenario is only supported by
:class:`FutureCovariatesIndexGenerator`.
B in combination with :class:`RegressionModel`:
Set `input_chunk_length`, `output_chunk_length`, and `lags_covariates`.
`input_chunk_length` is the absolute value of the minimum target lag `abs(min(lags))` used with the
regression model.
Set `output_chunk_length`, and `lags_covariates` with the identical values used at forecasting model
creation. For the covariates lags, use `lags_past_covariates` for :class:`PastCovariatesIndexGenerator`,
and `lags_future_covariates` for :class:`FutureCovariatesIndexGenerator`.
C in combination with :class:`TorchForecastingModel`:
Set `input_chunk_length`, and `output_chunk_length` with the identical values used at forecasting model
creation.
Parameters
----------
input_chunk_length
Optionally, the number of input target time steps per chunk. Only required in scenarios B, C.
Corresponds to `input_chunk_length` for :class:`TorchForecastingModel`, or to the absolute minimum target
lag value `abs(min(lags))` for :class:`RegressionModel`.
output_chunk_length
Optionally, the number of output target time steps per chunk. Only required in scenarios B, and C.
Corresponds to `output_chunk_length` for both :class:`TorchForecastingModel`, and :class:`RegressionModel`.
lags_covariates
Optionally, a list of integers giving the covariates lags used for Darts' RegressionModels. Only required
in scenario B. Corresponds to the lag values from `lags_past_covariates` for past covariates, and
`lags_future_covariates` for future covariates.
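Examples
--------
An illustrative sketch with hypothetical parameter values (assumes `linear_timeseries` from
`darts.utils.timeseries_generation`); scenario C with past covariates, and scenario B with future covariates:
>>> from darts.utils.timeseries_generation import linear_timeseries
>>> target = linear_timeseries(length=10, freq="D")
>>> # scenario C: `TorchForecastingModel`-style parameters
>>> idx_gen = PastCovariatesIndexGenerator(input_chunk_length=4, output_chunk_length=2)
>>> train_idx, target_end = idx_gen.generate_train_idx(target=target)
>>> # scenario B: `RegressionModel`-style parameters with `lags_future_covariates=[-1, 0]`
>>> idx_gen = FutureCovariatesIndexGenerator(
...     input_chunk_length=4, output_chunk_length=2, lags_covariates=[-1, 0]
... )
>>> inference_idx, _ = idx_gen.generate_inference_idx(n=3, target=target)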
"""
# check that parameters match one of the scenarios
self._verify_scenario(input_chunk_length, output_chunk_length, lags_covariates)
# input/output chunk length are guaranteed to both be `None`, or both be defined
self.input_chunk_length = input_chunk_length
self.output_chunk_length = output_chunk_length
# check lags validity
min_covariates_lag = (
min(lags_covariates) if lags_covariates is not None else None
)
max_covariates_lag = (
max(lags_covariates) if lags_covariates is not None else None
)
self._verify_lags(min_covariates_lag, max_covariates_lag)
# from verification min/max lags are guaranteed to either both be None, or both be an integer
if min_covariates_lag is not None:
# we add 1 to the lags so that shift == 0 represents the end of the target series (forecasting point)
shift_start = min_covariates_lag + 1
shift_end = max_covariates_lag + 1
else:
shift_start = None
shift_end = None
self.shift_start = shift_start
self.shift_end = shift_end
@abstractmethod
def generate_train_idx(
self, target: TimeSeries, covariates: Optional[TimeSeries] = None
) -> Tuple[SupportedIndex, pd.Timestamp]:
"""
Generates/extracts time index (or integer index) for covariates at model training time.
Parameters
----------
target
The target TimeSeries used during training.
covariates
Optionally, the covariates used for training.
If given, the returned time index is equal to the `covariates` time index. Else, the returned time index
covers the minimum required covariate time span for training a specific forecasting model. These
requirements are derived from parameters set at :class:`CovariatesIndexGenerator` creation.
"""
pass
@abstractmethod
def generate_inference_idx(
self, n: int, target: TimeSeries, covariates: Optional[TimeSeries] = None
) -> Tuple[SupportedIndex, pd.Timestamp]:
"""
Generates/extracts time index (or integer index) for covariates at model inference / prediction time.
Parameters
----------
n
The forecasting horizon.
target
The target TimeSeries used during training or passed to prediction as `series`.
covariates
Optionally, the covariates used for prediction.
If given, the returned time index is equal to the `covariates` time index. Else, the returned time index
covers the minimum required covariate time spans for performing inference / prediction with a specific
forecasting model. These requirements are derived from parameters set at :class:`CovariatesIndexGenerator`
creation.
"""
pass
def generate_train_inference_idx(
self, n: int, target: TimeSeries, covariates: Optional[TimeSeries] = None
) -> Tuple[SupportedIndex, pd.Timestamp]:
"""
Generates/extracts time index (or integer index) for covariates for training and inference / prediction.
Parameters
----------
n
The forecasting horizon.
target
The target TimeSeries used for training and inference / prediction as `series`.
covariates
Optionally, the covariates used for training and inference / prediction.
If given, the returned time index is equal to the `covariates` time index. Else, the returned time index
covers the minimum required covariate time spans for performing training and inference / prediction with a
specific forecasting model. These requirements are derived from parameters set at
:class:`CovariatesIndexGenerator` creation.
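Examples
--------
An illustrative sketch with hypothetical values (assumes `linear_timeseries` from
`darts.utils.timeseries_generation`):
>>> from darts.utils.timeseries_generation import linear_timeseries
>>> idx_gen = PastCovariatesIndexGenerator(input_chunk_length=4, output_chunk_length=2)
>>> idx, target_end = idx_gen.generate_train_inference_idx(
...     n=3, target=linear_timeseries(length=10, freq="D")
... )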
"""
train_idx, target_end = self.generate_train_idx(
target=target, covariates=covariates
)
inference_idx, _ = self.generate_inference_idx(
n=n, target=target, covariates=covariates
)
# `generate_index()` bounds are inclusive, so `gap` overlaps `train_idx` at its last point; the union below handles this
gap = generate_index(
start=train_idx[-1], end=inference_idx[0] - target.freq, freq=target.freq
)
return (
train_idx.__class__.union(train_idx, gap).union(inference_idx),
target_end,
)
@property
@abstractmethod
def base_component_name(self) -> str:
"""Returns the index generator base component name.
- "pc": past covariates
- "fc": future covariates
"""
pass
def _verify_scenario(
self,
input_chunk_length: Optional[int] = None,
output_chunk_length: Optional[int] = None,
lags_covariates: Optional[List[int]] = None,
):
# LocalForecastingModel, or model agnostic (only supported by future covariates)
is_scenario_a = (
isinstance(self, FutureCovariatesIndexGenerator)
and input_chunk_length is None
and output_chunk_length is None
and lags_covariates is None
)
# RegressionModel
is_scenario_b = (
input_chunk_length is not None
and output_chunk_length is not None
and lags_covariates is not None
)
# TorchForecastingModel
is_scenario_c = (
input_chunk_length is not None
and output_chunk_length is not None
and lags_covariates is None
)
if not any([is_scenario_a, is_scenario_b, is_scenario_c]):
raise_log(
ValueError(
"Invalid `CovariatesIndexGenerator` parameter combination: Could not be mapped to an existing "
"scenario, as defined in "
"https://unit8co.github.io/darts/generated_api/darts.dataprocessing.encoders.encoder_base.html"
"#darts.dataprocessing.encoders.encoder_base.CovariatesIndexGenerator"
),
logger=logger,
)
def _verify_lags(self, min_covariates_lag, max_covariates_lag):
"""Check the base requirements for `min_covariates_lag` and `max_covariates_lag`:
- both must either be None or an integer
- min_covariates_lag <= max_covariates_lag
This method can be extended by subclasses for past and future covariates lag requirements.
"""
# check that min/max_covariates_lag are either both `None`, or both given
if (min_covariates_lag is not None and max_covariates_lag is None) or (
min_covariates_lag is None and max_covariates_lag is not None
):
raise_log(
ValueError(
"`min_covariates_lag` and `max_covariates_lag` must either both be `None` or both be integers"
),
logger=logger,
)
if min_covariates_lag is not None:
# check that if one of the two is given, both must be integers
if not isinstance(min_covariates_lag, int) or not isinstance(
max_covariates_lag, int
):
raise_log(
ValueError(
"`min_covariates_lag` and `max_covariates_lag` must be both be integers."
),
logger=logger,
)
# minimum lag must be less than maximum lag
if min_covariates_lag > max_covariates_lag:
raise_log(
ValueError(
"`min_covariates_lag` must be smaller than/equal to `max_covariates_lag`."
),
logger=logger,
)
class PastCovariatesIndexGenerator(CovariatesIndexGenerator):
"""Generates index for past covariates on train and inference datasets"""
def generate_train_idx(
self, target: TimeSeries, covariates: Optional[TimeSeries] = None
) -> Tuple[SupportedIndex, pd.Timestamp]:
super().generate_train_idx(target, covariates)
# the returned index depends on the following cases:
# case 0
# user supplied covariates: simply return the covariate time index; guarantees that an exception is
# raised if user supplied insufficient covariates
# case 1
# only input_chunk_length and output_chunk_length are given: the complete covariate index is within the
# target index; always True for all models except RegressionModels.
# case 2
# covariate lags were given (shift_start <= 0 and shift_end <= 0) and
# abs(shift_start - 1) <= input_chunk_length: the complete covariate index is within the target index;
# can only be True for RegressionModels.
# case 3
# covariate lags were given (shift_start <= 0 and shift_end <= 0) and
# abs(shift_start - 1) > input_chunk_length: we need to add indices before the beginning of the target
# series; can only be True for RegressionModels.
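# illustrative example with hypothetical values: in case 1 with input_chunk_length=4 and output_chunk_length=2
# on a target of length 10, steps_ahead_start=0 and steps_ahead_end=-2, i.e. the covariate index equals the
# first 8 steps of the target index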
target_end = target.end_time()
if covariates is not None: # case 0
return covariates.time_index, target_end
if self.shift_start is None: # case 1
steps_ahead_start = 0
else: # case 2 & 3
steps_ahead_start = self.input_chunk_length + (self.shift_start - 1)
if not self.shift_end: # case 1
steps_ahead_end = -self.output_chunk_length
else: # case 2 & 3
steps_ahead_end = -(self.output_chunk_length - self.shift_end)
steps_ahead_end = steps_ahead_end if steps_ahead_end else None
return (
_generate_train_idx(target, steps_ahead_start, steps_ahead_end),
target_end,
)
def generate_inference_idx(
self, n: int, target: TimeSeries, covariates: Optional[TimeSeries] = None
) -> Tuple[SupportedIndex, pd.Timestamp]:
super().generate_inference_idx(n, target, covariates)
# for prediction (`n` is given) with past covariates the returned index depends on the following cases:
# case 0
# user supplied covariates: simply return the covariate time index; guarantees that an exception is
# raised if user supplied insufficient covariates.
# case 1
# only input_chunk_length and output_chunk_length are given: we need to generate a time index that starts
# `input_chunk_length - 1` before the end of `target` and ends `max(0, n - output_chunk_length)` after the
# end of `target`; always True for all models except RegressionModels.
# case 2
# covariate lags were given (shift_start <= 0 and shift_end <= 0): we need to generate a time index that
# starts `-shift_start` steps before the end of `target` and has a length of
# `shift_steps + max(0, n - output_chunk_length)`, where `shift_steps` is the number of time steps between
# `shift_start` and `shift_end`; can only be True for RegressionModels.
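# illustrative example with hypothetical values: in case 1 with input_chunk_length=4, output_chunk_length=2 and
# n=3, steps_back_end=3 and n_steps=3+1+max(0, 3-2)=5, i.e. the index starts 3 steps before the target end and
# spans 5 steps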
target_end = target.end_time()
if covariates is not None: # case 0
return covariates.time_index, target_end
if self.shift_start is None or self.shift_end is None: # case 1
steps_back_end = self.input_chunk_length - 1
n_steps = steps_back_end + 1 + max(0, n - self.output_chunk_length)
else: # case 2
steps_back_end = -self.shift_start
shift_steps = self.shift_end - self.shift_start + 1
n_steps = shift_steps + max(0, n - self.output_chunk_length)
return (
generate_index(
start=target.end_time() - target.freq * steps_back_end,
length=n_steps,
freq=target.freq,
),
target_end,
)
@property
def base_component_name(self) -> str:
return "pc"
def _verify_lags(self, min_covariates_lag, max_covariates_lag):
# general lag checks
super()._verify_lags(min_covariates_lag, max_covariates_lag)
# check past covariate specific lag requirements
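# e.g. (illustrative) `lags_covariates=[-3, -1]` passes these checks, while `[-1, 0]` does not (lag 0 is not < 0)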
if min_covariates_lag is not None and min_covariates_lag >= 0:
raise_log(ValueError("`min_covariates_lag` must be < 0."), logger=logger)
if max_covariates_lag is not None and max_covariates_lag >= 0:
raise_log(ValueError("`max_covariates_lag` must be < 0."), logger=logger)
class FutureCovariatesIndexGenerator(CovariatesIndexGenerator):
"""Generates index for future covariates on train and inference datasets."""
def generate_train_idx(
self, target: TimeSeries, covariates: Optional[TimeSeries] = None
) -> Tuple[SupportedIndex, pd.Timestamp]:
super().generate_train_idx(target, covariates)
# the returned index depends on the following cases:
# case 0
# user supplied covariates: simply return the covariate time index; guarantees that models raise an
# exception if user supplied insufficient covariates
# case 1
# user uses a LocalForecastingModel or model agnostic scenario (input_chunk_length is None):
# simply return the target time index.
# case 2
# only input_chunk_length and output_chunk_length are given: the complete covariate index is within the
# target index; always True for all models except RegressionModels.
# case 3
# covariate lags were given and (shift_start <= 0 or shift_end <= 0): historic part of future covariates.
# if shift_end < 0 there will only be the historic part of future covariates.
# If shift_start <= 0 and abs(shift_start - 1) > input_chunk_length: we need to add indices before the
# beginning of the target series; can only be True for RegressionModels.
# case 4
# covariate lags were given and (shift_start > 0 or shift_end > 0): future part of future covariates.
# if shift_start > 0 there will only be the future part of future covariates.
# If shift_end > 0 and shift_start > input_chunk_length: we need to add indices after the end of the
# target series; can only be True for RegressionModels.
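# illustrative example with hypothetical values: in case 2 with input_chunk_length=4 and output_chunk_length=2,
# steps_ahead_start=0 and steps_ahead_end=None, i.e. the covariate index equals the full target index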
target_end = target.end_time()
if covariates is not None: # case 0
return covariates.time_index, target_end
if self.input_chunk_length is None: # case 1
return target.time_index, target_end
if self.shift_start is None: # case 2
steps_ahead_start = 0
else: # case 3
steps_ahead_start = self.input_chunk_length + self.shift_start - 1
if self.shift_end is None: # case 2
steps_ahead_end = 0
else: # case 4
steps_ahead_end = -self.output_chunk_length + self.shift_end
steps_ahead_end = steps_ahead_end if steps_ahead_end else None
return (
_generate_train_idx(target, steps_ahead_start, steps_ahead_end),
target_end,
)
def generate_inference_idx(
self, n: int, target: TimeSeries, covariates: Optional[TimeSeries] = None
) -> Tuple[SupportedIndex, pd.Timestamp]:
super().generate_inference_idx(n, target, covariates)
# for prediction (`n` is given) with future covariates the returned index depends on the following cases:
# case 0
# user supplied covariates: simply return the covariate time index; guarantees that an exception is
# raised if user supplied insufficient covariates
# case 1
# user uses a LocalForecastingModel or model agnostic scenario (input_chunk_length is None):
# simply return the target time index.
# case 2
# only input_chunk_length and output_chunk_length are given: we need to generate a time index that starts
# `input_chunk_length - 1` before the end of `target` and ends `max(n, output_chunk_length)` after the
# end of `target`; always True for all models except RegressionModels.
# case 3
# covariate lags were given: we need to generate a time index that starts `-shift_start`
# steps before the end of `target` and has a length of `shift_steps + max(0, n - output_chunk_length)`,
# where `shift_steps` is `shift_end - shift_start + 1`; can only be True for RegressionModels.
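# illustrative example with hypothetical values: in case 2 with input_chunk_length=4, output_chunk_length=2 and
# n=3, steps_back_end=3 and n_steps=3+1+max(3, 2)=7, i.e. the index starts 3 steps before the target end and
# extends n=3 steps beyond it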
target_end = target.end_time()
if covariates is not None: # case 0
return covariates.time_index, target_end
if self.input_chunk_length is None:  # case 1
steps_back_end = -1
n_steps = n
elif self.shift_start is None: # case 2
steps_back_end = self.input_chunk_length - 1
n_steps = steps_back_end + 1 + max(n, self.output_chunk_length)
else: # case 3
steps_back_end = -self.shift_start
shift_steps = self.shift_end + steps_back_end + 1
n_steps = shift_steps + max(0, n - self.output_chunk_length)
return (
generate_index(
start=target.end_time() - target.freq * steps_back_end,
length=n_steps,
freq=target.freq,
),
target_end,
)
@property
def base_component_name(self) -> str:
return "fc"
class Encoder(ABC):
"""Abstract class for all encoders"""
@abstractmethod
def __init__(self):
self.attribute = None
self.dtype = np.float64
self._fit_called = False
@abstractmethod
def encode_train(
self,
target: TimeSeries,
covariates: Optional[TimeSeries] = None,
merge_covariates: bool = True,
**kwargs,
) -> TimeSeries:
"""Each subclass must implement a method to encode the covariates index for training.
Parameters
----------
target
The target TimeSeries used during training or passed to prediction as `series`.
covariates
Optionally, the past or future covariates used for training.
merge_covariates
Whether to merge the encoded TimeSeries with `covariates`.
"""
pass
@abstractmethod
def encode_inference(
self,
n: int,
target: TimeSeries,
covariates: Optional[TimeSeries] = None,
merge_covariates: bool = True,
**kwargs,
) -> TimeSeries:
"""Each subclass must implement a method to encode the covariates index for prediction.
Parameters
----------
n
The forecast horizon
target
The target TimeSeries used during training or passed to prediction as `series`
covariates
Optionally, the past or future covariates used for prediction.
merge_covariates
Whether to merge the encoded TimeSeries with `covariates`.
"""
pass
@abstractmethod
def encode_train_inference(
self,
n: int,
target: TimeSeries,
covariates: Optional[TimeSeries] = None,
merge_covariates: bool = True,
**kwargs,
) -> TimeSeries:
"""Each subclass must implement a method to encode the covariates index for training and prediction.
Parameters
----------
n
The forecast horizon
target
The target TimeSeries used during training and prediction.
covariates
Optionally, the past or future covariates used for training and prediction.
merge_covariates
Whether to merge the encoded TimeSeries with `covariates`.
"""
pass
@staticmethod
def _merge_covariates(
encoded: TimeSeries, covariates: Optional[TimeSeries] = None
) -> TimeSeries:
"""If (actual) covariates are given, merge the encoded index with the covariates
Parameters
----------
encoded
The encoded TimeSeries either from `encode_train()` or `encode_inference()`
covariates
Optionally, some past or future covariates supplied by the user.
"""
return covariates.stack(encoded) if covariates is not None else encoded
@staticmethod
def _drop_encoded_components(
covariates: Optional[TimeSeries], components: pd.Index
) -> Optional[TimeSeries]:
"""Avoid pitfalls: `encode_train()` or `encode_inference()` can be called multiple times or chained.
Exclude any encoded components from `covariates` to generate and add the new encodings at a later time.
"""
if covariates is None:
return covariates
duplicate_components = components[components.isin(covariates.components)]
# case 1: covariates only consist of encoded components
if len(duplicate_components) == len(covariates.components):
covariates = None
# case 2: covariates also have non-encoded components
elif len(duplicate_components) and len(duplicate_components) < len(
covariates.components
):
covariates = covariates[
list(
covariates.components[
~covariates.components.isin(duplicate_components)
]
)
]
return covariates
@property
def fit_called(self) -> bool:
"""Returns whether the `Encoder` object has been fitted."""
return self._fit_called
@property
@abstractmethod
def requires_fit(self) -> bool:
"""Whether the `Encoder` sub class must be fit with `Encoder.encode_train()` before inference
with `Encoder.encode_inference()`."""
pass
class SingleEncoder(Encoder, ABC):
"""`SingleEncoder`: Abstract class for single index encoders.
Single encoders can be used to implement new encoding techniques.
Each single encoder must implement an `_encode()` method that carries the encoding logic.
The `_encode()` method must take an `index` as input and generate an encoded single `TimeSeries` as output.
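Examples
--------
A hypothetical sketch (not an encoder shipped with darts): a custom single encoder that flags weekends,
assuming a datetime-indexed target:
>>> class _WeekendEncoder(SingleEncoder):
...     def _encode(self, index, target_end, dtype):
...         values = (index.dayofweek >= 5).astype(dtype)
...         return TimeSeries.from_times_and_values(index, values)
...     @property
...     def accept_transformer(self):
...         return [False]
...     @property
...     def encoding_n_components(self):
...         return 1
...     @property
...     def requires_fit(self):
...         return False
...     @property
...     def base_component_name(self):
...         return super().base_component_name + "_cus_weekend"
>>> from darts.utils.timeseries_generation import linear_timeseries
>>> encoder = _WeekendEncoder(index_generator=FutureCovariatesIndexGenerator())
>>> encoded = encoder.encode_train(target=linear_timeseries(length=14, freq="D"))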
"""
def __init__(self, index_generator: CovariatesIndexGenerator):
"""Single encoders take an `index_generator` to generate the required index for encoding past and future
covariates.
See darts.utils.data.covariate_index_generators.py for the `CovariatesIndexGenerator` subclasses.
For past covariates encoders, use a `PastCovariatesIndexGenerator`.
For future covariates encoders use a `FutureCovariatesIndexGenerator`.
Parameters
----------
index_generator
An instance of `CovariatesIndexGenerator` with methods `generate_train_idx()` and
`generate_inference_idx()`. Used to generate the index for encoders.
"""
super().__init__()
self.index_generator = index_generator
self._components = pd.Index([])
@abstractmethod
def _encode(
self, index: SupportedIndex, target_end: pd.Timestamp, dtype: np.dtype
) -> TimeSeries:
"""Single Encoders must implement an _encode() method to encode the index.
Parameters
----------
index
The index generated from `self.index_generator` for either the train or inference dataset.
target_end
The end time of the target series.
dtype
The dtype of the encoded index
"""
pass
def encode_train(
self,
target: TimeSeries,
covariates: Optional[TimeSeries] = None,
merge_covariates: bool = True,
**kwargs,
) -> TimeSeries:
"""Returns encoded index for training.
Parameters
----------
target
The target TimeSeries used during training or passed to prediction as `series`
covariates
Optionally, the covariates used for training: past covariates if `self.index_generator` is a
`PastCovariatesIndexGenerator`, future covariates if `self.index_generator` is a
`FutureCovariatesIndexGenerator`
merge_covariates
Whether to merge the encoded TimeSeries with `covariates`.
"""
# exclude encoded components from covariates to add the newly encoded components later
covariates = self._drop_encoded_components(covariates, self.components)
# generate index and encodings
index, target_end = self.index_generator.generate_train_idx(target, covariates)
encoded = self._encode(index, target_end, target.dtype)
# optionally, merge encodings with original `covariates` series
encoded = (
self._merge_covariates(encoded, covariates=covariates)
if merge_covariates
else encoded
)
# save encoded component names
if self.components.empty:
components = encoded.components
if covariates is not None:
components = components[~components.isin(covariates.components)]
self._components = components
self._fit_called = True
return encoded
def encode_inference(
self,
n: int,
target: TimeSeries,
covariates: Optional[TimeSeries] = None,
merge_covariates: bool = True,
**kwargs,
) -> TimeSeries:
"""Returns encoded index for inference/prediction.
Parameters
----------
n
The forecast horizon
target
The target TimeSeries used during training or passed to prediction as `series`
covariates
Optionally, the covariates used for prediction: past covariates if `self.index_generator` is a
`PastCovariatesIndexGenerator`, future covariates if `self.index_generator` is a
`FutureCovariatesIndexGenerator`
merge_covariates
Whether to merge the encoded TimeSeries with `covariates`.
"""
# some encoders must be fit before `encode_inference()`
raise_if(
not self.fit_called and self.requires_fit,
f"`{self.__class__.__name__}` object must be trained before inference. "
f"Call method `encode_train()` before `encode_inference()`.",
logger=logger,
)
# exclude encoded components from covariates to add the newly encoded components later
covariates = self._drop_encoded_components(covariates, self.components)
# generate index and encodings
index, target_end = self.index_generator.generate_inference_idx(
n, target, covariates
)
encoded = self._encode(index, target_end, target.dtype)
# optionally, merge encodings with original `covariates` series
encoded = (
self._merge_covariates(encoded, covariates=covariates)
if merge_covariates
else encoded
)
# optionally, save encoded component names also at inference as some encoders do not have to be trained before
if self.components.empty:
components = encoded.components
if covariates is not None:
components = components[~components.isin(covariates.components)]
self._components = components
return encoded
def encode_train_inference(
self,
n: int,
target: TimeSeries,
covariates: Optional[TimeSeries] = None,
merge_covariates: bool = True,
**kwargs,
) -> TimeSeries:
"""Returns encoded index for inference/prediction.
Parameters
----------
n
The forecast horizon
target
The target TimeSeries used during training and prediction.
covariates
Optionally, the covariates used for training and prediction: past covariates if `self.index_generator` is a
`PastCovariatesIndexGenerator`, future covariates if `self.index_generator` is a
`FutureCovariatesIndexGenerator`
merge_covariates
Whether to merge the encoded TimeSeries with `covariates`.
"""
# exclude encoded components from covariates to add the newly encoded components later
covariates = self._drop_encoded_components(covariates, self.components)
# generate index and encodings
index, target_end = self.index_generator.generate_train_inference_idx(
n, target, covariates
)
encoded = self._encode(index, target_end, target.dtype)
# optionally, merge encodings with original `covariates` series
encoded = (
self._merge_covariates(encoded, covariates=covariates)
if merge_covariates
else encoded
)
# save encoded component names
if self.components.empty:
components = encoded.components
if covariates is not None:
components = components[~components.isin(covariates.components)]
self._components = components
self._fit_called = True
return encoded
@property
@abstractmethod
def accept_transformer(self) -> List[bool]:
"""Whether the `SingleEncoder` sub class accepts to be transformed."""
pass
@property
@abstractmethod
def encoding_n_components(self) -> int:
"""The number of components in the `SingleEncoder` output."""
pass
@property
def components(self) -> pd.Index:
"""Returns the encoded component names. Only available after `Encoder.encode_train()` or
`Encoder.encode_inference()` have been called."""
return self._components
@property
@abstractmethod
def base_component_name(self) -> str:
"""Returns the base encoder base component name. The string follows the given format:
`"darts_enc_{covariates_temp}_{encoder}_{attribute}"`, where the elements are:
* covariates_temp: "pc" or "fc" for past, or future covariates respectively.
* encoder: the SingleEncoder type used:
* "cyc" (cyclic temporal encoder),
* "dta" (datetime attribute encoder),
* "pos" (positional integer index encoder),
* "cus" (custom callable index encoder)
* attribute: the attribute used for the underlying encoder. Some examples:
* "month_sin", "month_cos" (for "cyc")
* "month" (for "dta")
* "relative" (for "pos")
* "custom" (for "cus")
"""
return f"darts_enc_{self.index_generator.base_component_name}"
def _generate_train_idx(target, steps_ahead_start, steps_ahead_end) -> SupportedIndex:
"""The returned index depends on the following cases:
case 1
(steps_ahead_start >= 0 and (steps_ahead_end is None or steps_ahead_end <= -1))
the complete index is within the target index; always True for all models except RegressionModels.
case 2
steps_ahead_start < 0: add indices before the target start time; only possible for RegressionModels
where the minimum past lag is larger than input_chunk_length.
case 3
steps_ahead_end > 0: add indices after the target end time; only possible for RegressionModels
where the maximum future lag is larger than output_chunk_length.
Parameters
----------
target
the target series.
steps_ahead_start
how many steps ahead of target start time to begin the index.
steps_ahead_end
how many steps ahead of target end time to end the index.
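Examples
--------
An illustrative sketch with hypothetical values (assumes `linear_timeseries` from
`darts.utils.timeseries_generation`):
>>> from darts.utils.timeseries_generation import linear_timeseries
>>> target = linear_timeseries(length=5, freq="D")
>>> idx = _generate_train_idx(target, steps_ahead_start=-2, steps_ahead_end=1)
>>> # `idx` should prepend two steps before the target start and append one step after the target end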
"""
# case 1
if steps_ahead_start >= 0 and (steps_ahead_end is None or steps_ahead_end <= -1):
return target.time_index[steps_ahead_start:steps_ahead_end]
# case 2
idx_start = (
generate_index(
end=target.start_time() - target.freq,
length=abs(steps_ahead_start),
freq=target.freq,
)
if steps_ahead_start < 0
else target.time_index.__class__([])
)
# if `steps_ahead_start >= 0` or `steps_ahead_end <= 0` we must extract a slice of the target series index
center_start = steps_ahead_start if steps_ahead_start >= 0 else None
center_end = (
steps_ahead_end
if steps_ahead_end is not None and steps_ahead_end <= 0
else None
)
idx_center = target.time_index[center_start:center_end]
# case 3
idx_end = (
generate_index(
start=target.end_time() + target.freq,
length=abs(steps_ahead_end),
freq=target.freq,
)
if steps_ahead_end is not None and steps_ahead_end > 0
else target.time_index.__class__([])
)
# concatenate start, center, and end index
# note: pandas' union() returns type pd.Index(), so we construct index directly from index class
return target.time_index.__class__(idx_start.union(idx_center).union(idx_end))