"""
Likelihoods for SKLearnModel
----------------------------
"""
from abc import ABC, abstractmethod
from collections.abc import Sequence
import numpy as np
from darts import TimeSeries
from darts.logging import get_logger, raise_log
from darts.utils.likelihood_models.base import (
Likelihood,
LikelihoodType,
quantile_names,
)
from darts.utils.utils import _check_quantiles
logger = get_logger(__name__)
[docs]
class SKLearnLikelihood(Likelihood, ABC):
    def __init__(
        self,
        likelihood_type: LikelihoodType,
        parameter_names: list[str],
        n_outputs: int,
    ):
        """Base class for sklearn wrapper (e.g. `SKLearnModel`) likelihoods.

        Parameters
        ----------
        likelihood_type
            A pre-defined `LikelihoodType`.
        parameter_names
            The likelihood (distribution) parameter names.
        n_outputs
            The number of predicted outputs per model call. `1` if `multi_models=False`,
            otherwise `output_chunk_length`.
        """
        self._n_outputs = n_outputs
        super().__init__(
            likelihood_type=likelihood_type,
            parameter_names=parameter_names,
        )
        # `_n_outputs` is a wrapper detail rather than a distribution property;
        # exclude it from equality comparisons between likelihood objects
        self.ignore_attrs_equality += ["_n_outputs"]

    def predict(
        self,
        model,
        x: np.ndarray,
        num_samples: int,
        predict_likelihood_parameters: bool,
        **kwargs,
    ) -> np.ndarray:
        """Generates sampled or direct likelihood parameter predictions.

        Parameters
        ----------
        model
            The Darts `SKLearnModel`.
        x
            The input feature array passed to the underlying estimator's `predict()` method.
        num_samples
            Number of times a prediction is sampled from the likelihood model / distribution.
            If `1` and `predict_likelihood_parameters=False`, returns median / mean predictions.
        predict_likelihood_parameters
            If set to `True`, generates likelihood parameter predictions instead of sampling from the
            likelihood model / distribution. Only supported with `num_samples = 1` and
            `n<=output_chunk_length`.
        kwargs
            Some kwargs passed to the underlying estimator's `predict()` method.
        """
        model_output = self._estimator_predict(model, x=x, **kwargs)
        # dispatch to one of the three mutually exclusive prediction modes
        if predict_likelihood_parameters:
            return self.predict_likelihood_parameters(model_output)
        if num_samples == 1:
            return self._get_median_prediction(model_output)
        return self.sample(model_output)

    @abstractmethod
    def sample(self, model_output: np.ndarray) -> np.ndarray:
        """
        Samples a prediction from the likelihood distribution and the predicted parameters.
        """

    @abstractmethod
    def predict_likelihood_parameters(self, model_output: np.ndarray) -> np.ndarray:
        """
        Returns the distribution parameters as an array, extracted from the raw model outputs.
        """

    @abstractmethod
    def _estimator_predict(
        self,
        model,
        x: np.ndarray,
        **kwargs,
    ) -> np.ndarray:
        """
        Computes the model output.

        Parameters
        ----------
        model
            The Darts `SKLearnModel`.
        x
            The input feature array passed to the underlying estimator's `predict()` method.
        kwargs
            Some kwargs passed to the underlying estimator's `predict()` method.
        """

    @abstractmethod
    def _get_median_prediction(self, model_output: np.ndarray) -> np.ndarray:
        """
        Gets the median prediction per component extracted from the model output.
        """
[docs]
class GaussianLikelihood(SKLearnLikelihood):
    def __init__(self, n_outputs: int):
        """
        Gaussian distribution [1]_.

        Parameters
        ----------
        n_outputs
            The number of predicted outputs per model call. `1` if `multi_models=False`, otherwise
            `output_chunk_length`.

        References
        ----------
        .. [1] https://en.wikipedia.org/wiki/Normal_distribution
        """
        super().__init__(
            likelihood_type=LikelihoodType.Gaussian,
            parameter_names=["mu", "sigma"],
            n_outputs=n_outputs,
        )

    def sample(self, model_output: np.ndarray) -> np.ndarray:
        """Draws one sample per entry from the predicted normal distributions."""
        # shape (n_components * output_chunk_length, n_series * n_samples, 2)
        # [mu, sigma] on the last dimension, grouped by component
        n_samples = model_output.shape[1]
        # get samples (n_components * output_chunk_length, n_series * n_samples)
        # NOTE: `np.random.normal`'s second argument is the standard deviation
        # (sigma), not the variance
        samples = np.random.normal(
            model_output[:, :, 0],  # mean
            model_output[:, :, 1],  # standard deviation
        )
        # reshape to (n_series * n_samples, n_components * output_chunk_length)
        samples = samples.transpose()
        # reshape to (n_series * n_samples, output_chunk_length, n_components)
        samples = samples.reshape(n_samples, self._n_outputs, -1)
        return samples

    def predict_likelihood_parameters(self, model_output: np.ndarray) -> np.ndarray:
        """Returns the predicted [mu, sigma] parameters, grouped by component."""
        # shape (n_components * output_chunk_length, n_series * n_samples, 2)
        # [mu, sigma] on the last dimension, grouped by component
        n_samples = model_output.shape[1]
        # reshape to (n_series * n_samples, output_chunk_length, n_components)
        params_reshaped = model_output.transpose(1, 0, 2).reshape(
            n_samples, self._n_outputs, -1
        )
        return params_reshaped

    def _estimator_predict(
        self,
        model,
        x: np.ndarray,
        **kwargs,
    ) -> np.ndarray:
        # returns samples computed from double-valued inputs [mean, variance].
        # `x` is of shape (n_series * n_samples, n_regression_features)
        # `model_output` is of shape:
        # - (n_series * n_samples, 2): if univariate & output_chunk_length == 1
        # - (2, n_series * n_samples, n_components * output_chunk_length): otherwise
        # where the axis with 2 dims is mu, sigma
        model_output = model.model.predict(x, **kwargs)
        output_dim = len(model_output.shape)
        # univariate & single-chunk output
        if output_dim <= 2:
            # embedding well shaped 2D output into 3D
            model_output = np.expand_dims(model_output, axis=0)
        else:
            # we transpose to get mu, sigma couples on last axis
            # shape becomes: (n_components * output_chunk_length, n_series * n_samples, 2)
            model_output = model_output.transpose()
        # shape (n_components * output_chunk_length, n_series * n_samples, 2)
        return model_output

    def _get_median_prediction(self, model_output: np.ndarray) -> np.ndarray:
        # shape (n_components * output_chunk_length, n_series * n_samples, 2)
        # [mu, sigma] on the last dimension, grouped by component
        k = model_output.shape[1]
        # extract mu (mean) per component
        component_medians = slice(0, None, self.num_parameters)
        # FIX: bring the batch axis to the front *before* reshaping, consistent
        # with `sample()` and `predict_likelihood_parameters()`; a direct reshape
        # of the (n_components * output_chunk_length, n_series * n_samples, 1)
        # array would scramble entries whenever that first axis has length > 1
        # shape (n_series * n_samples, output_chunk_length, n_components)
        return (
            model_output[:, :, component_medians]
            .transpose(1, 0, 2)
            .reshape(k, self._n_outputs, -1)
        )
[docs]
class PoissonLikelihood(SKLearnLikelihood):
    def __init__(self, n_outputs: int):
        """
        Poisson distribution [1]_.

        Parameters
        ----------
        n_outputs
            The number of predicted outputs per model call. `1` if `multi_models=False`, otherwise
            `output_chunk_length`.

        References
        ----------
        .. [1] https://en.wikipedia.org/wiki/Poisson_distribution
        """
        super().__init__(
            likelihood_type=LikelihoodType.Poisson,
            parameter_names=["lambda"],
            n_outputs=n_outputs,
        )

    def sample(self, model_output: np.ndarray) -> np.ndarray:
        """Draws integer counts from Poisson(lambda) and casts them to float."""
        # shape (n_series * n_samples, output_chunk_length, n_components)
        return np.random.poisson(lam=model_output).astype(float)

    def predict_likelihood_parameters(self, model_output: np.ndarray) -> np.ndarray:
        """Returns the predicted lambdas (already grouped by component)."""
        return model_output

    def _estimator_predict(
        self,
        model,
        x: np.ndarray,
        **kwargs,
    ) -> np.ndarray:
        # `x` is of shape (n_series * n_samples, n_regression_features)
        n_rows = x.shape[0]
        predictions = model.model.predict(x, **kwargs)
        # shape (n_series * n_samples, output_chunk_length, n_components)
        return predictions.reshape(n_rows, self._n_outputs, -1)

    def _get_median_prediction(self, model_output: np.ndarray) -> np.ndarray:
        # shape (n_series * n_samples, output_chunk_length, n_components)
        # the predicted lambda is used directly as the point (median) prediction
        return model_output
[docs]
class QuantileRegression(SKLearnLikelihood):
    def __init__(
        self,
        n_outputs: int,
        quantiles: list[float] | None = None,
    ):
        """
        Quantile Regression [1]_.

        Parameters
        ----------
        n_outputs
            The number of predicted outputs per model call. `1` if `multi_models=False`, otherwise
            `output_chunk_length`.
        quantiles
            A list of quantiles. Default: `[0.01, 0.05, 0.1, 0.25, 0.5, 0.75, 0.9, 0.95, 0.99]`.

        References
        ----------
        .. [1] https://en.wikipedia.org/wiki/Quantile_regression
        """
        if quantiles is None:
            quantiles = [
                0.01,
                0.05,
                0.1,
                0.25,
                0.5,
                0.75,
                0.9,
                0.95,
                0.99,
            ]
        else:
            quantiles = sorted(quantiles)
            _check_quantiles(quantiles)
        self.quantiles = quantiles
        # position of the 0.5 quantile; `_check_quantiles` guarantees it exists
        self._median_idx = quantiles.index(0.5)
        super().__init__(
            likelihood_type=LikelihoodType.Quantile,
            parameter_names=quantile_names(self.quantiles),
            n_outputs=n_outputs,
        )
        # derived from `quantiles`, so irrelevant for equality tests
        self.ignore_attrs_equality += ["_median_idx"]

    def sample(self, model_output: np.ndarray) -> np.ndarray:
        """Samples by inverse-transform: draw u ~ U[0, 1] per entry, then linearly
        interpolate between the two fitted quantiles enclosing u."""
        # model_output is of shape (n_series * n_samples, output_chunk_length, n_components, n_quantiles)
        # sample uniformly between [0, 1] (for each batch example) and return the
        # linear interpolation between the fitted quantiles closest to the sampled value.
        k, n_times, n_components, n_quantiles = model_output.shape
        # obtain samples
        probs = np.random.uniform(
            size=(
                k,
                n_times,
                n_components,
                1,
            )
        )
        # add dummy dim
        probas = np.expand_dims(probs, axis=-2)
        # tile and transpose
        p = np.tile(probas, (1, 1, 1, n_quantiles, 1)).transpose((0, 1, 2, 4, 3))
        # prepare quantiles
        tquantiles = np.array(self.quantiles).reshape((1, 1, 1, -1))
        # calculate index of the largest quantile smaller than the sampled value
        left_idx = np.sum(p > tquantiles, axis=-1)
        # obtain index of the smallest quantile larger than the sampled value
        right_idx = left_idx + 1
        # repeat the model output on the edges so indices 0 and n_quantiles + 1
        # (sampled value below the lowest / above the highest quantile) are valid
        repeat_count = [1] * n_quantiles
        repeat_count[0] = 2
        repeat_count[-1] = 2
        repeat_count = np.array(repeat_count)
        shifted_output = np.repeat(model_output, repeat_count, axis=-1)
        # obtain model output values corresponding to the quantiles left and right of the sampled value
        left_value = np.take_along_axis(shifted_output, left_idx, axis=-1)
        right_value = np.take_along_axis(shifted_output, right_idx, axis=-1)
        # add 0 and 1 to quantiles
        ext_quantiles = [0.0] + self.quantiles + [1.0]
        expanded_q = np.tile(np.array(ext_quantiles), left_idx.shape)
        # calculate closest quantiles to the sampled value
        left_q = np.take_along_axis(expanded_q, left_idx, axis=-1)
        right_q = np.take_along_axis(expanded_q, right_idx, axis=-1)
        # linear interpolation
        weights = (probs - left_q) / (right_q - left_q)
        inter = left_value + weights * (right_value - left_value)
        # shape (n_series * n_samples, output_chunk_length, n_components * n_quantiles)
        return inter.squeeze(-1)

    def predict_likelihood_parameters(self, model_output: np.ndarray) -> np.ndarray:
        """Flattens the per-component quantile predictions into the parameter axis."""
        # shape (n_series * n_samples, output_chunk_length, n_components, n_quantiles)
        # quantiles on the last dimension, grouped by component
        k, n_times, n_components, n_quantiles = model_output.shape
        # last dim : [comp_1_q_1, ..., comp_1_q_n, ..., comp_n_q_1, ..., comp_n_q_n]
        # shape (n_series * n_samples, output_chunk_length, n_components * n_quantiles)
        return model_output.reshape(k, n_times, n_components * n_quantiles)

    def _estimator_predict(
        self,
        model,
        x: np.ndarray,
        **kwargs,
    ) -> np.ndarray:
        # `x` is of shape (n_series * n_samples, n_regression_features)
        k = x.shape[0]
        model_outputs = []
        # one fitted estimator per quantile; the container preserves quantile
        # order, so stacking on the last axis keeps quantiles aligned with
        # `self.quantiles` (the keys themselves are not needed here)
        for fitted in model._model_container.values():
            # model output has shape (n_series * n_samples, output_chunk_length, n_components)
            model_output = fitted.predict(x, **kwargs).reshape(k, self._n_outputs, -1)
            model_outputs.append(model_output)
        model_outputs = np.stack(model_outputs, axis=-1)
        # shape (n_series * n_samples, output_chunk_length, n_components, n_quantiles)
        return model_outputs

    def _get_median_prediction(self, model_output: np.ndarray) -> np.ndarray:
        # shape (n_series * n_samples, output_chunk_length, n_components, n_quantiles)
        k, n_times, n_components, n_quantiles = model_output.shape
        # extract the median quantiles per component
        component_medians = slice(self._median_idx, None, n_quantiles)
        # shape (n_series * n_samples, output_chunk_length, n_components)
        return model_output[:, :, :, component_medians].reshape(
            k, n_times, n_components
        )
[docs]
class ClassProbabilityLikelihood(SKLearnLikelihood):
    def __init__(self, n_outputs: int):
        """
        Class probability likelihood.

        Likelihood to predict the probability of each class for a forecasting classification task.

        Parameters
        ----------
        n_outputs
            The number of predicted outputs per model call. `1` if `multi_models=False`, otherwise
            `output_chunk_length`.
        """
        super().__init__(
            likelihood_type=LikelihoodType.ClassProbability,
            n_outputs=n_outputs,
            parameter_names=[],
        )
        # set by `fit()`: start index of each component's parameters in the
        # flattened likelihood parameter axis (length n_components + 1)
        self._index_first_param_per_component: np.ndarray | None = None
        # set by `fit()`: class labels of each estimator, ordered as the estimators
        self._classes: list[np.ndarray] | None = None

    def fit(self, model):
        """
        Fits the likelihood to the model.

        Parameters
        ----------
        model
            The model to fit the likelihood to. The model is expected to be fitted and have a `class_labels` attribute.
        """
        if not hasattr(model, "class_labels"):
            raise_log(
                ValueError(
                    "The model must have a `class_labels` attribute to fit the likelihood."
                ),
                logger,
            )
        self._classes = model.class_labels
        # estimators/classes are ordered by chunk then by component: [classes_comp0_chunk0, classes_comp1_chunk0, ...]
        # Since classes are same across chunk for same component, we can just take the first chunk for each component
        num_components = len(self._classes) // self._n_outputs
        # index of the first parameter of each component in the likelihood parameters
        # e.g. [0, 2, 4, 6] for 3 components with 2 parameters each
        self._index_first_param_per_component = np.insert(
            np.cumsum([
                len(estimator_classes)
                for estimator_classes in self._classes[:num_components]
            ]),
            0,
            0,
        )
        # one parameter name per class label, grouped by component
        self._parameter_names = [
            f"p{int(label)}"
            for i in range(num_components)
            for label in self._classes[i]
        ]
        return self

    def component_names(
        self,
        series: TimeSeries | None = None,
        components: Sequence | None = None,
    ) -> list[str]:
        """Generates names for the parameters of the Likelihood."""
        if self._index_first_param_per_component is None:
            # consistency: pass the module logger like every other raise_log call
            raise_log(
                ValueError("The likelihood has not been fitted yet."),
                logger=logger,
            )
        if (series is not None) == (components is not None):
            raise_log(
                ValueError("Only one of `series` or `components` must be specified."),
                logger=logger,
            )
        if series is not None:
            components = series.components
        # format: <component_name>_p<label>
        return [
            f"{component_name}_{parameter_name}"
            for i, component_name in enumerate(components)
            for parameter_name in self.parameter_names[
                self._index_first_param_per_component[
                    i
                ] : self._index_first_param_per_component[i + 1]
            ]
        ]

    def _estimator_predict(
        self,
        model,
        x: np.ndarray,
        **kwargs,
    ) -> np.ndarray:
        # list of length n_components * output_chunk_length of numpy arrays of shape (n_samples*n_series, n_classes)
        model_output = model.model.predict_proba(x, **kwargs)
        if not isinstance(model_output, list):
            model_output = [model_output]
        # shape: (output_chunk_length, n_series * n_samples, n_likelihood_parameters)
        # n_likelihood_parameters is the sum of the number of classes for each component
        class_proba = np.zeros((
            self._n_outputs,
            model_output[0].shape[0],
            self.num_parameters,
        ))
        # ordered first by _n_outputs then by component: [comp0_chunk0, comp1_chunk0, ..]
        n_component = len(model_output) // self._n_outputs
        # filling the class_proba array with the predicted probabilities
        for i, params_proba in enumerate(model_output):
            component_index = i % n_component
            output_index = i // n_component
            class_proba[
                output_index,
                :,
                self._index_first_param_per_component[
                    component_index
                ] : self._index_first_param_per_component[component_index + 1],
            ] = params_proba
        # shape (output_chunk_length, n_series * n_samples, n_likelihood_parameters)
        # n_likelihood_parameters is the sum of the number of classes for each component
        # n_likelihood_parameters are ordered as in component_names:
        # [comp0_l1, comp0_l2, comp1_l1, comp1_l2, comp1_l3,...]
        # should be accessed through _index_first_param_per_component (for comp1 => 2)
        return class_proba

    def sample(self, model_output: np.ndarray) -> np.ndarray:
        """
        Samples a prediction from the likelihood distribution and the predicted parameters.
        """
        # shape (output_chunk_length, n_series * n_samples, n_likelihood_parameters)
        n_output, n_samples, _ = model_output.shape
        n_component = len(self._classes) // self._n_outputs
        # shape (output_chunk_length, n_series * n_samples, n_components)
        preds = np.empty((n_output, n_samples, n_component), dtype=int)
        # Some models have an approximation error, the probabilities are adjusted
        # if their total is below the 1e-7 tolerance threshold around 1.
        for component_idx, (component_start, component_end) in enumerate(
            zip(
                self._index_first_param_per_component[:-1],
                self._index_first_param_per_component[1:],
            )
        ):
            component_probabilities = model_output[:, :, component_start:component_end]
            total_proba = component_probabilities.sum(
                axis=2
            )  # shape (n_output, n_samples)
            difference = 1 - total_proba
            tolerance = 1e-7
            if np.any(np.abs(difference) > tolerance):
                raise_log(
                    ValueError(
                        "The class probabilities returned by the model do not sum to one"
                    ),
                    logger=logger,
                )
            # FIX: re-normalize into a new array instead of `+=`; the slice above
            # is a view into `model_output`, and an in-place add would silently
            # mutate the caller's array
            component_probabilities = (
                component_probabilities
                + difference[:, :, np.newaxis] / component_probabilities.shape[2]
            )
            # argmax stops at the first True
            # first time the random sample is greater than cumulative probability
            # [0.2, 0.1, 0.5, 0.2] -> [0.2, 0.3, 0.8, 1]
            # random: 0.7 -> idx = 2, this outcomes has 0.5 probability of happening
            sampled_idx = np.argmax(
                np.random.uniform(0, 1, size=difference.shape + (1,))
                <= np.cumsum(component_probabilities, axis=2),
                axis=2,
            )
            preds[:, :, component_idx] = np.take(
                self._classes[component_idx], sampled_idx
            )
        # reshape to (n_series * n_samples, output_chunk_length, n_components)
        return preds.transpose(1, 0, 2)

    def predict_likelihood_parameters(self, model_output: np.ndarray) -> np.ndarray:
        """
        Returns the distribution parameters as an array, extracted from the raw model outputs.
        """
        # reshape to (n_series * n_samples, output_chunk_length, n_likelihood_parameters)
        return model_output.transpose(1, 0, 2)

    def _get_median_prediction(self, model_output: np.ndarray) -> np.ndarray:
        """
        Gets the class label with highest predicted probability per component extracted
        from the model output.
        """
        # shape (output_chunk_length, n_series * n_samples, n_likelihood_parameters)
        n_output, n_samples, _ = model_output.shape
        # shape (output_chunk_length, n_series * n_samples, n_components)
        n_component = len(self._classes) // self._n_outputs
        preds = np.empty((n_output, n_samples, n_component), dtype=int)
        for component_idx, (component_start, component_end) in enumerate(
            zip(
                self._index_first_param_per_component[:-1],
                self._index_first_param_per_component[1:],
            )
        ):
            # shape (output_chunk_length, n_series * n_samples)
            indices = np.argmax(
                model_output[:, :, component_start:component_end], axis=2
            )
            preds[:, :, component_idx] = self._classes[component_idx][indices]
        # reshape to (n_series * n_samples, output_chunk_length, n_components)
        return preds.transpose(1, 0, 2)
def _get_likelihood(
    likelihood: str | None,
    n_outputs: int,
    available_likelihoods: list[LikelihoodType],
    quantiles: list[float] | None = None,
) -> SKLearnLikelihood | None:
    """Get the `Likelihood` object for `SKLearnModel`.

    Parameters
    ----------
    likelihood
        The likelihood name. Must be one of `available_likelihoods` type value or `None`.
    n_outputs
        The number of predicted outputs per model call. `1` if `multi_models=False`, otherwise
        `output_chunk_length`.
    available_likelihoods
        The list of available likelihood types for the model.
    quantiles
        Optionally, a list of quantiles. Only effective for `likelihood='quantile'`.

    Raises
    ------
    ValueError
        If `likelihood` is not `None` and not one of `available_likelihoods`.
    """
    if likelihood is None:
        return None
    # Convert LikelihoodType to string (use a distinct loop variable so the
    # `likelihood` parameter is not shadowed)
    available_likelihoods = [
        lkl.value if isinstance(lkl, LikelihoodType) else lkl
        for lkl in available_likelihoods
    ]
    if likelihood not in available_likelihoods:
        raise_log(
            ValueError(
                f"Invalid `likelihood='{likelihood}'`. Must be one of {available_likelihoods}"
            ),
            logger=logger,
        )
    if likelihood == LikelihoodType.Gaussian.value:
        return GaussianLikelihood(n_outputs=n_outputs)
    elif likelihood == LikelihoodType.Poisson.value:
        return PoissonLikelihood(n_outputs=n_outputs)
    elif likelihood == LikelihoodType.Quantile.value:
        return QuantileRegression(n_outputs=n_outputs, quantiles=quantiles)
    elif likelihood == LikelihoodType.ClassProbability.value:
        return ClassProbabilityLikelihood(n_outputs=n_outputs)
    else:
        # defensive branch: an entry of `available_likelihoods` this factory
        # does not handle. FIX: the message was missing the f-string prefix and
        # printed the literal placeholder instead of the likelihood name.
        raise_log(
            ValueError(f"Unknown `likelihood='{likelihood}'`."),
            logger=logger,
        )