"""
Pipeline
--------
"""
from copy import deepcopy
from typing import Iterator, Optional, Sequence, Union

from darts import TimeSeries
from darts.dataprocessing.transformers import (
    BaseDataTransformer,
    FittableDataTransformer,
    InvertibleDataTransformer,
)
from darts.logging import get_logger, raise_if_not
# Module-level logger used by Pipeline for validation errors and warnings.
logger = get_logger(__name__)
class Pipeline:
    def __init__(
        self,
        transformers: Sequence[BaseDataTransformer],
        copy: bool = False,
        verbose: Optional[bool] = None,
        n_jobs: Optional[int] = None,
    ):
        """
        Pipeline to combine multiple data transformers, chaining them together.

        Parameters
        ----------
        transformers
            Sequence of data transformers. `None` or an empty sequence creates an
            empty pipeline (a warning is logged).
        copy
            If set makes a (deep) copy of each data transformer before adding them to the pipeline.
        n_jobs
            The number of jobs to run in parallel. Parallel jobs are created only when a ``Sequence[TimeSeries]`` is
            passed as input to a method, parallelising operations regarding different ``TimeSeries``. Defaults to `1`
            (sequential). Setting the parameter to `-1` means using all the available processors.
            Note: for a small amount of data, the parallelisation overhead could end up increasing the total
            required amount of time.
            Note: this parameter will overwrite the value set in each single transformer. Leave this parameter set to
            `None` for keeping the original transformers' configurations.
        verbose
            Whether to print progress of the operations.
            Note: this parameter will overwrite the value set in each single transformer. Leave this parameter set
            to `None` for keeping the transformers configurations.

        Examples
        --------
        >>> import numpy as np
        >>> from darts import TimeSeries
        >>> from darts.datasets import AirPassengersDataset
        >>> from darts.dataprocessing.transformers import Scaler, MissingValuesFiller
        >>> from darts.dataprocessing.pipeline import Pipeline
        >>> values = np.arange(start=0, stop=12.5, step=2.5)
        >>> values[1:3] = np.nan
        >>> series = TimeSeries.from_values(values)
        >>> pipeline = Pipeline([MissingValuesFiller(), Scaler()])
        >>> series_transformed = pipeline.fit_transform(series)
        <TimeSeries (DataArray) (time: 5, component: 1, sample: 1)>
        array([[[0. ]],
        [[0.25]],
        [[0.5 ]],
        [[0.75]],
        [[1. ]]])
        Coordinates:
        * time (time) int64 0 1 2 3 4
        * component (component) object '0'
        Dimensions without coordinates: sample
        """
        # Handle the empty/None case first: iterating `transformers` inside the
        # isinstance check below would raise a TypeError on a `None` argument.
        if transformers is None or len(transformers) == 0:
            logger.warning("Empty pipeline created")
            self._transformers: Sequence[BaseDataTransformer[TimeSeries]] = []
        else:
            raise_if_not(
                all(isinstance(t, BaseDataTransformer) for t in transformers),
                "transformers should be objects deriving from BaseDataTransformer",
                logger,
            )
            self._transformers = deepcopy(transformers) if copy else transformers

        # The pipeline is invertible only if every single step is invertible.
        self._invertible = all(
            isinstance(t, InvertibleDataTransformer) for t in self._transformers
        )

        # `None` means "keep each transformer's own configuration".
        if verbose is not None:
            for transformer in self._transformers:
                transformer.set_verbose(verbose)
        if n_jobs is not None:
            for transformer in self._transformers:
                transformer.set_n_jobs(n_jobs)

    def fit(self, data: Union[TimeSeries, Sequence[TimeSeries]]):
        """
        Fit all fittable transformers in pipeline.

        Parameters
        ----------
        data
            (`Sequence` of) `TimeSeries` to fit on.
        """
        # Find the last fittable transformer index.
        # No need to fit (and thus transform) after this index, possibly saving a fair bit of time.
        last_fittable_idx = -1
        for idx, transformer in enumerate(self._transformers):
            if isinstance(transformer, FittableDataTransformer):
                last_fittable_idx = idx

        for idx, transformer in enumerate(self._transformers):
            if idx <= last_fittable_idx and isinstance(
                transformer, FittableDataTransformer
            ):
                transformer.fit(data)
            # Transform the data forward so the next fittable step sees the
            # output of all previous steps; skip it after the last fit.
            if idx < last_fittable_idx:
                data = transformer.transform(data)

    def invertible(self) -> bool:
        """
        Returns whether the pipeline is invertible or not.
        A pipeline is invertible if all transformers in the pipeline are themselves invertible.

        Returns
        -------
        bool
            `True` if the pipeline is invertible, `False` otherwise
        """
        return self._invertible

    def __getitem__(self, key: Union[int, slice]) -> "Pipeline":
        """
        Gets subset of Pipeline based either on index or slice with indexes.
        Resulting pipeline will deep copy transformers of the original pipeline.

        Parameters
        ----------
        key
            Either int or slice indicating the subset of data transformers to keep.

        Returns
        -------
        Pipeline
            Subset of pipeline determined by key.
        """
        raise_if_not(
            isinstance(key, (int, slice)),
            "key must be either an int or a slice",
            logger,
        )
        if isinstance(key, int):
            # Wrap a single transformer in a list so the result is a Pipeline,
            # not a bare transformer.
            transformers = [self._transformers[key]]
        else:
            transformers = self._transformers[key]
        return Pipeline(transformers, copy=True)

    def __iter__(self) -> Iterator[BaseDataTransformer]:
        """
        Returns
        -------
        Iterator
            Iterator on sequence of data transformers
        """
        return self._transformers.__iter__()

    def __len__(self):
        # Number of transformers in the pipeline.
        return len(self._transformers)

    def __copy__(self, deep: bool = True):
        # Copying a pipeline deep-copies its transformers by default.
        return Pipeline(self._transformers, copy=deep)

    def __deepcopy__(self, memo=None):
        return self.__copy__(deep=True)

    def __str__(self):
        # Join avoids the trailing-arrow slice, which previously truncated the
        # "Pipeline: " prefix itself when the pipeline was empty.
        return "Pipeline: " + " -> ".join(str(t) for t in self._transformers)

    def __repr__(self):
        return self.__str__()