# Source code for darts.dataprocessing.transformers.mappers

"""
Mapper and InvertibleMapper
---------------------------
"""

from collections.abc import Callable, Mapping
from typing import Any

import numpy as np
import pandas as pd

from darts import TimeSeries
from darts.dataprocessing.transformers.base_data_transformer import BaseDataTransformer
from darts.dataprocessing.transformers.invertible_data_transformer import (
    InvertibleDataTransformer,
)
from darts.logging import get_logger

# Module-level logger, obtained from darts' logging helper and keyed by this module's name.
logger = get_logger(__name__)

MapperFn = (
    Callable[[np.number], np.number] | Callable[[pd.Timestamp, np.number], np.number]
)


class Mapper(BaseDataTransformer):
    def __init__(
        self,
        fn: Callable[[np.number], np.number]
        | Callable[[pd.Timestamp, np.number], np.number],
        name: str = "Mapper",
        n_jobs: int = 1,
        verbose: bool = False,
        columns: str | list[str] | None = None,
    ):
        """
        Data transformer to apply a custom function to a (sequence of) ``TimeSeries`` (similar to calling
        :func:`TimeSeries.map()` on each series).

        The mapper takes care of parallelizing the operations on multiple series over multiple processors.

        Parameters
        ----------
        fn
            Either a function which takes a value and returns a value, i.e. `f(x) = y`,
            or a function which takes a value and its timestamp and returns a value, i.e. `f(timestamp, x) = y`.
        name
            A specific name for the transformer.
        n_jobs
            The number of jobs to run in parallel. Parallel jobs are created only when a ``Sequence[TimeSeries]``
            is passed as input to a method, parallelising operations regarding different ``TimeSeries``.
            Defaults to `1` (sequential). Setting the parameter to `-1` means using all the available processors.
            Note: for a small amount of data, the parallelisation overhead could end up increasing the total
            required amount of time.
        verbose
            Optionally, whether to print operations progress
        columns
            Optionally, a string or list of strings specifying the names of the components (columns) to transform.
            If specified, only these components will be transformed, and the remaining components will be kept
            untouched. For more information refer to the `BaseDataTransformer` documentation. In case the
            transformer is applied on multiple TimeSeries, it is expected that all series have the same column
            order.

        Examples
        --------
        >>> import numpy as np
        >>> from darts import TimeSeries
        >>> from darts.dataprocessing.transformers import Mapper
        >>> series = TimeSeries.from_values(np.array([1, 10, 100]))
        >>> transformer = Mapper(np.log10)
        >>> series_transformed = transformer.transform(series)
        >>> print(series_transformed.values())
        [[0.]
         [1.]
         [2.]]
        """
        # Define fixed params (i.e. attributes defined before calling `super().__init__`):
        self._fn = fn
        super().__init__(name=name, n_jobs=n_jobs, verbose=verbose, columns=columns)

    @staticmethod
    def ts_transform(series: TimeSeries, params: Mapping[str, Any]) -> TimeSeries:
        """Apply the stored mapping function to ``series``.

        Parameters
        ----------
        series
            The series to transform.
        params
            Transformer parameters; the function to apply is read from ``params["fixed"]["_fn"]``.

        Returns
        -------
        TimeSeries
            The result of ``series.map(...)`` called with the stored function.
        """
        return series.map(params["fixed"]["_fn"])


class InvertibleMapper(InvertibleDataTransformer):
    def __init__(
        self,
        fn: MapperFn,
        inverse_fn: MapperFn,
        name: str = "InvertibleMapper",
        n_jobs: int = 1,
        verbose: bool = False,
        columns: str | list[str] | None = None,
    ):
        """
        Data transformer to apply a custom function and its inverse to a (sequence of) ``TimeSeries`` (similar to
        calling :func:`TimeSeries.map()` on each series).

        Parameters
        ----------
        fn
            Either a function which takes a value and returns a value, i.e. `f(x) = y`,
            or a function which takes a value and its timestamp and returns a value, i.e. `f(timestamp, x) = y`.
        inverse_fn
            Similarly to `fn`, either a function which takes a value and returns a value, i.e. `f(x) = y`,
            or a function which takes a value and its timestamp and returns a value, i.e. `f(timestamp, x) = y`.
            `inverse_fn` should be such that ``inverse_fn(fn(x)) == x``.
        name
            A specific name for the transformer.
        n_jobs
            The number of jobs to run in parallel. Parallel jobs are created only when a `Sequence[TimeSeries]`
            is passed as input to a method, parallelising operations regarding different `TimeSeries`.
            Defaults to `1` (sequential). Setting the parameter to `-1` means using all the available processors.
            Note: for a small amount of data, the parallelisation overhead could end up increasing the total
            required amount of time.
        verbose
            Optionally, whether to print operations progress
        columns
            Optionally, a string or list of strings specifying the names of the components (columns) to transform.
            If specified, only these components will be transformed, and the remaining components will be kept
            untouched. For more information refer to the `BaseDataTransformer` documentation. In case the
            transformer is applied on multiple TimeSeries, it is expected that all series have the same column
            order.

        Examples
        --------
        >>> import numpy as np
        >>> from darts import TimeSeries
        >>> from darts.dataprocessing.transformers import InvertibleMapper
        >>> series = TimeSeries.from_values(np.array([1, 10, 100]))
        >>> transformer = InvertibleMapper(np.log10, lambda x: 10 ** x)
        >>> series_transformed = transformer.transform(series)
        >>> print(series_transformed.values())
        [[0.]
         [1.]
         [2.]]
        >>> series_restored = transformer.inverse_transform(series_transformed)
        >>> print(series_restored.values())
        [[  1.]
         [ 10.]
         [100.]]
        """
        # Define fixed params (i.e. attributes defined before calling `super().__init__`):
        self._fn = fn
        self._inverse_fn = inverse_fn
        super().__init__(name=name, n_jobs=n_jobs, verbose=verbose, columns=columns)

    @staticmethod
    def ts_transform(
        series: TimeSeries, params: Mapping[str, Mapping[str, MapperFn]]
    ) -> TimeSeries:
        """Apply the forward function to ``series``.

        Parameters
        ----------
        series
            The series to transform.
        params
            Transformer parameters; the forward function is read from ``params["fixed"]["_fn"]``.

        Returns
        -------
        TimeSeries
            The result of ``series.map(...)`` called with the forward function.
        """
        return series.map(params["fixed"]["_fn"])

    @staticmethod
    def ts_inverse_transform(
        series: TimeSeries,
        params: Mapping[str, Mapping[str, MapperFn]],
        insample: TimeSeries | None = None,
    ) -> TimeSeries:
        """Apply the inverse function to ``series``.

        Parameters
        ----------
        series
            The (previously transformed) series to map back.
        params
            Transformer parameters; the inverse function is read from ``params["fixed"]["_inverse_fn"]``.
        insample
            Not used by this method; it is accepted but ignored.

        Returns
        -------
        TimeSeries
            The result of ``series.map(...)`` called with the inverse function.
        """
        return series.map(params["fixed"]["_inverse_fn"])