Source code for darts.utils.likelihood_models.sklearn

"""
Likelihoods for `RegressionModel`
---------------------------------
"""

from abc import ABC, abstractmethod
from typing import Optional

import numpy as np

from darts.logging import get_logger, raise_log
from darts.utils.likelihood_models.base import (
    Likelihood,
    LikelihoodType,
    quantile_names,
)
from darts.utils.utils import _check_quantiles

logger = get_logger(__name__)


class SKLearnLikelihood(Likelihood, ABC):
    def __init__(
        self,
        likelihood_type: LikelihoodType,
        parameter_names: list[str],
        n_outputs: int,
        random_state: Optional[int] = None,
    ):
        """Base class for sklearn wrapper (e.g. `RegressionModel`) likelihoods.

        Parameters
        ----------
        likelihood_type
            A pre-defined `LikelihoodType`.
        parameter_names
            The likelihood (distribution) parameter names.
        n_outputs
            The number of predicted outputs per model call. `1` if `multi_models=False`,
            otherwise `output_chunk_length`.
        random_state
            Optionally, control the randomness of the sampling.
        """
        self._n_outputs = n_outputs
        self._rng = np.random.default_rng(seed=random_state)
        super().__init__(
            likelihood_type=likelihood_type,
            parameter_names=parameter_names,
        )
        # ignore additional attrs for equality tests
        self.ignore_attrs_equality += ["_n_outputs", "_rng"]

    def predict(
        self,
        model,
        x: np.ndarray,
        num_samples: int,
        predict_likelihood_parameters: bool,
        **kwargs,
    ) -> np.ndarray:
        """
        Generates sampled or direct likelihood parameter predictions.

        Parameters
        ----------
        model
            The Darts `RegressionModel`.
        x
            The input feature array passed to the underlying estimator's `predict()` method.
        num_samples
            Number of times a prediction is sampled from the likelihood model / distribution.
            If `1` and `predict_likelihood_parameters=False`, returns median / mean predictions.
        predict_likelihood_parameters
            If set to `True`, generates likelihood parameter predictions instead of sampling from the
            likelihood model / distribution. Only supported with `num_samples = 1` and
            `n<=output_chunk_length`.
        kwargs
            Some kwargs passed to the underlying estimator's `predict()` method.
        """
        model_output = self._estimator_predict(model, x=x, **kwargs)
        if predict_likelihood_parameters:
            return self.predict_likelihood_parameters(model_output)
        elif num_samples == 1:
            return self._get_median_prediction(model_output)
        else:
            return self.sample(model_output)

    @abstractmethod
    def sample(self, model_output: np.ndarray) -> np.ndarray:
        """
        Samples a prediction from the likelihood distribution and the predicted parameters.
        """
    @abstractmethod
    def predict_likelihood_parameters(self, model_output: np.ndarray) -> np.ndarray:
        """
        Returns the distribution parameters as an array, extracted from the raw model outputs.
        """
    @abstractmethod
    def _estimator_predict(
        self,
        model,
        x: np.ndarray,
        **kwargs,
    ) -> np.ndarray:
        """
        Computes the model output.

        Parameters
        ----------
        model
            The Darts `RegressionModel`.
        x
            The input feature array passed to the underlying estimator's `predict()` method.
        kwargs
            Some kwargs passed to the underlying estimator's `predict()` method.
        """

    @abstractmethod
    def _get_median_prediction(self, model_output: np.ndarray) -> np.ndarray:
        """
        Gets the median prediction per component extracted from the model output.
        """
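
# --------------------------------------------------------------------------------------
# Illustrative sketch (not part of the darts API): a throwaway estimator stub and subclass
# showing how `SKLearnLikelihood.predict()` dispatches to `predict_likelihood_parameters()`,
# `_get_median_prediction()` or `sample()`. All `_Stub*` / `_demo_*` names are hypothetical
# and exist only for this example; the function is never called at import time.
def _demo_likelihood_dispatch():
    class _StubEstimator:
        def predict(self, x, **kwargs):
            # pretend the estimator predicts one deterministic value per input row
            return np.ones((x.shape[0],))

    class _StubModel:
        model = _StubEstimator()

    class _StubLikelihood(SKLearnLikelihood):
        def sample(self, model_output):
            return model_output

        def predict_likelihood_parameters(self, model_output):
            return model_output

        def _estimator_predict(self, model, x, **kwargs):
            # reshape to (n_series * n_samples, output_chunk_length, n_components)
            return model.model.predict(x, **kwargs).reshape(x.shape[0], self._n_outputs, -1)

        def _get_median_prediction(self, model_output):
            return model_output

    likelihood = _StubLikelihood(
        likelihood_type=LikelihoodType.Poisson,  # reuse an existing type just for the demo
        parameter_names=["lambda"],
        n_outputs=1,
    )
    x = np.zeros((8, 3))  # (n_series * n_samples, n_regression_features)
    # num_samples=1 and predict_likelihood_parameters=False -> median path
    preds = likelihood.predict(_StubModel(), x=x, num_samples=1, predict_likelihood_parameters=False)
    assert preds.shape == (8, 1, 1)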


class GaussianLikelihood(SKLearnLikelihood):
    def __init__(
        self,
        n_outputs: int,
        random_state: Optional[int] = None,
    ):
        """
        Gaussian distribution [1]_.

        Parameters
        ----------
        n_outputs
            The number of predicted outputs per model call. `1` if `multi_models=False`,
            otherwise `output_chunk_length`.
        random_state
            Optionally, control the randomness of the sampling.

        References
        ----------
        .. [1] https://en.wikipedia.org/wiki/Normal_distribution
        """
        super().__init__(
            likelihood_type=LikelihoodType.Gaussian,
            parameter_names=["mu", "sigma"],
            n_outputs=n_outputs,
            random_state=random_state,
        )

    def sample(self, model_output: np.ndarray) -> np.ndarray:
        # shape (n_components * output_chunk_length, n_series * n_samples, 2)
        # [mu, sigma] on the last dimension, grouped by component
        n_entries, n_samples, n_params = model_output.shape

        # get samples (n_components * output_chunk_length, n_series * n_samples)
        samples = self._rng.normal(
            model_output[:, :, 0],  # mean
            model_output[:, :, 1],  # variance
        )
        # reshape to (n_series * n_samples, n_components * output_chunk_length)
        samples = samples.transpose()
        # reshape to (n_series * n_samples, output_chunk_length, n_components)
        samples = samples.reshape(n_samples, self._n_outputs, -1)
        return samples
    def predict_likelihood_parameters(self, model_output: np.ndarray) -> np.ndarray:
        # shape (n_components * output_chunk_length, n_series * n_samples, 2)
        # [mu, sigma] on the last dimension, grouped by component
        n_samples = model_output.shape[1]
        # reshape to (n_series * n_samples, output_chunk_length, n_components * n_params)
        params_reshaped = model_output.transpose(1, 0, 2).reshape(
            n_samples, self._n_outputs, -1
        )
        return params_reshaped
    def _estimator_predict(
        self,
        model,
        x: np.ndarray,
        **kwargs,
    ) -> np.ndarray:
        # returns samples computed from double-valued inputs [mean, variance].
        # `x` is of shape (n_series * n_samples, n_regression_features)
        # `model_output` is of shape:
        #  - (n_series * n_samples, 2): if univariate & output_chunk_length == 1
        #  - (2, n_series * n_samples, n_components * output_chunk_length): otherwise
        # where the axis with 2 dims is mu, sigma
        model_output = model.model.predict(x, **kwargs)
        output_dim = len(model_output.shape)
        # univariate & single-chunk output
        if output_dim <= 2:
            # embedding well shaped 2D output into 3D
            model_output = np.expand_dims(model_output, axis=0)
        else:
            # we transpose to get mu, sigma couples on last axis
            # shape becomes: (n_components * output_chunk_length, n_series * n_samples, 2)
            model_output = model_output.transpose()
        # shape (n_components * output_chunk_length, n_series * n_samples, 2)
        return model_output

    def _get_median_prediction(self, model_output: np.ndarray) -> np.ndarray:
        # shape (n_components * output_chunk_length, n_series * n_samples, 2)
        # [mu, sigma] on the last dimension, grouped by component
        k = model_output.shape[1]
        # extract mu (mean) per component
        component_medians = slice(0, None, self.num_parameters)
        # shape (n_series * n_samples, output_chunk_length, n_components)
        return model_output[:, :, component_medians].reshape(k, self._n_outputs, -1)
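
# --------------------------------------------------------------------------------------
# Illustrative sketch (not part of the module's public API): exercises
# `GaussianLikelihood.sample` on a synthetic `model_output` array to show the shape
# contract documented in the comments above. The helper name `_demo_gaussian_sample_shapes`
# is hypothetical and is never called at import time.
def _demo_gaussian_sample_shapes():
    n_components, output_chunk_length, n_series_x_samples = 2, 3, 5
    likelihood = GaussianLikelihood(n_outputs=output_chunk_length, random_state=42)
    # build (n_components * output_chunk_length, n_series * n_samples, 2) with [mu, sigma] pairs
    mu = np.zeros((n_components * output_chunk_length, n_series_x_samples, 1))
    sigma = np.ones((n_components * output_chunk_length, n_series_x_samples, 1))
    model_output = np.concatenate([mu, sigma], axis=-1)
    samples = likelihood.sample(model_output)
    # samples come back as (n_series * n_samples, output_chunk_length, n_components)
    assert samples.shape == (n_series_x_samples, output_chunk_length, n_components)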


class PoissonLikelihood(SKLearnLikelihood):
    def __init__(
        self,
        n_outputs: int,
        random_state: Optional[int] = None,
    ):
        """
        Poisson distribution [1]_.

        Parameters
        ----------
        n_outputs
            The number of predicted outputs per model call. `1` if `multi_models=False`,
            otherwise `output_chunk_length`.
        random_state
            Optionally, control the randomness of the sampling.

        References
        ----------
        .. [1] https://en.wikipedia.org/wiki/Poisson_distribution
        """
        super().__init__(
            likelihood_type=LikelihoodType.Poisson,
            parameter_names=["lambda"],
            n_outputs=n_outputs,
            random_state=random_state,
        )

    def sample(self, model_output: np.ndarray) -> np.ndarray:
        # shape (n_series * n_samples, output_chunk_length, n_components)
        return self._rng.poisson(lam=model_output).astype(float)

    def predict_likelihood_parameters(self, model_output: np.ndarray) -> np.ndarray:
        # lambdas on the last dimension, grouped by component
        return model_output

    def _estimator_predict(
        self,
        model,
        x: np.ndarray,
        **kwargs,
    ) -> np.ndarray:
        k = x.shape[0]
        # returns shape (n_series * n_samples, output_chunk_length, n_components)
        return model.model.predict(x, **kwargs).reshape(k, self._n_outputs, -1)

    def _get_median_prediction(self, model_output: np.ndarray) -> np.ndarray:
        # shape (n_series * n_samples, output_chunk_length, n_components)
        # lambda is already the median prediction
        return model_output
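
# --------------------------------------------------------------------------------------
# Illustrative sketch (hypothetical helper, not called at import time): Poisson sampling
# keeps the (n_series * n_samples, output_chunk_length, n_components) layout of the
# predicted lambdas and returns float-valued counts.
def _demo_poisson_sample():
    likelihood = PoissonLikelihood(n_outputs=2, random_state=0)
    # model_output already has shape (n_series * n_samples, output_chunk_length, n_components)
    lambdas = np.full((4, 2, 3), 5.0)
    samples = likelihood.sample(lambdas)
    assert samples.shape == lambdas.shape and samples.dtype == float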


class QuantileRegression(SKLearnLikelihood):
    def __init__(
        self,
        n_outputs: int,
        random_state: Optional[int] = None,
        quantiles: Optional[list[float]] = None,
    ):
        """
        Quantile Regression [1]_.

        Parameters
        ----------
        n_outputs
            The number of predicted outputs per model call. `1` if `multi_models=False`,
            otherwise `output_chunk_length`.
        random_state
            Optionally, control the randomness of the sampling.
        quantiles
            A list of quantiles. Default: `[0.01, 0.05, 0.1, 0.25, 0.5, 0.75, 0.9, 0.95, 0.99]`.

        References
        ----------
        .. [1] https://en.wikipedia.org/wiki/Quantile_regression
        """
        if quantiles is None:
            quantiles = [
                0.01,
                0.05,
                0.1,
                0.25,
                0.5,
                0.75,
                0.9,
                0.95,
                0.99,
            ]
        else:
            quantiles = sorted(quantiles)
            _check_quantiles(quantiles)
        self.quantiles = quantiles
        self._median_idx = quantiles.index(0.5)
        super().__init__(
            likelihood_type=LikelihoodType.Quantile,
            parameter_names=quantile_names(self.quantiles),
            n_outputs=n_outputs,
            random_state=random_state,
        )
        self.ignore_attrs_equality += ["_median_idx"]
    def sample(self, model_output: np.ndarray) -> np.ndarray:
        # model_output is of shape (n_series * n_samples, output_chunk_length, n_components, n_quantiles)
        # sample uniformly in [0, 1] (for each batch example) and return the
        # linear interpolation between the fitted quantiles closest to the sampled value.
        k, n_times, n_components, n_quantiles = model_output.shape

        # obtain samples
        probs = self._rng.uniform(
            size=(
                k,
                n_times,
                n_components,
                1,
            )
        )

        # add dummy dim
        probas = np.expand_dims(probs, axis=-2)

        # tile and transpose
        p = np.tile(probas, (1, 1, 1, n_quantiles, 1)).transpose((0, 1, 2, 4, 3))

        # prepare quantiles
        tquantiles = np.array(self.quantiles).reshape((1, 1, 1, -1))

        # calculate index of the largest quantile smaller than the sampled value
        left_idx = np.sum(p > tquantiles, axis=-1)

        # obtain index of the smallest quantile larger than the sampled value
        right_idx = left_idx + 1

        # repeat the model output on the edges
        repeat_count = [1] * n_quantiles
        repeat_count[0] = 2
        repeat_count[-1] = 2
        repeat_count = np.array(repeat_count)
        shifted_output = np.repeat(model_output, repeat_count, axis=-1)

        # obtain model output values corresponding to the quantiles left and right of the sampled value
        left_value = np.take_along_axis(shifted_output, left_idx, axis=-1)
        right_value = np.take_along_axis(shifted_output, right_idx, axis=-1)

        # add 0 and 1 to quantiles
        ext_quantiles = [0.0] + self.quantiles + [1.0]
        expanded_q = np.tile(np.array(ext_quantiles), left_idx.shape)

        # calculate closest quantiles to the sampled value
        left_q = np.take_along_axis(expanded_q, left_idx, axis=-1)
        right_q = np.take_along_axis(expanded_q, right_idx, axis=-1)

        # linear interpolation
        weights = (probs - left_q) / (right_q - left_q)
        inter = left_value + weights * (right_value - left_value)

        # shape (n_series * n_samples, output_chunk_length, n_components)
        return inter.squeeze(-1)
    def predict_likelihood_parameters(self, model_output: np.ndarray) -> np.ndarray:
        # shape (n_series * n_samples, output_chunk_length, n_components, n_quantiles)
        # quantiles on the last dimension, grouped by component
        k, n_times, n_components, n_quantiles = model_output.shape
        # last dim : [comp_1_q_1, ..., comp_1_q_n, ..., comp_n_q_1, ..., comp_n_q_n]
        # shape (n_series * n_samples, output_chunk_length, n_components * n_quantiles)
        return model_output.reshape(k, n_times, n_components * n_quantiles)

    def _estimator_predict(
        self,
        model,
        x: np.ndarray,
        **kwargs,
    ) -> np.ndarray:
        # `x` is of shape (n_series * n_samples, n_regression_features)
        k = x.shape[0]
        model_outputs = []
        for quantile, fitted in model._model_container.items():
            model.model = fitted
            # model output has shape (n_series * n_samples, output_chunk_length, n_components)
            model_output = fitted.predict(x, **kwargs).reshape(k, self._n_outputs, -1)
            model_outputs.append(model_output)
        model_outputs = np.stack(model_outputs, axis=-1)
        # shape (n_series * n_samples, output_chunk_length, n_components, n_quantiles)
        return model_outputs

    def _get_median_prediction(self, model_output: np.ndarray) -> np.ndarray:
        # shape (n_series * n_samples, output_chunk_length, n_components, n_quantiles)
        k, n_times, n_components, n_quantiles = model_output.shape
        # extract the median quantiles per component
        component_medians = slice(self._median_idx, None, n_quantiles)
        # shape (n_series * n_samples, output_chunk_length, n_components)
        return model_output[:, :, :, component_medians].reshape(
            k, n_times, n_components
        )
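
# --------------------------------------------------------------------------------------
# Illustrative sketch (hypothetical helper, not called at import time): builds a sorted
# synthetic quantile output and shows that `predict_likelihood_parameters` flattens the
# quantile axis into the component axis, while `sample` interpolates back to one value
# per component. The quantile set `[0.1, 0.5, 0.9]` is chosen only for the demo.
def _demo_quantile_regression_shapes():
    quantiles = [0.1, 0.5, 0.9]
    likelihood = QuantileRegression(n_outputs=2, random_state=0, quantiles=quantiles)
    # (n_series * n_samples, output_chunk_length, n_components, n_quantiles), sorted along quantiles
    model_output = np.sort(
        np.random.default_rng(0).uniform(size=(4, 2, 3, len(quantiles))), axis=-1
    )
    params = likelihood.predict_likelihood_parameters(model_output)
    # quantiles are flattened into the component axis
    assert params.shape == (4, 2, 3 * len(quantiles))
    samples = likelihood.sample(model_output)
    assert samples.shape == (4, 2, 3)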


def _check_likelihood(likelihood: str, available_likelihoods: list[str]):
    """Check whether the likelihood is supported.

    Parameters
    ----------
    likelihood
        The likelihood name. Must be one of ('gaussian', 'poisson', 'quantile').
    available_likelihoods
        A list of supported likelihood names.
    """
    if likelihood not in available_likelihoods:
        raise_log(
            ValueError(
                f"Invalid `likelihood='{likelihood}'`. Must be one of {available_likelihoods}"
            ),
            logger=logger,
        )


def _get_likelihood(
    likelihood: Optional[str],
    n_outputs: int,
    random_state: Optional[int],
    quantiles: Optional[list[float]],
) -> Optional[SKLearnLikelihood]:
    """Get the `Likelihood` object for `RegressionModel`.

    Parameters
    ----------
    likelihood
        The likelihood name. Must be one of ('gaussian', 'poisson', 'quantile').
    n_outputs
        The number of predicted outputs per model call. `1` if `multi_models=False`, otherwise
        `output_chunk_length`.
    random_state
        Optionally, control the randomness of the sampling.
    quantiles
        Optionally, a list of quantiles. Only effective for `likelihood='quantile'`.
    """
    if likelihood is None:
        return None
    elif likelihood == "gaussian":
        return GaussianLikelihood(n_outputs=n_outputs, random_state=random_state)
    elif likelihood == "poisson":
        return PoissonLikelihood(n_outputs=n_outputs, random_state=random_state)
    elif likelihood == "quantile":
        return QuantileRegression(
            n_outputs=n_outputs, random_state=random_state, quantiles=quantiles
        )
    else:
        raise_log(
            ValueError(
                f"Invalid `likelihood='{likelihood}'`. Must be one of ('gaussian', 'poisson', 'quantile')"
            ),
            logger=logger,
        )
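
# --------------------------------------------------------------------------------------
# Illustrative usage sketch (hypothetical helper, not called at import time): how the
# factory above is expected to be used, e.g. by a model configured with
# `likelihood="quantile"` and `output_chunk_length=4`.
def _demo_get_likelihood():
    likelihood = _get_likelihood(
        likelihood="quantile",
        n_outputs=4,  # e.g. output_chunk_length with multi_models=True
        random_state=0,
        quantiles=None,  # falls back to the default quantile set
    )
    assert isinstance(likelihood, QuantileRegression)
    # no likelihood name means a deterministic model: the factory returns None
    assert _get_likelihood(None, n_outputs=4, random_state=None, quantiles=None) is None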