Source code for darts.models.forecasting.ensemble_model

Ensemble Model Base Class

from abc import abstractmethod
from typing import List, Optional, Sequence, Tuple, Union

from darts.logging import get_logger, raise_if, raise_if_not, raise_log
from darts.models.forecasting.forecasting_model import (
    ForecastingModel,
    GlobalForecastingModel,
    LocalForecastingModel,
)
from darts.timeseries import TimeSeries, concatenate
from darts.utils.ts_utils import series2seq

# Module-level logger, passed to the `raise_if*` / `raise_log` helpers and used for warnings below.
logger = get_logger(__name__)

class EnsembleModel(GlobalForecastingModel):
    """
    Abstract base class for ensemble models.
    Ensemble models take in a list of forecasting models and ensemble their predictions
    to make a single one according to the rule defined by their `ensemble()` method.

    If `future_covariates` or `past_covariates` are provided at training or inference time,
    they will be passed only to the models supporting them.

    Parameters
    ----------
    forecasting_models
        List of forecasting models whose predictions to ensemble

        .. note::
                if all the models are probabilistic, the `EnsembleModel` will also be probabilistic.
        ..
    train_num_samples
        Number of prediction samples from each forecasting model for multi-level ensembles. The n_samples
        dimension will be reduced using the `train_samples_reduction` method.
    train_samples_reduction
        If `forecasting_models` are probabilistic and `train_num_samples` > 1, method used to
        reduce the samples dimension to 1. Possible values: "mean", "median" or float value corresponding
        to the desired quantile.
    train_forecasting_models
        If set to `False`, the `forecasting_models` are not retrained when calling `fit()` (only supported
        if all the `forecasting_models` are pretrained `GlobalForecastingModels`). Default: ``True``.
    show_warnings
        Whether to show warnings related to models covariates support.
    """

    def __init__(
        self,
        forecasting_models: List[ForecastingModel],
        train_num_samples: int,
        train_samples_reduction: Optional[Union[str, float]],
        train_forecasting_models: bool = True,
        show_warnings: bool = True,
    ):
        raise_if_not(
            isinstance(forecasting_models, list) and forecasting_models,
            "Cannot instantiate EnsembleModel with an empty list of `forecasting_models`",
            logger,
        )

        is_local_model = [
            isinstance(model, LocalForecastingModel) for model in forecasting_models
        ]
        is_global_model = [
            isinstance(model, GlobalForecastingModel) for model in forecasting_models
        ]

        self.is_local_ensemble = all(is_local_model)
        self.is_global_ensemble = all(is_global_model)

        # every element must be an instance of one of the two supported base classes
        raise_if_not(
            all(
                local_model or global_model
                for local_model, global_model in zip(is_local_model, is_global_model)
            ),
            "All models must be of type `GlobalForecastingModel`, or `LocalForecastingModel`. "
            "Also, make sure that all `forecasting_models` are instantiated.",
            logger,
        )

        model_fit_status = [m._fit_called for m in forecasting_models]
        self.all_trained = all(model_fit_status)
        some_trained = any(model_fit_status)

        # fitted models are only accepted when the ensemble is fully global AND all of them are fitted
        raise_if(
            (not self.is_global_ensemble and some_trained)
            or (self.is_global_ensemble and not (self.all_trained or not some_trained)),
            "Cannot instantiate EnsembleModel with a mixture of unfitted and fitted `forecasting_models`. "
            "Consider resetting all models with `my_model.untrained_model()` or using only trained "
            "GlobalForecastingModels together with `train_forecasting_models=False`.",
            logger,
        )

        if train_forecasting_models:
            # prevent issues with pytorch-lightning trainer during retraining
            raise_if(
                some_trained,
                "`train_forecasting_models=True` but some `forecasting_models` were already fitted. "
                "Consider resetting all the `forecasting_models` with `my_model.untrained_model()` "
                "before passing them to the `EnsembleModel`.",
                logger,
            )
        else:
            raise_if_not(
                self.is_global_ensemble and self.all_trained,
                "`train_forecasting_models=False` is supported only if all the `forecasting_models` are "
                "already trained `GlobalForecastingModels`.",
                logger,
            )

        raise_if(
            train_num_samples is not None
            and train_num_samples > 1
            and all(
                not m.supports_probabilistic_prediction for m in forecasting_models
            ),
            "`train_num_samples` is greater than 1 but the `RegressionEnsembleModel` "
            "contains only deterministic `forecasting_models`.",
            logger,
        )

        # `None` disables the reduction; otherwise only a quantile in (0, 1) or a known name is accepted
        supported_reduction = ["mean", "median"]
        if isinstance(train_samples_reduction, float):
            raise_if_not(
                0.0 < train_samples_reduction < 1.0,
                f"if a float, `train_samples_reduction` must be between "
                f"0 and 1, received ({train_samples_reduction})",
                logger,
            )
        elif isinstance(train_samples_reduction, str):
            raise_if(
                train_samples_reduction not in supported_reduction,
                f"if a string, `train_samples_reduction` must be one of {supported_reduction}, "
                f"received ({train_samples_reduction})",
                logger,
            )
        elif train_samples_reduction is not None:
            raise_log(
                ValueError(
                    f"`train_samples_reduction` type not supported "
                    f"({train_samples_reduction}). Must be `float` "
                    f" or one of {supported_reduction}."
                ),
                logger,
            )

        super().__init__()
        self.forecasting_models = forecasting_models
        self.train_num_samples = train_num_samples
        self.train_samples_reduction = train_samples_reduction
        self.train_forecasting_models = train_forecasting_models
        self.show_warnings = show_warnings

        if show_warnings:
            # warn when covariates are supported by some, but not all, of the models
            if (
                self.supports_past_covariates
                and not self._full_past_covariates_support()
            ):
                logger.warning(
                    "Some `forecasting_models` in the ensemble do not support past covariates, the past covariates "
                    "will be provided only to the models supporting them when calling `fit()` or `predict()`. "
                    "To hide these warnings, set `show_warnings=False`."
                )

            if (
                self.supports_future_covariates
                and not self._full_future_covariates_support()
            ):
                logger.warning(
                    "Some `forecasting_models` in the ensemble do not support future covariates, the future covariates"
                    " will be provided only to the models supporting them when calling `fit()` or `predict()`. "
                    "To hide these warnings, set `show_warnings=False`."
                )

    @abstractmethod
    def fit(
        self,
        series: Union[TimeSeries, Sequence[TimeSeries]],
        past_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None,
        future_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None,
    ):
        """
        Fits the model on the provided series.
        Note that `EnsembleModel.fit()` does NOT call `fit()` on each of its constituent forecasting models.
        It is left to classes inheriting from EnsembleModel to do so appropriately when overriding `fit()`

        This base implementation only validates the inputs (single series vs. sequence consistency,
        covariates support) and then delegates to the parent class `fit()`.

        Parameters
        ----------
        series
            The target series (or sequence of series) to fit on. A sequence is rejected if the ensemble
            contains at least one `LocalForecastingModel`.
        past_covariates
            Optionally, past covariates; must be a single series if and only if `series` is.
        future_covariates
            Optionally, future covariates; must be a single series if and only if `series` is.

        Returns
        -------
        self
        """
        is_single_series = isinstance(series, TimeSeries)

        # local models OR mix of local and global models
        raise_if(
            not self.is_global_ensemble and not is_single_series,
            "The `forecasting_models` contain at least one LocalForecastingModel, which does not support training "
            "on multiple series.",
            logger,
        )

        # check that if timeseries is single series, that covariates are as well and vice versa
        error_past_cov = False
        error_future_cov = False

        if past_covariates is not None:
            error_past_cov = is_single_series != isinstance(past_covariates, TimeSeries)

        if future_covariates is not None:
            error_future_cov = is_single_series != isinstance(
                future_covariates, TimeSeries
            )

        raise_if(
            error_past_cov or error_future_cov,
            "Both series and covariates have to be either single TimeSeries or sequences of TimeSeries.",
            logger,
        )

        self._verify_past_future_covariates(past_covariates, future_covariates)

        super().fit(series, past_covariates, future_covariates)
        return self

    def _stack_ts_seq(self, predictions):
        # stacks list of predictions into one multivariate timeseries
        return concatenate(predictions, axis=1)

    def _stack_ts_multiseq(self, predictions_list):
        # stacks multiple sequences of timeseries elementwise
        return [self._stack_ts_seq(ts_list) for ts_list in zip(*predictions_list)]

    def _model_encoder_settings(self):
        raise NotImplementedError(
            "Encoders are not supported by EnsembleModels. Instead add encoder to the underlying `forecasting_models`."
        )

    def _make_multiple_predictions(
        self,
        n: int,
        series: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None,
        past_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None,
        future_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None,
        num_samples: int = 1,
        predict_likelihood_parameters: bool = False,
    ) -> Union[TimeSeries, Sequence[TimeSeries]]:
        """
        Call `_predict_wrapper()` on every forecasting model and stack the individual predictions.

        Covariates are forwarded only to the models supporting them and `num_samples` only to the
        probabilistic models (the other models are called with `num_samples=1`). If a samples reduction is
        configured and `train_num_samples` > 1, each prediction is reduced with `_predictions_reduction()`
        before stacking.
        """
        is_single_series = isinstance(series, TimeSeries) or series is None
        # maximize covariate usage
        predictions = [
            model._predict_wrapper(
                n=n,
                series=series,
                past_covariates=(
                    past_covariates if model.supports_past_covariates else None
                ),
                future_covariates=(
                    future_covariates if model.supports_future_covariates else None
                ),
                num_samples=(
                    num_samples if model.supports_probabilistic_prediction else 1
                ),
                predict_likelihood_parameters=predict_likelihood_parameters,
            )
            for model in self.forecasting_models
        ]

        # reduce the probabilistics series; `train_num_samples` may be `None` (tolerated by `__init__`),
        # in which case there is nothing to reduce and the comparison must not be evaluated
        if (
            self.train_samples_reduction is not None
            and self.train_num_samples is not None
            and self.train_num_samples > 1
        ):
            predictions = [
                self._predictions_reduction(prediction) for prediction in predictions
            ]

        if is_single_series:
            return self._stack_ts_seq(predictions)
        return self._stack_ts_multiseq(predictions)

    def predict(
        self,
        n: int,
        series: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None,
        past_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None,
        future_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None,
        num_samples: int = 1,
        verbose: bool = False,
        predict_likelihood_parameters: bool = False,
        show_warnings: bool = True,
    ) -> Union[TimeSeries, Sequence[TimeSeries]]:
        # Collects the forecasts of all the `forecasting_models` and combines them with `ensemble()`.
        # (kept as comments rather than a docstring so that no docstring is defined on this override)

        # ensure forecasting models all rely on the same series during inference
        if series is None:
            series = self.training_series
        if past_covariates is None:
            past_covariates = self.past_covariate_series
        if future_covariates is None:
            future_covariates = self.future_covariate_series

        super().predict(
            n=n,
            series=series,
            past_covariates=past_covariates,
            future_covariates=future_covariates,
            num_samples=num_samples,
            verbose=verbose,
            predict_likelihood_parameters=predict_likelihood_parameters,
            show_warnings=show_warnings,
        )

        # for single-level ensemble, probabilistic forecast is obtained directly from forecasting models
        if self.train_samples_reduction is None:
            pred_num_samples = num_samples
            forecast_models_pred_likelihood_params = predict_likelihood_parameters
        # for multi-levels ensemble, forecasting models can generate arbitrary number of samples
        else:
            pred_num_samples = self.train_num_samples
            # second layer model (regression) cannot be trained on likelihood parameters
            forecast_models_pred_likelihood_params = False

        self._verify_past_future_covariates(past_covariates, future_covariates)

        predictions = self._make_multiple_predictions(
            n=n,
            series=series,
            past_covariates=past_covariates,
            future_covariates=future_covariates,
            num_samples=pred_num_samples,
            predict_likelihood_parameters=forecast_models_pred_likelihood_params,
        )

        return self.ensemble(
            predictions,
            series=series,
            num_samples=num_samples,
            predict_likelihood_parameters=predict_likelihood_parameters,
        )

    @abstractmethod
    def ensemble(
        self,
        predictions: Union[TimeSeries, Sequence[TimeSeries]],
        series: Union[TimeSeries, Sequence[TimeSeries]],
        num_samples: int = 1,
        predict_likelihood_parameters: bool = False,
    ) -> Union[TimeSeries, Sequence[TimeSeries]]:
        """
        Defines how to ensemble the individual models' predictions to produce a single prediction.

        Parameters
        ----------
        predictions
            Individual predictions to ensemble
        series
            Sequence of timeseries to predict on. Optional, since it only makes sense for sequences of timeseries -
            local models retain timeseries for prediction.
        num_samples
            The `num_samples` value received by `predict()`, forwarded unchanged.
        predict_likelihood_parameters
            The `predict_likelihood_parameters` value received by `predict()`, forwarded unchanged.

        Returns
        -------
        TimeSeries or Sequence[TimeSeries]
            The predicted ``TimeSeries`` or sequence of ``TimeSeries`` obtained by ensembling the individual predictions
        """

    def _predictions_reduction(
        self, predictions: Union[Sequence[TimeSeries], TimeSeries]
    ) -> Union[TimeSeries, Sequence[TimeSeries]]:
        """Reduce the sample dimension of the forecasting models predictions"""
        is_single_series = isinstance(predictions, TimeSeries)
        predictions = series2seq(predictions)
        if self.train_samples_reduction == "median":
            predictions = [pred.median(axis=2) for pred in predictions]
        elif self.train_samples_reduction == "mean":
            predictions = [pred.mean(axis=2) for pred in predictions]
        else:
            # any other configured value is used as the quantile to extract
            predictions = [
                pred.quantile(self.train_samples_reduction) for pred in predictions
            ]
        return predictions[0] if is_single_series else predictions

    @property
    def min_train_series_length(self) -> int:
        """Largest `min_train_series_length` among the forecasting models."""
        return max(model.min_train_series_length for model in self.forecasting_models)

    @property
    def min_train_samples(self) -> int:
        """Largest `min_train_samples` among the forecasting models."""
        return max(model.min_train_samples for model in self.forecasting_models)

    @property
    def extreme_lags(
        self,
    ) -> Tuple[
        Optional[int],
        Optional[int],
        Optional[int],
        Optional[int],
        Optional[int],
        Optional[int],
        int,
        Optional[int],
    ]:
        """
        Position-wise aggregate of the forecasting models' `extreme_lags`.

        For each of the 8 positions, the values that are not `None` are combined with the matching
        aggregator (`min` or `max`); a position is `None` when it is `None` for every model.
        """
        # read each model's property once instead of once per position
        models_lags = [model.extreme_lags for model in self.forecasting_models]

        def find_max_lag_or_none(lag_id, aggregator) -> Optional[int]:
            lags = [
                model_lags[lag_id]
                for model_lags in models_lags
                if model_lags[lag_id] is not None
            ]
            return aggregator(lags) if lags else None

        lag_aggregators = (min, max, min, max, min, max, max, max)
        return tuple(
            find_max_lag_or_none(i, agg) for i, agg in enumerate(lag_aggregators)
        )

    @property
    def output_chunk_length(self) -> Optional[int]:
        """Return `None` if none of the forecasting models have a `output_chunk_length`,
        otherwise return the smallest output_chunk_length.
        """
        return min(
            (
                m.output_chunk_length
                for m in self.forecasting_models
                if m.output_chunk_length is not None
            ),
            default=None,
        )

    @property
    def _models_are_probabilistic(self) -> bool:
        """Whether every forecasting model supports probabilistic prediction."""
        return all(
            model.supports_probabilistic_prediction for model in self.forecasting_models
        )

    @property
    def _models_same_likelihood(self) -> bool:
        """Return `True` if all the `forecasting_models` are probabilistic and fit the same distribution."""
        if not self._models_are_probabilistic:
            return False

        models_likelihood = set()
        lkl_same_params = True
        tmp_quantiles = None
        for m in self.forecasting_models:
            # regression model likelihood is a string, torch-based model likelihoods is an object
            likelihood = m.likelihood
            is_obj_lkl = not isinstance(likelihood, str)
            lkl_simplified_name = (
                likelihood.simplified_name() if is_obj_lkl else likelihood
            )
            models_likelihood.add(lkl_simplified_name)

            # check the quantiles
            if lkl_simplified_name == "quantile":
                quantiles = likelihood.quantiles if is_obj_lkl else m.quantiles
                if tmp_quantiles is None:
                    tmp_quantiles = quantiles
                elif tmp_quantiles != quantiles:
                    lkl_same_params = False

        return len(models_likelihood) == 1 and lkl_same_params

    @property
    def supports_likelihood_parameter_prediction(self) -> bool:
        """EnsembleModel can predict likelihood parameters if all its forecasting models were fitted with the
        same likelihood.
        """
        return (
            all(
                m.supports_likelihood_parameter_prediction
                for m in self.forecasting_models
            )
            and self._models_same_likelihood
        )

    @property
    def supports_probabilistic_prediction(self) -> bool:
        return self._models_are_probabilistic

    @property
    def supports_multivariate(self) -> bool:
        return all(model.supports_multivariate for model in self.forecasting_models)

    @property
    def supports_past_covariates(self) -> bool:
        # the ensemble accepts past covariates as soon as one model can use them
        return any(model.supports_past_covariates for model in self.forecasting_models)

    @property
    def supports_future_covariates(self) -> bool:
        # the ensemble accepts future covariates as soon as one model can use them
        return any(
            model.supports_future_covariates for model in self.forecasting_models
        )

    @property
    def supports_optimized_historical_forecasts(self) -> bool:
        """
        Whether the model supports optimized historical forecasts
        """
        return False

    @property
    def _supports_non_retrainable_historical_forecasts(self) -> bool:
        return self.is_global_ensemble

    def _full_past_covariates_support(self) -> bool:
        """Whether every forecasting model supports past covariates."""
        return all(model.supports_past_covariates for model in self.forecasting_models)

    def _full_future_covariates_support(self) -> bool:
        """Whether every forecasting model supports future covariates."""
        return all(
            model.supports_future_covariates for model in self.forecasting_models
        )

    def _verify_past_future_covariates(self, past_covariates, future_covariates):
        """
        Verify that any non-None covariates comply with the model type.
        """
        raise_if(
            past_covariates is not None and not self.supports_past_covariates,
            "`past_covariates` were provided to an `EnsembleModel` but none of its "
            "`forecasting_models` support such covariates.",
            logger,
        )
        raise_if(
            future_covariates is not None and not self.supports_future_covariates,
            "`future_covariates` were provided to an `EnsembleModel` but none of its "
            "`forecasting_models` support such covariates.",
            logger,
        )