"""
Regression ensemble model
-------------------------
An ensemble model which uses a regression model to compute the ensemble forecast.
"""
from typing import List, Optional, Sequence, Tuple, Union
from darts.logging import get_logger, raise_if, raise_if_not
from darts.models.forecasting.ensemble_model import EnsembleModel
from darts.models.forecasting.forecasting_model import ForecastingModel
from darts.models.forecasting.linear_regression_model import LinearRegressionModel
from darts.models.forecasting.regression_model import RegressionModel
from darts.timeseries import TimeSeries, concatenate
from darts.utils.ts_utils import seq2series, series2seq
logger = get_logger(__name__)
[docs]class RegressionEnsembleModel(EnsembleModel):
def __init__(
self,
forecasting_models: List[ForecastingModel],
regression_train_n_points: int,
regression_model=None,
regression_train_num_samples: int = 1,
regression_train_samples_reduction: Optional[Union[str, float]] = "median",
train_forecasting_models: bool = True,
train_using_historical_forecasts: bool = False,
show_warnings: bool = True,
):
"""
Use a regression model for ensembling individual models' predictions using the stacking technique [1]_.
The provided regression model must implement ``fit()`` and ``predict()`` methods
(e.g. scikit-learn regression models). Note that here the regression model is used to learn how to
best ensemble the individual forecasting models' forecasts. It is not the same usage of regression
as in :class:`RegressionModel`, where the regression model is used to produce forecasts based on the
lagged series.
If `future_covariates` or `past_covariates` are provided at training or inference time,
they will be passed only to the forecasting models supporting them.
If `forecasting_models` contains exclusively GlobalForecastingModels, they can be pre-trained. Otherwise,
the `forecasting_models` must be untrained.
The regression model does not leverage the covariates passed to ``fit()`` and ``predict()``.
Parameters
----------
forecasting_models
List of forecasting models whose predictions to ensemble
regression_train_n_points
The number of points per series to use to train the regression model. Can be set to `-1` to use the
entire series to train the regressor if `forecasting_models` are already fitted and
`train_forecasting_models=False`.
regression_model
Any regression model with ``predict()`` and ``fit()`` methods (e.g. from scikit-learn)
Default: ``darts.models.LinearRegressionModel(fit_intercept=False)``
.. note::
if `regression_model` is probabilistic, the `RegressionEnsembleModel` will also be probabilistic.
..
regression_train_num_samples
Number of prediction samples from each forecasting model to train the regression model (samples are
averaged). Should be set to 1 for deterministic models. Default: 1.
.. note::
if `forecasting_models` contains a mix of probabilistic and deterministic models,
`regression_train_num_samples will be passed only to the probabilistic ones.
..
regression_train_samples_reduction
If `forecasting_models` are probabilistic and `regression_train_num_samples` > 1, method used to
reduce the samples before passing them to the regression model. Possible values: "mean", "median"
or float value corresponding to the desired quantile. Default: "median"
train_forecasting_models
If set to `False`, the `forecasting_models` are not retrained when calling `fit()` (only supported
if all the `forecasting_models` are pretrained `GlobalForecastingModels`). Default: ``True``.
train_using_historical_forecasts
If set to `True`, use `historical_forecasts()` to generate the forecasting models' predictions used to
train the regression model in `fit()`. Available when `forecasting_models` contains only
`GlobalForecastingModels`. Recommended when `regression_train_n_points` is greater than
`output_chunk_length` of the underlying `forecasting_models`.
Default: ``False``.
show_warnings
Whether to show warnings related to forecasting_models covariates support.
References
----------
.. [1] D. H. Wolpert, “Stacked generalization”, Neural Networks, vol. 5, no. 2, pp. 241–259, Jan. 1992
Examples
--------
>>> from darts.datasets import AirPassengersDataset
>>> from darts.models import RegressionEnsembleModel, NaiveSeasonal, LinearRegressionModel
>>> series = AirPassengersDataset().load()
>>> model = RegressionEnsembleModel(
>>> forecasting_models = [
>>> NaiveSeasonal(K=12),
>>> LinearRegressionModel(lags=4)
>>> ],
>>> regression_train_n_points=20
>>> )
>>> model.fit(series)
>>> pred = model.predict(6)
>>> pred.values()
array([[494.24050364],
[464.3869697 ],
[496.53180506],
[544.82269341],
[557.35256055],
[630.24334385]])
"""
super().__init__(
forecasting_models=forecasting_models,
train_num_samples=regression_train_num_samples,
train_samples_reduction=regression_train_samples_reduction,
train_forecasting_models=train_forecasting_models,
show_warnings=show_warnings,
)
if regression_model is None:
regression_model = LinearRegressionModel(
lags=None, lags_future_covariates=[0], fit_intercept=False
)
elif isinstance(regression_model, RegressionModel):
raise_if_not(
regression_model.multi_models,
"Cannot use `regression_model` that was created with `multi_models = False`.",
logger,
)
regression_model = regression_model
else:
# scikit-learn like model
regression_model = RegressionModel(
lags_future_covariates=[0], model=regression_model
)
# check lags of the regression model
raise_if_not(
regression_model.lags == {"future": [0]},
f"`lags` and `lags_past_covariates` of regression model must be `None`"
f"and `lags_future_covariates` must be [0]. Given:\n"
f"{regression_model.lags}",
)
self.regression_model: RegressionModel = regression_model
raise_if(
regression_train_n_points == -1
and not (self.all_trained and (not train_forecasting_models)),
"`regression_train_n_points` can only be `-1` if `retrain_forecasting_model=False` and "
"all `forecasting_models` are already fitted.",
logger,
)
# converted to List[int] if regression_train_n_points=-1 and ensemble is trained with multiple series
self.train_n_points: Union[int, List[int]] = regression_train_n_points
raise_if(
train_using_historical_forecasts and not self.is_global_ensemble,
"`train_using_historical_forecasts=True` is only available when all "
"`forecasting_models` are global models.",
logger,
)
self.train_using_historical_forecasts = train_using_historical_forecasts
def _split_multi_ts_sequence(
self, n: Union[int, List[int]], ts_sequence: Sequence[TimeSeries]
) -> Tuple[Sequence[TimeSeries], Sequence[TimeSeries]]:
if isinstance(n, int):
n = [n] * len(ts_sequence)
left = [ts[:-n_] for ts, n_ in zip(ts_sequence, n)]
right = [ts[-n_:] for ts, n_ in zip(ts_sequence, n)]
return left, right
def _make_multiple_historical_forecasts(
self,
train_n_points: int,
series: Union[TimeSeries, Sequence[TimeSeries]],
direct_predictions: Union[TimeSeries, Sequence[TimeSeries]],
past_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None,
future_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None,
num_samples: int = 1,
) -> Union[TimeSeries, Sequence[TimeSeries]]:
"""
For GlobalForecastingModel, when predicting n > output_chunk_length, `historical_forecasts()` generally
produce better forecasts than `predict()`.
To get as close as possible to the predictions generated by the forecasting models during inference,
`historical_forecasts` forecast horizon is equal to each model output_chunk_length.
train_n_points are generated, starting from the end of the series.
"""
is_single_series = isinstance(series, TimeSeries)
series = series2seq(series)
direct_predictions = series2seq(direct_predictions)
past_covariates = series2seq(past_covariates)
future_covariates = series2seq(future_covariates)
n_components = series[0].n_components
model_predict_cols = direct_predictions[0].columns.tolist()
predictions = []
for m_idx, model in enumerate(self.forecasting_models):
# we start historical fc at multiple of the output length before the end.
n_ocl_back = train_n_points // model.output_chunk_length
start_hist_forecasts = n_ocl_back * model.output_chunk_length
# we use the precomputed `direct_prediction` to fill any missing prediction
# timesteps at the beginning (if train_n_points is not perfectly divisible by output length)
missing_steps = train_n_points % model.output_chunk_length
tmp_pred = model.historical_forecasts(
series=series,
past_covariates=(
past_covariates if model.supports_past_covariates else None
),
future_covariates=(
future_covariates if model.supports_future_covariates else None
),
forecast_horizon=model.output_chunk_length,
stride=model.output_chunk_length,
num_samples=(
num_samples if model.supports_probabilistic_prediction else 1
),
start=-start_hist_forecasts,
start_format="position",
retrain=False,
overlap_end=False,
last_points_only=False,
show_warnings=self.show_warnings,
predict_likelihood_parameters=False,
)
# concatenate the strided predictions of output_chunk_length values each
tmp_pred = [concatenate(sub_pred, axis=0) for sub_pred in tmp_pred]
# add the missing steps at beginning by taking the first values of precomputed predictions
if missing_steps:
# add the missing steps at beginning by taking the first values of precomputed predictions
# get the model's direct (uni/multivariate) predictions
pred_cols = model_predict_cols[
m_idx * n_components : (m_idx + 1) * n_components
]
hfc_cols = tmp_pred[0].columns.tolist()
tmp_pred = [
concatenate(
[
preds_dir[:missing_steps][pred_cols].with_columns_renamed(
pred_cols, hfc_cols
),
preds_hfc,
],
axis=0,
)
for preds_dir, preds_hfc in zip(direct_predictions, tmp_pred)
]
predictions.append(tmp_pred)
tmp_predictions = []
# slice the forecasts, training series-wise, to align them
for prediction in predictions:
tmp_predictions.append([ts for idx, ts in enumerate(prediction)])
predictions = [seq2series(prediction) for prediction in tmp_predictions]
# reduce the probabilistics series
if self.train_samples_reduction is not None and self.train_num_samples > 1:
predictions = [
self._predictions_reduction(prediction) for prediction in predictions
]
return (
self._stack_ts_seq(predictions)
if is_single_series
else self._stack_ts_multiseq(predictions)
)
[docs] def fit(
self,
series: Union[TimeSeries, Sequence[TimeSeries]],
past_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None,
future_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None,
):
"""
Fits the forecasting models with the entire series except the last `regression_train_n_points` values, which
are used to train the regression model.
If `forecasting_models` contains fitted `GlobalForecastingModels` and `train_forecasting_model=False`,
only the regression model will be trained.
Parameters
----------
series
TimeSeries or Sequence[TimeSeries] object containing the target values.
past_covariates
Optionally, a series or sequence of series specifying past-observed covariates passed to the
forecasting models
future_covariates
Optionally, a series or sequence of series specifying future-known covariates passed to the
forecasting models
"""
super().fit(
series, past_covariates=past_covariates, future_covariates=future_covariates
)
# spare train_n_points points to serve as regression target
is_single_series = isinstance(series, TimeSeries)
if self.train_n_points == -1:
if is_single_series:
train_n_points = [len(series)]
else:
# maximize each series usage
train_n_points = [len(ts) for ts in series]
# shift by the forecasting models' largest input length
all_shifts = []
# when it's not clearly defined, extreme_lags returns
# `min_train_series_length` for the LocalForecastingModels
for model in self.forecasting_models:
min_target_lag, _, _, _, _, _, _, _ = model.extreme_lags
if min_target_lag is not None:
all_shifts.append(-min_target_lag)
input_shift = max(all_shifts)
idx_series_too_short = []
tmp_train_n_points = []
for idx, ts_length in enumerate(train_n_points):
ajusted_length = ts_length - input_shift
if ajusted_length < 0:
idx_series_too_short.append(idx)
else:
tmp_train_n_points.append(ajusted_length)
raise_if(
len(idx_series_too_short) > 0,
f"TimeSeries at indexes {idx_series_too_short} of `series` are too short to train the regression "
f"model due to the number of values necessary to produce one prediction : {input_shift}.",
logger,
)
if is_single_series:
self.train_n_points = tmp_train_n_points[0]
else:
self.train_n_points = tmp_train_n_points
train_n_points_too_big = False
else:
# self.train_n_points is necessarily an integer
if is_single_series:
train_n_points_too_big = len(series) <= self.train_n_points
else:
train_n_points_too_big = any(
[len(s) <= self.train_n_points for s in series]
)
raise_if(
train_n_points_too_big,
"`regression_train_n_points` parameter too big (must be strictly smaller than "
"the number of points in training_series)",
logger,
)
if is_single_series:
forecast_training = series[: -self.train_n_points]
regression_target = series[-self.train_n_points :]
else:
forecast_training, regression_target = self._split_multi_ts_sequence(
self.train_n_points, series
)
if self.train_forecasting_models:
for model in self.forecasting_models:
# maximize covariate usage
model._fit_wrapper(
series=forecast_training,
past_covariates=(
past_covariates if model.supports_past_covariates else None
),
future_covariates=(
future_covariates if model.supports_future_covariates else None
),
)
# we can call direct prediction in any case. Even if we overwrite with historical
# forecasts later on, it serves as a input validation
predictions = self._make_multiple_predictions(
n=self.train_n_points,
series=forecast_training,
past_covariates=past_covariates,
future_covariates=future_covariates,
num_samples=self.train_num_samples,
)
if self.train_using_historical_forecasts:
predictions = self._make_multiple_historical_forecasts(
train_n_points=self.train_n_points,
series=series,
direct_predictions=predictions,
past_covariates=past_covariates,
future_covariates=future_covariates,
num_samples=self.train_num_samples,
)
# train the regression model on the individual models' predictions
self.regression_model.fit(
series=regression_target, future_covariates=predictions
)
# prepare the forecasting models for further predicting by fitting them with the entire data
if self.train_forecasting_models:
# Some models may need to be 'reset' to allow being retrained from scratch, especially torch-based models
self.forecasting_models: List[ForecastingModel] = [
model.untrained_model() for model in self.forecasting_models
]
for model in self.forecasting_models:
model._fit_wrapper(
series=series,
past_covariates=(
past_covariates if model.supports_past_covariates else None
),
future_covariates=(
future_covariates if model.supports_future_covariates else None
),
)
return self
[docs] def ensemble(
self,
predictions: Union[TimeSeries, Sequence[TimeSeries]],
series: Union[TimeSeries, Sequence[TimeSeries]],
num_samples: int = 1,
predict_likelihood_parameters: bool = False,
) -> Union[TimeSeries, Sequence[TimeSeries]]:
is_single_series = isinstance(series, TimeSeries) or series is None
predictions = series2seq(predictions)
series = series2seq(series) if series is not None else [None]
ensembled = [
self.regression_model.predict(
n=len(prediction),
series=serie,
future_covariates=prediction,
num_samples=num_samples,
predict_likelihood_parameters=predict_likelihood_parameters,
)
for serie, prediction in zip(series, predictions)
]
return seq2series(ensembled) if is_single_series else ensembled
@property
def extreme_lags(
self,
) -> Tuple[
Optional[int],
Optional[int],
Optional[int],
Optional[int],
Optional[int],
Optional[int],
int,
Optional[int],
]:
extreme_lags_ = super().extreme_lags
# shift min_target_lag in the past to account for the regression model training set
if extreme_lags_[0] is None:
return (-self.train_n_points,) + extreme_lags_[1:]
else:
return (extreme_lags_[0] - self.train_n_points,) + extreme_lags_[1:]
@property
def output_chunk_length(self) -> int:
"""Return the `output_chunk_length` of the regression model (ensembling layer)"""
return self.regression_model.output_chunk_length
@property
def supports_likelihood_parameter_prediction(self) -> bool:
"""RegressionEnsembleModel supports likelihood parameters predictions if its regression model does"""
return self.regression_model.supports_likelihood_parameter_prediction
@property
def supports_multivariate(self) -> bool:
return (
super().supports_multivariate
and self.regression_model.supports_multivariate
)
@property
def supports_probabilistic_prediction(self) -> bool:
"""
A RegressionEnsembleModel is probabilistic if its regression
model is probabilistic (ensembling layer)
"""
return self.regression_model.supports_probabilistic_prediction