"""
Timeseries
----------
``TimeSeries`` is Darts' container for storing and handling time series data. It supports univariate or
multivariate time series that can be deterministic or stochastic.
The values are stored in an array of shape `(times, components, samples)`, where `times` is the number of
time steps, `components` is the number of columns, and `samples` is the number of samples in the series.
**Definitions:**
- A series with `components = 1` is **univariate**, and a series with `components > 1` is **multivariate**.
- A series with `samples = 1` is **deterministic** and a series with `samples > 1` is **stochastic** (or
**probabilistic**).
Each series also stores a `time_index`, which contains either datetimes (:class:`pandas.DatetimeIndex`) or integer
indices (:class:`pandas.RangeIndex`).
Optionally, ``TimeSeries`` can store static covariates, a hierarchy, and / or metadata.
- **Static covariates** are time-invariant external data / information about the series and can be used by some models
to help improve predictions. Find more info on covariates `here
<https://unit8co.github.io/darts/userguide/covariates.html>`_.
- A **hierarchy** describes the hierarchical structure of the components which can be used to reconcile forecasts. For
more info on hierarchical reconciliation, see `this example
<https://unit8co.github.io/darts/examples/16-hierarchical-reconciliation.html>`_.
- **Metadata** can be used to store any additional information about the series which will not be used by any model.
``TimeSeries`` **are guaranteed to:**
- Have a strictly monotonically increasing time index with a well-defined frequency (without holes / missing dates).
For more info on available ``DateTimeIndex`` frequencies, see `date offset aliases
<https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#offset-aliases>`_. For integer-indexed
series the frequency corresponds to the constant step size between consecutive indices.
- Contain numeric data types only
- Have unique component / column names
- Have static covariates consistent with their components (global or component-specific), or no static covariates
- Have a hierarchy consistent with their components, or no hierarchy
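As a quick illustration, here is a minimal sketch using the factory methods documented below:
.. highlight:: python
.. code-block:: python
    import numpy as np
    from darts import TimeSeries
    # deterministic univariate series: (times, components, samples) = (3, 1, 1)
    deterministic = TimeSeries.from_values(np.arange(3))
    # stochastic multivariate series: 3 time steps, 2 components, 100 samples
    stochastic = TimeSeries.from_values(np.random.random((3, 2, 100)))
..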
"""
import contextlib
import itertools
import pickle
import re
import sys
from collections import defaultdict
from collections.abc import Sequence
from copy import deepcopy
from inspect import signature
from io import StringIO
from types import ModuleType
from typing import Any, Callable, Literal, Optional, Union
import matplotlib.axes
import matplotlib.pyplot as plt
import narwhals as nw
import numpy as np
import pandas as pd
import xarray as xr
from narwhals.typing import IntoDataFrame, IntoSeries
from narwhals.utils import Implementation
from pandas.tseries.frequencies import to_offset
from scipy.stats import kurtosis, skew
from darts.logging import get_logger, raise_log
from darts.utils import _build_tqdm_iterator, _parallel_apply
from darts.utils.utils import (
SUPPORTED_RESAMPLE_METHODS,
expand_arr,
generate_index,
n_steps_between,
)
if sys.version_info >= (3, 11):
from typing import Self
else:
from typing_extensions import Self
logger = get_logger(__name__)
# dimension names in the array
# the "time" one can be different, if it has a name in the underlying Series/DataFrame.
DIMS = ("time", "component", "sample")
TIME_AX = 0
COMP_AX = 1
SMPL_AX = 2
AXES = {"time": TIME_AX, "component": COMP_AX, "sample": SMPL_AX}
VALID_INDEX_TYPES = (pd.DatetimeIndex, pd.RangeIndex)
STATIC_COV_TAG = "static_covariates"
DEFAULT_GLOBAL_STATIC_COV_NAME = "global_components"
HIERARCHY_TAG = "hierarchy"
METADATA_TAG = "metadata"
class TimeSeries:
def __init__(
self,
times: Union[pd.DatetimeIndex, pd.RangeIndex, pd.Index],
values: np.ndarray,
fill_missing_dates: Optional[bool] = False,
freq: Optional[Union[str, int]] = None,
components: Optional[pd._typing.Axes] = None,
fillna_value: Optional[float] = None,
static_covariates: Optional[Union[pd.Series, pd.DataFrame]] = None,
hierarchy: Optional[dict] = None,
metadata: Optional[dict] = None,
copy: bool = True,
):
"""Create a ``TimeSeries`` from a time index `times` and values `values`.
See Also
--------
TimeSeries.from_dataframe : Create from a `DataFrame` (:class:`pandas.DataFrame`, :class:`polars.DataFrame`,
and other backends).
TimeSeries.from_group_dataframe : Create multiple TimeSeries by groups from a :class:`pandas.DataFrame`.
TimeSeries.from_series : Create from a `Series` (:class:`pandas.Series`, :class:`polars.Series`, and other
backends).
TimeSeries.from_values : Create from a :class:`numpy.ndarray`.
TimeSeries.from_times_and_values : Create from a time index and a :class:`numpy.ndarray`.
TimeSeries.from_csv : Create from a CSV file.
TimeSeries.from_json : Create from a JSON string.
TimeSeries.from_xarray : Create from an :class:`xarray.DataArray`.
Parameters
----------
times
A pandas DatetimeIndex, RangeIndex, or Index that can be converted to a RangeIndex representing the time
axis for the time series. It is better if the index has no holes; alternatively setting
`fill_missing_dates` can in some cases solve these issues (filling holes with NaN, or with the provided
`fillna_value` numeric value, if any).
values
A Numpy array of values for the TimeSeries. Both 2-dimensional arrays, for deterministic series,
and 3-dimensional arrays, for probabilistic series, are accepted. In the former case the dimensions
should be (time, component), and in the latter case (time, component, sample).
fill_missing_dates
Optionally, a boolean value indicating whether to fill missing dates (or indices in case of integer index)
with NaN values. This requires either a provided `freq` or the possibility to infer the frequency from the
provided timestamps. See :meth:`_fill_missing_dates() <TimeSeries._fill_missing_dates>` for more info.
freq
Optionally, a string or integer representing the frequency of the underlying index. This is useful in order
to fill in missing values if some dates are missing and `fill_missing_dates` is set to `True`.
If a string, represents the frequency of the pandas DatetimeIndex (see `offset aliases
<https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#offset-aliases>`_ for more info on
supported frequencies).
If an integer, represents the step size of the pandas Index or pandas RangeIndex.
components
Optionally, some column names to use for the second `values` dimension.
fillna_value
Optionally, a numeric value to fill missing values (NaNs) with.
static_covariates
Optionally, a set of static covariates to be added to the TimeSeries. Either a pandas Series or a pandas
DataFrame. If a Series, the index represents the static variables. The covariates are globally 'applied'
to all components of the TimeSeries. If a DataFrame, the columns represent the static variables and the
rows represent the components of the uni/multivariate TimeSeries. If a single-row DataFrame, the covariates
are globally 'applied' to all components of the TimeSeries. If a multi-row DataFrame, the number of
rows must match the number of components of the TimeSeries (in this case, the number of columns in
``values``). This adds control for component-specific static covariates.
hierarchy
Optionally, a dictionary describing the grouping(s) of the time series. The keys are component names, and
for a given component name `c`, the value is a list of component names that `c` "belongs" to. For instance,
if there is a `total` component, split both in two divisions `d1` and `d2` and in two regions `r1` and `r2`,
and four products `d1r1` (in division `d1` and region `r1`), `d2r1`, `d1r2` and `d2r2`, the hierarchy would
be encoded as follows.
.. highlight:: python
.. code-block:: python
hierarchy={
"d1r1": ["d1", "r1"],
"d1r2": ["d1", "r2"],
"d2r1": ["d2", "r1"],
"d2r2": ["d2", "r2"],
"d1": ["total"],
"d2": ["total"],
"r1": ["total"],
"r2": ["total"]
}
..
The hierarchy can be used to reconcile forecasts (so that the sums of the forecasts at
different levels are consistent), see `hierarchical reconciliation
<https://unit8co.github.io/darts/generated_api/darts.dataprocessing.transformers.reconciliation.html>`_.
metadata
Optionally, a dictionary with metadata to be added to the TimeSeries.
copy
Whether to copy the `times` and `values` objects. If `copy=False`, mutating the series data will affect the
original data. Additionally, if `times` lack a frequency or step size, it will be assigned to the original
object.
Returns
-------
TimeSeries
The resulting series.
Examples
--------
>>> import numpy as np
>>> from darts import TimeSeries
>>> from darts.utils.utils import generate_index
>>> # create values and times with daily frequency
>>> vals, times = np.arange(3), generate_index("2020-01-01", length=3, freq="D")
>>> series = TimeSeries(times=times, values=vals)
>>> series.shape
(3, 1, 1)
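>>> # a sketch of a stochastic, multivariate series with component-specific
>>> # static covariates (one row per component)
>>> import pandas as pd
>>> vals = np.random.random((3, 2, 5))
>>> static_covs = pd.DataFrame({"store": [0, 1]})
>>> series = TimeSeries(
>>>     times=times,
>>>     values=vals,
>>>     components=["sales", "visits"],
>>>     static_covariates=static_covs,
>>> )
>>> series.shape
(3, 2, 5)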
"""
if not (
isinstance(times, VALID_INDEX_TYPES)
or np.issubdtype(times.dtype, np.integer)
):
raise_log(
ValueError(
"the `times` argument must be a `pandas.RangeIndex`, or `pandas.DateTimeIndex`. Use "
"TimeSeries.from_values() if you want to use an automatic `RangeIndex`.",
),
logger=logger,
)
# avoid copying if data is already np.ndarray:
values = np.array(values) if not isinstance(values, np.ndarray) else values
# optionally, cast values to float
if not np.issubdtype(values.dtype, np.floating):
values = values.astype(np.float64)
if not (
np.issubdtype(values.dtype, np.float64)
or np.issubdtype(values.dtype, np.float32)
):
logger.warning(
"TimeSeries is using a numeric type different from numpy.float32 or numpy.float64. "
"Not all functionalities may work properly. It is recommended casting your data to floating "
"point numbers before using TimeSeries."
)
values = expand_arr(values, ndim=len(DIMS))
if len(values.shape) != 3:
raise_log(
ValueError(
f"TimeSeries require a `values` array that has or can be expanded to "
f"3 dimensions ({DIMS})."
),
logger,
)
if not len(times) == len(values):
raise_log(
ValueError("The time index and values must have the same length."),
logger,
)
if components is None:
components = pd.Index([str(idx) for idx in range(values.shape[COMP_AX])])
elif isinstance(components, str):
components = pd.Index([components])
elif not isinstance(components, pd.Index):
components = pd.Index(components)
if len(components) != values.shape[COMP_AX]:
raise_log(
ValueError(
"The number of provided components must match the number of "
"components from `values` (`values.shape[1]`). Expected "
f"number of components: `{values.shape[1]}`, received: `{len(components)}`."
),
logger=logger,
)
if copy:
# deepcopy the index as updating `times.freq` with a shallow `copy()` mutates the original index
times = deepcopy(times)
values = values.copy()
# clean component (column) names if needed (when names are not unique, or not strings)
if len(set(components)) != len(components) or any([
not isinstance(s, str) for s in components
]):
components = _clean_components(components)
has_datetime_index = isinstance(times, pd.DatetimeIndex)
has_range_index = isinstance(times, pd.RangeIndex)
has_integer_index = not (has_datetime_index or has_range_index)
# remove timezone information if present; drops the frequency
# TODO: potential to use timezone-aware index since `TimeSeries` was refactored
# to use numpy as backend
if has_datetime_index and times.tz is not None:
logger.warning(
f"The provided DatetimeIndex was associated with a timezone (tz), which is currently "
f"not supported. To avoid unexpected behaviour, the tz information was removed. Consider "
f"calling `ts.time_index.tz_localize({times.tz})` when exporting the results."
f"To plot the series with the right time steps, consider setting the matplotlib.pyplot "
f"`rcParams['timezone']` parameter to automatically convert the time axis back to the "
f"original timezone."
)
times = times.tz_localize(None)
has_frequency = (
has_datetime_index and times.freq is not None
) or has_range_index
if not has_frequency:
# can only be `pd.DatetimeIndex` or int `pd.Index` (not `pd.RangeIndex`)
if fill_missing_dates:
# optionally fill missing dates
times, values = self._fill_missing_dates(
times=times, values=values, freq=freq
)
elif freq is not None:
# using the provided `freq`
times, values = self._restore_from_frequency(
times=times, values=values, freq=freq
)
elif has_integer_index:
# integer `pd.Index` and no `freq` is provided; try convert it to pd.RangeIndex
times, values = self._restore_range_indexed(times=times, values=values)
else:
# `pd.DatetimeIndex`, and no `freq` provided; sort and see later if frequency can be inferred
times, values = self._sort_index(times=times, values=values)
elif (has_datetime_index and times.freq.n < 0) or (
has_range_index and times.step < 0
):
times = times[::-1]
values = values[::-1]
if fillna_value is not None:
values[np.isnan(values)] = fillna_value
if has_datetime_index:
# frequency must be known or it can be inferred
freq = times.freq
if freq is None:
freq = to_offset(times.inferred_freq)
times.freq = freq
if freq is None:
raise_log(
ValueError(
"The time index is missing the `freq` attribute, and the frequency "
"could not be directly inferred. This probably comes from inconsistent date frequencies with "
"missing dates. If you know the actual frequency, try setting `fill_missing_dates=True, "
"freq=actual_frequency`. If not, try setting `fill_missing_dates=True, freq=None` to see if a "
"frequency can be inferred."
),
logger,
)
freq_str: Optional[str] = freq.freqstr
else:
freq: int = times.step
freq_str = None
# how the dimensions are named; we convert hashable to string
self._time_dim = str(times.name) if times.name is not None else DIMS[TIME_AX]
self._time_index = times
self._freq = freq
self._freq_str = freq_str
self._has_datetime_index = has_datetime_index
self._values = values
self._components = components
# check static covariates
if not (
isinstance(static_covariates, (pd.Series, pd.DataFrame))
or static_covariates is None
):
raise_log(
ValueError(
"`static_covariates` must be either a pandas Series, DataFrame or None"
),
logger,
)
# check if valid static covariates for multivariate TimeSeries
if isinstance(static_covariates, pd.DataFrame):
n_components = len(static_covariates)
if n_components > 1 and n_components != self.n_components:
raise_log(
ValueError(
"When passing a multi-row pandas DataFrame, the number of rows must match the number of "
"components of the TimeSeries object (multi-component/multi-row static covariates "
"must map to each TimeSeries component)."
),
logger,
)
if copy:
static_covariates = static_covariates.copy()
elif isinstance(static_covariates, pd.Series):
static_covariates = static_covariates.to_frame().T
else: # None
pass
if static_covariates is not None:
static_covariates.index = (
self.components
if len(static_covariates) == self.n_components
else [DEFAULT_GLOBAL_STATIC_COV_NAME]
)
static_covariates.columns.name = STATIC_COV_TAG
# convert numerical columns to same dtype as series
# we get all numerical columns, except those that have right dtype already
cols_to_cast = static_covariates.select_dtypes(
include=np.number, exclude=self.dtype
).columns
# Calling astype is costly even when there's no change...
if not cols_to_cast.empty:
static_covariates = static_covariates.astype(
{col: self.dtype for col in cols_to_cast}, copy=False
)
# prepare metadata
if metadata is not None and not isinstance(metadata, dict):
raise_log(
ValueError(
"`metadata` must be of type `dict` mapping metadata attributes to their values."
),
logger,
)
# handle hierarchy
self._top_level_component = None
self._bottom_level_components = None
if hierarchy is not None:
if not isinstance(hierarchy, dict):
raise_log(
ValueError(
"The hierarchy must be a dict mapping (non-top) component names to their parent(s) "
"in the hierarchy."
),
logger,
)
# pre-compute grouping information
components_set = set(self.components)
children = set(hierarchy.keys())
# convert string ancestors to list of strings
hierarchy = {
k: ([v] if isinstance(v, str) else v) for k, v in hierarchy.items()
}
if not all(c in components_set for c in children):
raise_log(
ValueError(
"The keys of the hierarchy must be time series components"
),
logger,
)
ancestors = set().union(*hierarchy.values())
if not all(a in components_set for a in ancestors):
raise_log(
ValueError(
"The values of the hierarchy must only contain component names matching those "
"of the series."
),
logger,
)
hierarchy_top = components_set - children
if not len(hierarchy_top) == 1:
raise_log(
ValueError(
"The hierarchy must be such that only one component does not appear as a key "
"(the top level component)."
),
logger,
)
self._top_level_component = hierarchy_top.pop()
if self._top_level_component not in ancestors:
raise_log(
ValueError(
"Invalid hierarchy. Component {} appears as it should be top-level, but "
"does not appear as an ancestor in the hierarchy dict."
),
logger,
)
bottom_level = components_set - ancestors
# maintain the same order as the original components
self._bottom_level_components = [
c for c in self.components if c in bottom_level
]
self._attrs = {
STATIC_COV_TAG: static_covariates,
HIERARCHY_TAG: hierarchy,
METADATA_TAG: metadata,
}
"""
Factory Methods
===============
"""
@classmethod
def from_xarray(
cls,
xa: xr.DataArray,
fill_missing_dates: Optional[bool] = False,
freq: Optional[Union[str, int]] = None,
fillna_value: Optional[float] = None,
copy: bool = True,
) -> Self:
"""Create a ``TimeSeries`` from an `xarray.DataArray`.
The dimensions of the DataArray have to be (time, component, sample), in this order. The time
dimension can have an arbitrary name, but component and sample must be named "component" and "sample",
respectively.
The first dimension (time), and second dimension (component) must be indexed (i.e., have coordinates).
The time must be indexed either with a pandas DatetimeIndex, a pandas RangeIndex, or a pandas Index that can
be converted to a RangeIndex. It is better if the index has no holes; alternatively setting
`fill_missing_dates` can in some cases solve these issues (filling holes with NaN, or with the provided
`fillna_value` numeric value, if any).
If two components have the same name or are not strings, this method will disambiguate the component
names by appending a suffix of the form "<name>_N" to the N-th column with name "name".
The component names in the static covariates and hierarchy (if any) are *not* disambiguated.
Parameters
----------
xa
The `xarray.DataArray`
fill_missing_dates
Optionally, a boolean value indicating whether to fill missing dates (or indices in case of integer index)
with NaN values. This requires either a provided `freq` or the possibility to infer the frequency from the
provided timestamps. See :meth:`_fill_missing_dates() <TimeSeries._fill_missing_dates>` for more info.
freq
Optionally, a string or integer representing the frequency of the underlying index. This is useful in order
to fill in missing values if some dates are missing and `fill_missing_dates` is set to `True`.
If a string, represents the frequency of the pandas DatetimeIndex (see `offset aliases
<https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#offset-aliases>`_ for more info on
supported frequencies).
If an integer, represents the step size of the pandas Index or pandas RangeIndex.
fillna_value
Optionally, a numeric value to fill missing values (NaNs) with.
copy
Whether to copy the `times` (time index dimension) and `values` (data) objects. If `copy=False`, mutating
the series data will affect the original data. Additionally, if `times` lack a frequency or step size, it
will be assigned to the original object.
Returns
-------
TimeSeries
The resulting series.
Examples
--------
>>> import xarray as xr
>>> import numpy as np
>>> from darts.timeseries import DIMS
>>> from darts import TimeSeries
>>> from darts.utils.utils import generate_index
>>>
>>> # create values with the required dimensions (time, component, sample)
>>> vals = np.random.random((3, 1, 1))
>>> # create time index with daily frequency
>>> times = generate_index("2020-01-01", length=3, freq="D")
>>> columns = ["vals"]
>>>
>>> # create xarray with the required dimensions and coordinates
>>> xa = xr.DataArray(
>>> vals,
>>> dims=DIMS,
>>> coords={DIMS[0]: times, DIMS[1]: columns}
>>> )
>>> series = TimeSeries.from_xarray(xa)
>>> series.shape
(3, 1, 1)
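>>> # static covariates, a hierarchy, and metadata can also be attached via the
>>> # DataArray attributes read by this method (an illustrative sketch)
>>> xa = xa.assign_attrs({"metadata": {"source": "sensor A"}})
>>> TimeSeries.from_xarray(xa).metadata
{'source': 'sensor A'}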
"""
return cls(
times=xa.get_index(xa.dims[TIME_AX]),
values=xa.values,
fill_missing_dates=fill_missing_dates,
freq=freq,
components=xa.get_index(xa.dims[COMP_AX]),
fillna_value=fillna_value,
static_covariates=xa.attrs.get(STATIC_COV_TAG),
hierarchy=xa.attrs.get(HIERARCHY_TAG),
metadata=xa.attrs.get(METADATA_TAG),
copy=copy,
)
@classmethod
def from_csv(
cls,
filepath_or_buffer,
time_col: Optional[str] = None,
value_cols: Optional[Union[list[str], str]] = None,
fill_missing_dates: Optional[bool] = False,
freq: Optional[Union[str, int]] = None,
fillna_value: Optional[float] = None,
static_covariates: Optional[Union[pd.Series, pd.DataFrame]] = None,
hierarchy: Optional[dict] = None,
metadata: Optional[dict] = None,
**kwargs,
) -> Self:
"""Create a ``TimeSeries`` from a CSV file.
One column can be used to represent the time (if not present, the time index will be a RangeIndex)
and a list of columns `value_cols` can be used to indicate the values for this time series.
Parameters
----------
filepath_or_buffer
The path to the CSV file, or the file object; consistent with the argument of `pandas.read_csv` function
time_col
The time column name. If set, the column will be cast to a pandas DatetimeIndex (if it contains
timestamps) or a RangeIndex (if it contains integers).
If not set, the pandas RangeIndex will be used.
value_cols
A string or list of strings representing the value column(s) to be extracted from the CSV file. If set to
`None`, all columns from the CSV file will be used (except for the time_col, if specified)
fill_missing_dates
Optionally, a boolean value indicating whether to fill missing dates (or indices in case of integer index)
with NaN values. This requires either a provided `freq` or the possibility to infer the frequency from the
provided timestamps. See :meth:`_fill_missing_dates() <TimeSeries._fill_missing_dates>` for more info.
freq
Optionally, a string or integer representing the frequency of the underlying index. This is useful in order
to fill in missing values if some dates are missing and `fill_missing_dates` is set to `True`.
If a string, represents the frequency of the pandas DatetimeIndex (see `offset aliases
<https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#offset-aliases>`_ for more info on
supported frequencies).
If an integer, represents the step size of the pandas Index or pandas RangeIndex.
fillna_value
Optionally, a numeric value to fill missing values (NaNs) with.
static_covariates
Optionally, a set of static covariates to be added to the TimeSeries. Either a pandas Series or a pandas
DataFrame. If a Series, the index represents the static variables. The covariates are globally 'applied'
to all components of the TimeSeries. If a DataFrame, the columns represent the static variables and the
rows represent the components of the uni/multivariate TimeSeries. If a single-row DataFrame, the covariates
are globally 'applied' to all components of the TimeSeries. If a multi-row DataFrame, the number of
rows must match the number of components of the TimeSeries (in this case, the number of columns in the CSV
file). This adds control for component-specific static covariates.
hierarchy
Optionally, a dictionary describing the grouping(s) of the time series. The keys are component names, and
for a given component name `c`, the value is a list of component names that `c` "belongs" to. For instance,
if there is a `total` component, split both in two divisions `d1` and `d2` and in two regions `r1` and `r2`,
and four products `d1r1` (in division `d1` and region `r1`), `d2r1`, `d1r2` and `d2r2`, the hierarchy would
be encoded as follows.
.. highlight:: python
.. code-block:: python
hierarchy={
"d1r1": ["d1", "r1"],
"d1r2": ["d1", "r2"],
"d2r1": ["d2", "r1"],
"d2r2": ["d2", "r2"],
"d1": ["total"],
"d2": ["total"],
"r1": ["total"],
"r2": ["total"]
}
..
The hierarchy can be used to reconcile forecasts (so that the sums of the forecasts at
different levels are consistent), see `hierarchical reconciliation
<https://unit8co.github.io/darts/generated_api/darts.dataprocessing.transformers.reconciliation.html>`_.
metadata
Optionally, a dictionary with metadata to be added to the TimeSeries.
**kwargs
Optional arguments to be passed to `pandas.read_csv` function
Returns
-------
TimeSeries
The resulting series.
Examples
--------
>>> from darts import TimeSeries
>>> TimeSeries.from_csv("data.csv", time_col="time")
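>>> # keyword arguments are forwarded to `pandas.read_csv`; e.g., for a
>>> # semicolon-separated file (a sketch, assuming "data.csv" uses ";")
>>> TimeSeries.from_csv("data.csv", time_col="time", sep=";")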
"""
return cls.from_dataframe(
df=pd.read_csv(filepath_or_buffer=filepath_or_buffer, **kwargs),
time_col=time_col,
value_cols=value_cols,
fill_missing_dates=fill_missing_dates,
freq=freq,
fillna_value=fillna_value,
static_covariates=static_covariates,
hierarchy=hierarchy,
metadata=metadata,
copy=False,
)
@classmethod
def from_dataframe(
cls,
df: IntoDataFrame,
time_col: Optional[str] = None,
value_cols: Optional[Union[list[str], str]] = None,
fill_missing_dates: Optional[bool] = False,
freq: Optional[Union[str, int]] = None,
fillna_value: Optional[float] = None,
static_covariates: Optional[Union[pd.Series, pd.DataFrame]] = None,
hierarchy: Optional[dict] = None,
metadata: Optional[dict] = None,
copy: bool = True,
) -> Self:
"""Create a ``TimeSeries`` from a selection of columns of a `DataFrame`.
One column (or the DataFrame index) has to represent the time, and a list of columns `value_cols` has to
represent the values for this time series.
Parameters
----------
df
The DataFrame, or anything which can be converted to a narwhals DataFrame (e.g. pandas.DataFrame,
polars.DataFrame, ...). See the `narwhals documentation
<https://narwhals-dev.github.io/narwhals/api-reference/narwhals/#narwhals.from_native>`_ for more
information.
time_col
The time column name. If set, the column will be cast to a pandas DatetimeIndex (if it contains
timestamps) or a RangeIndex (if it contains integers).
If not set, the DataFrame index will be used. In this case the DataFrame must contain an index that is
either a pandas DatetimeIndex, a pandas RangeIndex, or a pandas Index that can be converted to a
RangeIndex. It is better if the index has no holes; alternatively setting `fill_missing_dates` can in some
cases solve these issues (filling holes with NaN, or with the provided `fillna_value` numeric value, if
any).
value_cols
A string or list of strings representing the value column(s) to be extracted from the DataFrame. If set to
`None`, the whole DataFrame will be used.
fill_missing_dates
Optionally, a boolean value indicating whether to fill missing dates (or indices in case of integer index)
with NaN values. This requires either a provided `freq` or the possibility to infer the frequency from the
provided timestamps. See :meth:`_fill_missing_dates() <TimeSeries._fill_missing_dates>` for more info.
freq
Optionally, a string or integer representing the frequency of the underlying index. This is useful in order
to fill in missing values if some dates are missing and `fill_missing_dates` is set to `True`.
If a string, represents the frequency of the pandas DatetimeIndex (see `offset aliases
<https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#offset-aliases>`_ for more info on
supported frequencies).
If an integer, represents the step size of the pandas Index or pandas RangeIndex.
fillna_value
Optionally, a numeric value to fill missing values (NaNs) with.
static_covariates
Optionally, a set of static covariates to be added to the TimeSeries. Either a pandas Series or a pandas
DataFrame. If a Series, the index represents the static variables. The covariates are globally 'applied'
to all components of the TimeSeries. If a DataFrame, the columns represent the static variables and the
rows represent the components of the uni/multivariate TimeSeries. If a single-row DataFrame, the covariates
are globally 'applied' to all components of the TimeSeries. If a multi-row DataFrame, the number of
rows must match the number of components of the TimeSeries (in this case, the number of columns in
``value_cols``). This adds control for component-specific static covariates.
hierarchy
Optionally, a dictionary describing the grouping(s) of the time series. The keys are component names, and
for a given component name `c`, the value is a list of component names that `c` "belongs" to. For instance,
if there is a `total` component, split both in two divisions `d1` and `d2` and in two regions `r1` and `r2`,
and four products `d1r1` (in division `d1` and region `r1`), `d2r1`, `d1r2` and `d2r2`, the hierarchy would
be encoded as follows.
.. highlight:: python
.. code-block:: python
hierarchy={
"d1r1": ["d1", "r1"],
"d1r2": ["d1", "r2"],
"d2r1": ["d2", "r1"],
"d2r2": ["d2", "r2"],
"d1": ["total"],
"d2": ["total"],
"r1": ["total"],
"r2": ["total"]
}
..
The hierarchy can be used to reconcile forecasts (so that the sums of the forecasts at
different levels are consistent), see `hierarchical reconciliation
<https://unit8co.github.io/darts/generated_api/darts.dataprocessing.transformers.reconciliation.html>`_.
metadata
Optionally, a dictionary with metadata to be added to the TimeSeries.
copy
Whether to copy the `times` (DataFrame index or the `time_col` column) and DataFrame `values`.
If `copy=False`, mutating the series data will affect the original data. Additionally, if `times` lack a
frequency or step size, it will be assigned to the original object.
Returns
-------
TimeSeries
The resulting series.
Examples
--------
>>> import pandas as pd
>>> from darts import TimeSeries
>>> from darts.utils.utils import generate_index
>>> # create values and times with daily frequency
>>> data = {"vals": range(3), "time": generate_index("2020-01-01", length=3, freq="D")}
>>> # create from `pandas.DataFrame`
>>> df = pd.DataFrame(data)
>>> series = TimeSeries.from_dataframe(df, time_col="time")
>>> # shape (n time steps, n components, n samples)
>>> series.shape
(3, 1, 1)
>>> # or from `polars.DataFrame` (make sure Polars is installed)
>>> import polars as pl
>>> df = pl.DataFrame(data)
>>> series = TimeSeries.from_dataframe(df, time_col="time")
>>> series.shape
(3, 1, 1)
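>>> # a sketch of a multivariate series built from a subset of pandas columns
>>> df = pd.DataFrame({**data, "vals2": range(3), "other": range(3)})
>>> series = TimeSeries.from_dataframe(df, time_col="time", value_cols=["vals", "vals2"])
>>> series.shape
(3, 2, 1)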
"""
df = nw.from_native(df, eager_only=True, pass_through=False)
# get values
if value_cols is None:
series_df = df.drop(time_col) if time_col else df
else:
if isinstance(value_cols, (str, int)):
value_cols = [value_cols]
series_df = df[value_cols]
# get time index
if time_col:
if time_col not in df.columns:
raise_log(AttributeError(f"time_col='{time_col}' is not present."))
time_col_vals = df.get_column(time_col)
if time_col_vals.dtype == nw.String:
# Try to convert to integers if needed
with contextlib.suppress(Exception):
time_col_vals = time_col_vals.cast(nw.Int64)
if time_col_vals.dtype.is_integer():
if time_col_vals.is_duplicated().any():
raise_log(
ValueError(
"The provided integer time index column contains duplicate values."
)
)
# Temporarily use an integer `pd.Index` to sort the values; later replaced with
# a `pd.RangeIndex` in `__init__()`
time_index = pd.Index(time_col_vals)
elif isinstance(time_col_vals.dtype, nw.String):
# The integer conversion failed; try datetimes
try:
time_index = pd.DatetimeIndex(time_col_vals)
except ValueError:
raise_log(
AttributeError(
"'time_col' is of 'String' dtype but doesn't contain valid timestamps"
)
)
elif isinstance(time_col_vals.dtype, nw.Datetime):
# force time index to be timezone naive, as polars converts to UTC
time_zone = time_col_vals.dtype.time_zone
if time_zone is not None:
logger.warning(
"The provided DatetimeIndex was associated with a timezone (tz), which is currently not "
"supported. To avoid unexpected behaviour, the tz information was removed. Consider calling "
f"`ts.time_index.tz_localize({time_zone})` when exporting the results."
"To plot the series with the right time steps, consider setting the matplotlib.pyplot "
"`rcParams['timezone']` parameter to automatically convert the time axis back to the "
"original timezone."
)
time_col_vals = time_col_vals.dt.replace_time_zone(None)
time_index = pd.DatetimeIndex(time_col_vals)
else:
raise_log(
AttributeError(
"Invalid type of `time_col`: it needs to be of either 'String', 'Datetime' or 'Int' dtype."
)
)
if not time_index.name:
time_index.name = time_col
else:
time_index = nw.maybe_get_index(df)
if time_index is None:
time_index = pd.RangeIndex(len(df), name=DIMS[TIME_AX])
logger.warning(
"No time column specified (`time_col=None`) and no index found in the `DataFrame`. Defaulting to "
"`pandas.RangeIndex(len(df))`. If this is not desired consider adding a time column "
"to your `DataFrame` and defining `time_col`."
)
# if we are here, the dataframe was pandas
elif not (
isinstance(time_index, VALID_INDEX_TYPES)
or np.issubdtype(time_index.dtype, np.integer)
):
raise_log(
ValueError(
"If time_col is not specified, the DataFrame must be indexed either with "
"a DatetimeIndex, a RangeIndex, or an integer Index that can be converted into a RangeIndex"
),
logger,
)
return cls(
times=time_index,
values=series_df.to_numpy(),
fill_missing_dates=fill_missing_dates,
freq=freq,
components=series_df.columns,
fillna_value=fillna_value,
static_covariates=static_covariates,
hierarchy=hierarchy,
metadata=metadata,
copy=copy,
)
@classmethod
def from_group_dataframe(
cls,
df: pd.DataFrame,
group_cols: Union[list[str], str],
time_col: Optional[str] = None,
value_cols: Optional[Union[list[str], str]] = None,
static_cols: Optional[Union[list[str], str]] = None,
metadata_cols: Optional[Union[list[str], str]] = None,
fill_missing_dates: Optional[bool] = False,
freq: Optional[Union[str, int]] = None,
fillna_value: Optional[float] = None,
drop_group_cols: Optional[Union[list[str], str]] = None,
n_jobs: Optional[int] = 1,
verbose: Optional[bool] = False,
copy: bool = True,
) -> list[Self]:
"""Create a list of ``TimeSeries`` grouped by a selection of columns from a `DataFrame`.
One column (or the DataFrame index) has to represent the time, a list of columns `group_cols` must be used for
extracting the individual TimeSeries by groups, and a list of columns `value_cols` has to represent the values
for the individual time series. Values from columns ``group_cols`` and ``static_cols`` are added as static
covariates to the resulting TimeSeries objects. These can be viewed with `my_series.static_covariates`.
Unlike `group_cols`, `static_cols` only adds the static values and is not used to extract the TimeSeries
groups.
Parameters
----------
df
The DataFrame
group_cols
A string or list of strings representing the columns from the DataFrame by which to extract the
individual TimeSeries groups.
time_col
The time column name. If set, the column will be cast to a pandas DatetimeIndex (if it contains
timestamps) or a RangeIndex (if it contains integers).
If not set, the DataFrame index will be used. In this case the DataFrame must contain an index that is
either a pandas DatetimeIndex, a pandas RangeIndex, or a pandas Index that can be converted to a
RangeIndex. Be aware that the index must represent the actual index of each individual time series group
(can contain non-unique values). It is better if the index has no holes; alternatively setting
`fill_missing_dates` can in some cases solve these issues (filling holes with NaN, or with the provided
`fillna_value` numeric value, if any).
value_cols
A string or list of strings representing the value column(s) to be extracted from the DataFrame. If set to
`None`, the whole DataFrame will be used.
static_cols
A string or list of strings representing static variable columns from the DataFrame that should be
appended as static covariates to the resulting TimeSeries groups. Unlike `group_cols`, the
DataFrame is not grouped by these columns. Uses the first encountered value per group and column
(assumes that there is only one unique value). Static covariates can be used as input features to all
Darts models that support it.
metadata_cols
Same as `static_cols` but appended as metadata to the resulting TimeSeries groups. Metadata will never be
used by the underlying Darts models.
fill_missing_dates
Optionally, a boolean value indicating whether to fill missing dates (or indices in case of integer index)
with NaN values. This requires either a provided `freq` or the possibility to infer the frequency from the
provided timestamps. See :meth:`_fill_missing_dates() <TimeSeries._fill_missing_dates>` for more info.
freq
Optionally, a string or integer representing the frequency of the underlying index. This is useful in order
to fill in missing values if some dates are missing and `fill_missing_dates` is set to `True`.
If a string, represents the frequency of the pandas DatetimeIndex (see `offset aliases
<https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#offset-aliases>`_ for more info on
supported frequencies).
If an integer, represents the step size of the pandas Index or pandas RangeIndex.
fillna_value
Optionally, a numeric value to fill missing values (NaNs) with.
drop_group_cols
Optionally, a string or list of strings with `group_cols` column(s) to exclude from the static covariates.
n_jobs
Optionally, an integer representing the number of parallel jobs to run. Behavior is the same as in the
`joblib.Parallel` class.
verbose
Optionally, a boolean value indicating whether to display a progress bar.
copy
Whether to copy the `times` (DataFrame index or the `time_col` column) and DataFrame `values`.
If `copy=False`, mutating the series data will affect the original data. Additionally, if `times` lack a
frequency or step size, it will be assigned to the original object.
Returns
-------
List[TimeSeries]
A list of series, where each series represents one group from the DataFrame.
Examples
--------
>>> import pandas as pd
>>> from darts import TimeSeries
>>> from darts.utils.utils import generate_index
>>>
>>> # create a DataFrame with two series that have different ids,
>>> # values, and frequencies
>>> df_1 = pd.DataFrame({
>>> "ID": [0] * 3,
>>> "vals": range(3),
>>> "time": generate_index("2020-01-01", length=3, freq="D")}
>>> )
>>> df_2 = pd.DataFrame({
>>> "ID": [1] * 6,
>>> "vals": range(6),
>>> "time": generate_index("2020-01-01", length=6, freq="h")}
>>> )
>>> df = pd.concat([df_1, df_2], axis=0)
>>>
>>> # extract the series by "ID" groups from the DataFrame
>>> series_multi = TimeSeries.from_group_dataframe(
>>> df,
>>> group_cols="ID",
>>> time_col="time"
>>> )
>>> len(series_multi), series_multi[0].shape, series_multi[1].shape
(2, (3, 1, 1), (6, 1, 1))
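>>> # a sketch with an additional static covariate column (the first value per
>>> # group is used; assumes one unique value per group)
>>> df["country"] = ["CH"] * 3 + ["DE"] * 6
>>> series_multi = TimeSeries.from_group_dataframe(
>>>     df,
>>>     group_cols="ID",
>>>     static_cols="country",
>>>     time_col="time"
>>> )
>>> series_multi[0].static_covariates.columns.tolist()
['ID', 'country']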
"""
if time_col is None and df.index.is_monotonic_increasing:
logger.warning(
"UserWarning: `time_col` was not set and `df` has a monotonically increasing (time) index. This "
"results in time series groups with non-overlapping (time) index. You can ignore this warning if the "
"index represents the actual index of each individual time series group."
)
# group cols: used to extract time series groups from `df`, will also be added as static covariates
# (except `drop_group_cols`)
group_cols = [group_cols] if not isinstance(group_cols, list) else group_cols
if drop_group_cols:
drop_group_cols = (
[drop_group_cols]
if not isinstance(drop_group_cols, list)
else drop_group_cols
)
invalid_cols = set(drop_group_cols) - set(group_cols)
if invalid_cols:
raise_log(
ValueError(
f"Found invalid `drop_group_cols` columns. All columns must be in the passed `group_cols`. "
f"Expected any of: {group_cols}, received: {invalid_cols}."
),
logger=logger,
)
drop_group_col_idx = [
idx for idx, col in enumerate(group_cols) if col in drop_group_cols
]
else:
drop_group_cols = []
drop_group_col_idx = []
# static covariates: all `group_cols` (except `drop_group_cols`) and `static_cols`
if static_cols is not None:
static_cols = (
[static_cols] if not isinstance(static_cols, list) else static_cols
)
else:
static_cols = []
static_cov_cols = group_cols + static_cols
# columns that are used as static covariates but not for grouping
extract_static_cov_cols = [
col for col in static_cov_cols if col not in drop_group_cols
]
# metadata: all `metadata_cols`
if metadata_cols is not None:
metadata_cols = (
[metadata_cols]
if not isinstance(metadata_cols, list)
else metadata_cols
)
else:
metadata_cols = []
# columns that are used as metadata but not for grouping or static covariates
extract_metadata_cols = [
col for col in metadata_cols if col not in static_cov_cols
]
extract_time_col = [] if time_col is None else [time_col]
if value_cols is None:
value_cols = df.columns.drop(
static_cov_cols + extract_metadata_cols + extract_time_col
).tolist()
extract_value_cols = [value_cols] if isinstance(value_cols, str) else value_cols
df = df[
static_cov_cols
+ extract_value_cols
+ extract_time_col
+ extract_metadata_cols
]
if time_col:
if np.issubdtype(df[time_col].dtype, object) or np.issubdtype(
df[time_col].dtype, np.datetime64
):
df.index = pd.DatetimeIndex(df[time_col])
df = df.drop(columns=time_col)
else:
df = df.set_index(time_col)
if df.index.is_monotonic_increasing:
logger.warning(
"UserWarning: The (time) index from `df` is monotonically increasing. This "
"results in time series groups with non-overlapping (time) index. You can ignore this warning if the "
"index represents the actual index of each individual time series group."
)
else:
# sort the entire `df` once to avoid having to sort each group individually later on
df = df.sort_index()
groups = df.groupby(group_cols[0] if len(group_cols) == 1 else group_cols)
# build progress bar for iterator
iterator = _build_tqdm_iterator(
groups,
verbose=verbose,
total=len(groups),
desc="Creating TimeSeries",
)
def from_group(static_cov_vals, group):
split = group[extract_value_cols]
static_cov_vals = (
(static_cov_vals,)
if not isinstance(static_cov_vals, tuple)
else static_cov_vals
)
# optionally, exclude group columns from static covariates
if drop_group_col_idx:
if len(drop_group_col_idx) == len(group_cols):
static_cov_vals = tuple()
else:
static_cov_vals = tuple(
val
for idx, val in enumerate(static_cov_vals)
if idx not in drop_group_col_idx
)
if static_cols:
# use first value as static covariate (assume only one unique per group)
static_cov_vals += tuple(group[static_cols].values[0])
metadata = None
if metadata_cols:
# use first value as metadata (assume only one unique per group)
metadata = {
col: val
for col, val in zip(metadata_cols, group[metadata_cols].values[0])
}
return cls.from_dataframe(
df=split,
fill_missing_dates=fill_missing_dates,
freq=freq,
fillna_value=fillna_value,
static_covariates=(
pd.DataFrame([static_cov_vals], columns=extract_static_cov_cols)
if extract_static_cov_cols
else None
),
metadata=metadata,
copy=copy,
)
return _parallel_apply(
iterator,
from_group,
n_jobs,
fn_args=dict(),
fn_kwargs=dict(),
)
@classmethod
def from_series(
cls,
pd_series: IntoSeries,
fill_missing_dates: Optional[bool] = False,
freq: Optional[Union[str, int]] = None,
fillna_value: Optional[float] = None,
static_covariates: Optional[Union[pd.Series, pd.DataFrame]] = None,
metadata: Optional[dict] = None,
copy: bool = True,
) -> Self:
"""Create a ``TimeSeries`` from a `Series`.
The series must contain an index that is either a pandas DatetimeIndex, a pandas RangeIndex, or a pandas Index
that can be converted into a RangeIndex. It is better if the index has no holes; alternatively setting
`fill_missing_dates` can in some cases solve these issues (filling holes with NaN, or with the provided
`fillna_value` numeric value, if any).
Parameters
----------
pd_series
The Series, or anything which can be converted to a narwhals Series (e.g. pandas.Series, ...). See the
`narwhals documentation
<https://narwhals-dev.github.io/narwhals/api-reference/narwhals/#narwhals.from_native>`_ for more
information.
fill_missing_dates
Optionally, a boolean value indicating whether to fill missing dates (or indices in case of integer index)
with NaN values. This requires either a provided `freq` or the possibility to infer the frequency from the
provided timestamps. See :meth:`_fill_missing_dates() <TimeSeries._fill_missing_dates>` for more info.
freq
Optionally, a string or integer representing the frequency of the underlying index. This is useful in order
to fill in missing values if some dates are missing and `fill_missing_dates` is set to `True`.
If a string, represents the frequency of the pandas DatetimeIndex (see `offset aliases
<https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#offset-aliases>`_ for more info on
supported frequencies).
If an integer, represents the step size of the pandas Index or pandas RangeIndex.
fillna_value
Optionally, a numeric value to fill missing values (NaNs) with.
static_covariates
Optionally, a set of static covariates to be added to the TimeSeries. Either a pandas Series or a
single-row pandas DataFrame. If a Series, the index represents the static variables. If a DataFrame, the
columns represent the static variables and the single row represents the univariate TimeSeries component.
metadata
Optionally, a dictionary with metadata to be added to the TimeSeries.
copy
Whether to copy the Series' `values`. If `copy=False`, mutating the series data will affect the original
data.
Returns
-------
TimeSeries
The resulting series.
Examples
--------
>>> import pandas as pd
>>> from darts import TimeSeries
>>> from darts.utils.utils import generate_index
>>> # create values and times with daily frequency
>>> vals, times = range(3), generate_index("2020-01-01", length=3, freq="D")
>>>
>>> # create from `pandas.Series`
>>> pd_series = pd.Series(vals, index=times)
>>> series = TimeSeries.from_series(pd_series)
>>> series.shape
(3, 1, 1)
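>>> # or from `polars.Series` (make sure Polars is installed); since a polars
>>> # Series has no index, a RangeIndex is used (a sketch)
>>> import polars as pl
>>> pl_series = pl.Series("vals", list(vals))
>>> series = TimeSeries.from_series(pl_series)
>>> series.shape
(3, 1, 1)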
"""
nw_series = nw.from_native(pd_series, series_only=True, pass_through=False)
df = nw_series.to_frame()
return cls.from_dataframe(
df,
time_col=None,
value_cols=None,
fill_missing_dates=fill_missing_dates,
freq=freq,
fillna_value=fillna_value,
static_covariates=static_covariates,
metadata=metadata,
copy=copy,
)
@classmethod
def from_times_and_values(
cls,
times: Union[pd.DatetimeIndex, pd.RangeIndex, pd.Index],
values: np.ndarray,
fill_missing_dates: Optional[bool] = False,
freq: Optional[Union[str, int]] = None,
columns: Optional[pd._typing.Axes] = None,
fillna_value: Optional[float] = None,
static_covariates: Optional[Union[pd.Series, pd.DataFrame]] = None,
hierarchy: Optional[dict] = None,
metadata: Optional[dict] = None,
copy: bool = True,
) -> Self:
"""Create a ``TimeSeries`` from a time index and value array.
Parameters
----------
times
A pandas DatetimeIndex, RangeIndex, or Index that can be converted to a RangeIndex representing the time
axis for the time series. It is better if the index has no holes; alternatively setting
`fill_missing_dates` can in some cases solve these issues (filling holes with NaN, or with the provided
`fillna_value` numeric value, if any).
values
A Numpy array, or array-like of values for the TimeSeries. Both 2-dimensional arrays, for deterministic
series, and 3-dimensional arrays, for probabilistic series, are accepted. In the former case the dimensions
should be (time, component), and in the latter case (time, component, sample).
fill_missing_dates
Optionally, a boolean value indicating whether to fill missing dates (or indices in case of integer index)
with NaN values. This requires either a provided `freq` or the possibility to infer the frequency from the
provided timestamps. See :meth:`_fill_missing_dates() <TimeSeries._fill_missing_dates>` for more info.
freq
Optionally, a string or integer representing the frequency of the underlying index. This is useful in order
to fill in missing values if some dates are missing and `fill_missing_dates` is set to `True`.
If a string, represents the frequency of the pandas DatetimeIndex (see `offset aliases
<https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#offset-aliases>`_ for more info on
supported frequencies).
If an integer, represents the step size of the pandas Index or pandas RangeIndex.
columns
Optionally, some column names to use for the second `values` dimension.
fillna_value
Optionally, a numeric value to fill missing values (NaNs) with.
static_covariates
Optionally, a set of static covariates to be added to the TimeSeries. Either a pandas Series or a pandas
DataFrame. If a Series, the index represents the static variables. The covariates are globally 'applied'
to all components of the TimeSeries. If a DataFrame, the columns represent the static variables and the
rows represent the components of the uni/multivariate TimeSeries. If a single-row DataFrame, the covariates
are globally 'applied' to all components of the TimeSeries. If a multi-row DataFrame, the number of
rows must match the number of components of the TimeSeries (in this case, the number of columns in
``values``). This adds control for component-specific static covariates.
hierarchy
Optionally, a dictionary describing the grouping(s) of the time series. The keys are component names, and
for a given component name `c`, the value is a list of component names that `c` "belongs" to. For instance,
if there is a `total` component, split both in two divisions `d1` and `d2` and in two regions `r1` and `r2`,
and four products `d1r1` (in division `d1` and region `r1`), `d2r1`, `d1r2` and `d2r2`, the hierarchy would
be encoded as follows.
.. highlight:: python
.. code-block:: python
hierarchy={
"d1r1": ["d1", "r1"],
"d1r2": ["d1", "r2"],
"d2r1": ["d2", "r1"],
"d2r2": ["d2", "r2"],
"d1": ["total"],
"d2": ["total"],
"r1": ["total"],
"r2": ["total"]
}
..
The hierarchy can be used to reconcile forecasts (so that the sums of the forecasts at
different levels are consistent), see `hierarchical reconciliation
<https://unit8co.github.io/darts/generated_api/darts.dataprocessing.transformers.reconciliation.html>`_.
metadata
Optionally, a dictionary with metadata to be added to the TimeSeries.
copy
Whether to copy the `times` and `values` objects. If `copy=False`, mutating the series data will affect the
original data. Additionally, if `times` lack a frequency or step size, it will be assigned to the original
object.
Returns
-------
TimeSeries
The resulting series.
Examples
--------
>>> import numpy as np
>>> from darts import TimeSeries
>>> from darts.utils.utils import generate_index
>>> # create values and times with daily frequency
>>> vals, times = np.arange(3), generate_index("2020-01-01", length=3, freq="D")
>>> series = TimeSeries.from_times_and_values(times=times, values=vals)
>>> series.shape
(3, 1, 1)
"""
return cls(
times=times,
values=values,
fill_missing_dates=fill_missing_dates,
freq=freq,
components=columns,
fillna_value=fillna_value,
static_covariates=static_covariates,
hierarchy=hierarchy,
metadata=metadata,
copy=copy,
)
@classmethod
def from_values(
cls,
values: np.ndarray,
columns: Optional[pd._typing.Axes] = None,
fillna_value: Optional[float] = None,
static_covariates: Optional[Union[pd.Series, pd.DataFrame]] = None,
hierarchy: Optional[dict] = None,
metadata: Optional[dict] = None,
copy: bool = True,
) -> Self:
"""Create an ``TimeSeries`` from an array of values.
The series will have an integer time index (RangeIndex).
Parameters
----------
values
A Numpy array of values for the TimeSeries. Both 2-dimensional arrays, for deterministic series,
and 3-dimensional arrays, for probabilistic series, are accepted. In the former case the dimensions
should be (time, component), and in the latter case (time, component, sample).
columns
Optionally, some column names to use for the second `values` dimension.
fillna_value
Optionally, a numeric value to fill missing values (NaNs) with.
static_covariates
Optionally, a set of static covariates to be added to the TimeSeries. Either a pandas Series or a pandas
DataFrame. If a Series, the index represents the static variables. The covariates are globally 'applied'
to all components of the TimeSeries. If a DataFrame, the columns represent the static variables and the
rows represent the components of the uni/multivariate TimeSeries. If a single-row DataFrame, the covariates
are globally 'applied' to all components of the TimeSeries. If a multi-row DataFrame, the number of
rows must match the number of components of the TimeSeries (in this case, the number of columns in
``values``). This adds control for component-specific static covariates.
hierarchy
Optionally, a dictionary describing the grouping(s) of the time series. The keys are component names, and
for a given component name `c`, the value is a list of component names that `c` "belongs" to. For instance,
if there is a `total` component, split both in two divisions `d1` and `d2` and in two regions `r1` and `r2`,
and four products `d1r1` (in division `d1` and region `r1`), `d2r1`, `d1r2` and `d2r2`, the hierarchy would
be encoded as follows.
.. highlight:: python
.. code-block:: python
hierarchy={
"d1r1": ["d1", "r1"],
"d1r2": ["d1", "r2"],
"d2r1": ["d2", "r1"],
"d2r2": ["d2", "r2"],
"d1": ["total"],
"d2": ["total"],
"r1": ["total"],
"r2": ["total"]
}
..
The hierarchy can be used to reconcile forecasts (so that the sums of the forecasts at
different levels are consistent), see `hierarchical reconciliation
<https://unit8co.github.io/darts/generated_api/darts.dataprocessing.transformers.reconciliation.html>`_.
metadata
Optionally, a dictionary with metadata to be added to the TimeSeries.
copy
Whether to copy the `values`. If `copy=False`, mutating the series data will affect the original data.
Returns
-------
TimeSeries
The resulting series.
Examples
--------
>>> import numpy as np
>>> from darts import TimeSeries
>>> vals = np.arange(3)
>>> series = TimeSeries.from_values(vals)
>>> series.shape
(3, 1, 1)
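>>> # a sketch of a stochastic series: 3 time steps, 2 components, 5 samples
>>> vals = np.random.random((3, 2, 5))
>>> series = TimeSeries.from_values(vals)
>>> series.shape
(3, 2, 5)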
"""
return cls(
times=pd.RangeIndex(start=0, stop=len(values), step=1),
values=values,
fill_missing_dates=False,
freq=None,
components=columns,
fillna_value=fillna_value,
static_covariates=static_covariates,
hierarchy=hierarchy,
metadata=metadata,
copy=copy,
)
@classmethod
def from_json(
cls,
json_str: str,
static_covariates: Optional[Union[pd.Series, pd.DataFrame]] = None,
hierarchy: Optional[dict] = None,
metadata: Optional[dict] = None,
) -> Self:
"""Create a ``TimeSeries`` from the JSON String representation of a ``TimeSeries``.
The JSON String representation can be generated with :func:`TimeSeries.to_json()`.
At the moment this only supports deterministic time series (i.e., made of 1 sample).
Parameters
----------
json_str
The JSON String to convert.
static_covariates
Optionally, a set of static covariates to be added to the TimeSeries. Either a pandas Series or a pandas
DataFrame. If a Series, the index represents the static variables. The covariates are globally 'applied'
to all components of the TimeSeries. If a DataFrame, the columns represent the static variables and the
rows represent the components of the uni/multivariate TimeSeries. If a single-row DataFrame, the covariates
are globally 'applied' to all components of the TimeSeries. If a multi-row DataFrame, the number of
rows must match the number of components of the TimeSeries (in this case, the number of columns in
the JSON string). This adds control for component-specific static covariates.
hierarchy
Optionally, a dictionary describing the grouping(s) of the time series. The keys are component names, and
for a given component name `c`, the value is a list of component names that `c` "belongs" to. For instance,
if there is a `total` component, split both in two divisions `d1` and `d2` and in two regions `r1` and `r2`,
and four products `d1r1` (in division `d1` and region `r1`), `d2r1`, `d1r2` and `d2r2`, the hierarchy would
be encoded as follows.
.. highlight:: python
.. code-block:: python
hierarchy={
"d1r1": ["d1", "r1"],
"d1r2": ["d1", "r2"],
"d2r1": ["d2", "r1"],
"d2r2": ["d2", "r2"],
"d1": ["total"],
"d2": ["total"],
"r1": ["total"],
"r2": ["total"]
}
..
The hierarchy can be used to reconcile forecasts (so that the sums of the forecasts at
different levels are consistent), see `hierarchical reconciliation
<https://unit8co.github.io/darts/generated_api/darts.dataprocessing.transformers.reconciliation.html>`_.
metadata
Optionally, a dictionary with metadata to be added to the TimeSeries.
Returns
-------
TimeSeries
The resulting series.
Examples
--------
>>> from darts import TimeSeries
>>> json_str = (
>>> '{"columns":["vals"],"index":["2020-01-01","2020-01-02","2020-01-03"],"data":[[0.0],[1.0],[2.0]]}'
>>> )
>>> series = TimeSeries.from_json(json_str)
>>> series.shape
(3, 1, 1)
"""
return cls.from_dataframe(
df=pd.read_json(StringIO(json_str), orient="split"),
static_covariates=static_covariates,
hierarchy=hierarchy,
metadata=metadata,
copy=False, # JSON is immutable, so no need to copy
)
@classmethod
def from_pickle(cls, path: str) -> Self:
"""Read a pickled ``TimeSeries``.
Parameters
----------
path : string
path pointing to a pickle file that will be loaded.
Returns
-------
TimeSeries
The resulting series.
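Examples
--------
A minimal save / load round trip sketch (assumes write access to the working directory; the file name
is arbitrary):
>>> from darts import TimeSeries
>>> from darts.utils.timeseries_generation import linear_timeseries
>>> series = linear_timeseries(length=3)
>>> series.to_pickle("series.pkl")
>>> TimeSeries.from_pickle("series.pkl") == series
True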
"""
with open(path, "rb") as fh:
return pickle.load(fh)
"""
Properties
==========
"""
@property
def static_covariates(self) -> Optional[pd.DataFrame]:
"""The static covariates of this series.
If defined, the static covariates are given as a `pandas.DataFrame`. The columns represent the static variables
and the rows represent the components of the series.
"""
return self._attrs.get(STATIC_COV_TAG, None)
@property
def hierarchy(self) -> Optional[dict]:
"""The hierarchy of this series.
If defined, the hierarchy is given as a dictionary. The keys are the individual components and values are the
set of parent(s) of these components in the hierarchy.
"""
return self._attrs.get(HIERARCHY_TAG, None)
@property
def metadata(self) -> Optional[dict]:
"""The metadata of this series.
If defined, the metadata is given as a dictionary.
"""
return self._attrs.get(METADATA_TAG, None)
@property
def top_level_component(self) -> Optional[str]:
"""The top level component name of this series, or `None` if the series has no hierarchy."""
return self._top_level_component
@property
def bottom_level_components(self) -> Optional[list[str]]:
"""The bottom level component names of this series, or `None` if the series has no hierarchy."""
return self._bottom_level_components
@property
def top_level_series(self) -> Optional[Self]:
"""The univariate series containing the single top-level component of this series, or `None` if the series has
no hierarchy.
"""
return self[self.top_level_component] if self.has_hierarchy else None
@property
def bottom_level_series(self) -> Optional[list[Self]]:
"""The series containing the bottom-level components of this series in the same order as they appear in the
series, or `None` if the series has no hierarchy.
"""
return (
self[[c for c in self.components if c in self.bottom_level_components]]
if self.has_hierarchy
else None
)
@property
def shape(self) -> tuple[int, int, int]:
"""The shape of the series `(n_timesteps, n_components, n_samples)`."""
return self._values.shape
@property
def n_timesteps(self) -> int:
"""The number of time steps in the series."""
return self.shape[TIME_AX]
@property
def n_samples(self) -> int:
"""The number of samples contained in the series."""
return self.shape[SMPL_AX]
@property
def n_components(self) -> int:
"""The number of components (columns) in the series."""
return self.shape[COMP_AX]
@property
def width(self) -> int:
"""The width (number of components) of the series."""
return self.n_components
@property
def is_deterministic(self) -> bool:
"""Whether the series is deterministic."""
return self.shape[SMPL_AX] == 1
@property
def is_stochastic(self) -> bool:
"""Whether the series is stochastic (probabilistic)."""
return not self.is_deterministic
@property
def is_probabilistic(self) -> bool:
"""Whether the series is stochastic (probabilistic)."""
return self.is_stochastic
@property
def is_univariate(self) -> bool:
"""Whether the series is univariate."""
return self.shape[COMP_AX] == 1
@property
def freq(self) -> Union[pd.DateOffset, int]:
"""The frequency of the series.
A ``pandas.DateOffset`` if the series is indexed with a ``pandas.DatetimeIndex``.
An integer (step size) if the series is indexed with a ``pandas.RangeIndex``.
"""
return self._freq
@property
def freq_str(self) -> str:
"""The string representation of the series' frequency."""
return self._freq_str
@property
def dtype(self):
"""The dtype of the series' values."""
return self._values.dtype
@property
def components(self) -> pd.Index:
"""The component (column) names of the series, as a ``pandas.Index``."""
return self._components
@property
def columns(self) -> pd.Index:
"""The component (column) names of the series, as a ``pandas.Index``."""
return self.components
@property
def time_index(self) -> Union[pd.DatetimeIndex, pd.RangeIndex]:
"""The time index of the series."""
return self._time_index.copy()
@property
def time_dim(self) -> str:
"""The time dimension name of the series."""
return self._time_dim
@property
def has_datetime_index(self) -> bool:
"""Whether the series is indexed with a ``pandas.DatetimeIndex`` (otherwise it is indexed with an
``pandas.RangeIndex``)."""
return self._has_datetime_index
@property
def has_range_index(self) -> bool:
"""Whether the series is indexed with an ``pandas.RangeIndex`` (otherwise it is indexed with a
``pandas.DatetimeIndex``).
"""
return not self._has_datetime_index
@property
def has_hierarchy(self) -> bool:
"""Whether the series contains a hierarchy."""
return self.hierarchy is not None
@property
def has_static_covariates(self) -> bool:
"""Whether the series contains static covariates."""
return self.static_covariates is not None
@property
def has_metadata(self) -> bool:
"""Whether the series contains metadata."""
return self.metadata is not None
@property
def duration(self) -> Union[pd.Timedelta, int]:
"""The duration of the series (as a ``pandas.Timedelta`` or `int`)."""
return self._time_index[-1] - self._time_index[0]
"""
Export functions
================
"""
def data_array(self, copy: bool = True) -> xr.DataArray:
"""Return an ``xarray.DataArray`` representation of the series.
Parameters
----------
copy
Whether to return a copy of the series. Leave it to True unless you know what you are doing.
Returns
-------
xarray.DataArray
An ``xarray.DataArray`` representation of the time series.
"""
xa = xr.DataArray(
self._values,
dims=(self._time_dim,) + DIMS[-2:],
coords={self._time_dim: self._time_index, DIMS[COMP_AX]: self.components},
attrs=self._attrs,
)
return xa.copy() if copy else xa
def to_series(
self,
copy: bool = True,
backend: Union[ModuleType, Implementation, str] = Implementation.PANDAS,
):
"""Return a `Series` representation of the series in a given `backend`.
Works only for univariate series that are deterministic (i.e., made of 1 sample).
Parameters
----------
copy
Whether to return a copy of the series. Leave it to True unless you know what you are doing.
backend
The backend to which to export the `TimeSeries`. See the `narwhals documentation
<https://narwhals-dev.github.io/narwhals/api-reference/narwhals/#narwhals.from_dict>`_ for all supported
backends.
Returns
-------
A Series representation of the series in a given `backend`.
"""
self._assert_univariate()
self._assert_deterministic()
backend = Implementation.from_backend(backend)
if not backend.is_pandas():
return self.to_dataframe(copy=copy, backend=backend, time_as_index=False)
data = self._values[:, 0, 0]
index = self._time_index
name = self.components[0]
if copy:
data = data.copy()
index = index.copy()
return pd.Series(data=data, index=index, name=name)
def to_dataframe(
self,
copy: bool = True,
backend: Union[ModuleType, Implementation, str] = Implementation.PANDAS,
time_as_index: bool = True,
suppress_warnings: bool = False,
):
"""Return a DataFrame representation of the series in a given `backend`.
Each of the series components will appear as a column in the DataFrame.
If the series is stochastic, the samples are returned as columns of the dataframe with column names
as 'component_s#' (e.g. with two components and two samples:
'comp0_s0', 'comp0_s1', 'comp1_s0', 'comp1_s1').
Parameters
----------
copy
Whether to return a copy of the dataframe. Leave it to True unless you know what you are doing.
backend
The backend to which to export the `TimeSeries`. See the `narwhals documentation
<https://narwhals-dev.github.io/narwhals/api-reference/narwhals/#narwhals.from_dict>`_ for all supported
backends.
time_as_index
Whether to set the time index as the index of the dataframe or in the left-most column.
Only effective with the pandas `backend`.
suppress_warnings
Whether to suppress the warnings for the `DataFrame` creation.
Returns
-------
DataFrame
A DataFrame representation of the series in a given `backend`.
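Examples
--------
A minimal sketch of the default pandas export; the commented line shows a non-pandas backend and
assumes the optional `polars` dependency is installed:
>>> from darts.utils.timeseries_generation import linear_timeseries
>>> series = linear_timeseries(length=3)
>>> series.to_dataframe().shape
(3, 1)
>>> # series.to_dataframe(backend="polars", time_as_index=False)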
"""
backend = Implementation.from_backend(backend)
if time_as_index and not backend.is_pandas():
if not suppress_warnings:
logger.warning(
'`time_as_index=True` is only supported with `backend="pandas"`, and will be ignored.'
)
time_as_index = False
values = self._values
if not self.is_deterministic:
if not suppress_warnings:
logger.warning(
"You are transforming a stochastic TimeSeries (i.e., contains several samples). "
"The resulting DataFrame is a 2D object with all samples on the columns. "
"If this is not the expected behavior consider calling a function "
"adapted to stochastic TimeSeries like quantile_df()."
)
comp_names = list(self.components)
samples = range(self.n_samples)
columns = [
"_s".join((comp_name, str(sample_id)))
for comp_name, sample_id in itertools.product(comp_names, samples)
]
data = values.reshape(values.shape[0], len(columns))
else:
columns = self.components
data = values[:, :, 0]
time_index = self._time_index
if copy:
data = data.copy()
time_index = time_index.copy()
if time_as_index:
# special path for pandas with index
return pd.DataFrame(data=data, index=time_index, columns=columns)
data = {
time_index.name: time_index, # set time_index as left-most column
**{col: data[:, idx] for idx, col in enumerate(columns)},
}
return nw.from_dict(data, backend=backend).to_native()
def schema(self, copy: bool = True) -> dict[str, Any]:
"""Return the schema of the series as a dictionary.
Can be used to create new `TimeSeries` with the same schema.
The keys and values are:
- "time_freq": the frequency (or step size) of the time (or range) index
- "time_name": the name of the time index
- "columns": the columns / components
- "static_covariates": the static covariates
- "hierarchy": the hierarchy
- "metadata": the metadata
"""
schema = {
"time_freq": self._freq,
"time_name": self._time_index.name,
"columns": self.components,
STATIC_COV_TAG: self.static_covariates,
HIERARCHY_TAG: self.hierarchy,
METADATA_TAG: self.metadata,
}
if copy:
schema = {k: deepcopy(v) for k, v in schema.items()}
return schema
def astype(self, dtype: Union[str, np.dtype]) -> Self:
"""Return a new series with the values have been converted to the desired `dtype`.
Parameters
----------
dtype
A NumPy dtype (numpy.float32 or numpy.float64)
Returns
-------
TimeSeries
A series having the desired dtype.
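Examples
--------
A minimal sketch, casting a float64 series to float32:
>>> import numpy as np
>>> from darts.utils.timeseries_generation import linear_timeseries
>>> series = linear_timeseries(length=3)
>>> series.astype(np.float32).dtype
dtype('float32')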
"""
return self.__class__(
times=self._time_index,
values=self._values.astype(dtype),
components=self.components,
**self._attrs,
)
def start_time(self) -> Union[pd.Timestamp, int]:
"""Start time of the series.
Returns
-------
Union[pandas.Timestamp, int]
A timestamp containing the first time of the TimeSeries (if indexed by DatetimeIndex),
or an integer (if indexed by RangeIndex)
"""
return self._time_index[0]
def end_time(self) -> Union[pd.Timestamp, int]:
"""End time of the series.
Returns
-------
Union[pandas.Timestamp, int]
A timestamp containing the last time of the TimeSeries (if indexed by DatetimeIndex),
or an integer (if indexed by RangeIndex)
"""
return self._time_index[-1]
def first_value(self) -> float:
"""First value of the univariate series.
Returns
-------
float
The first value of this univariate deterministic time series
"""
self._assert_univariate()
self._assert_deterministic()
return float(self._values[0, 0, 0])
def last_value(self) -> float:
"""Last value of the univariate series.
Returns
-------
float
The last value of this univariate deterministic time series
"""
self._assert_univariate()
self._assert_deterministic()
return float(self._values[-1, 0, 0])
def first_values(self) -> np.ndarray:
"""First values of the potentially multivariate series.
Returns
-------
numpy.ndarray
The first values of every component of this deterministic time series
"""
self._assert_deterministic()
return self._values[0, :, 0].copy()
def last_values(self) -> np.ndarray:
"""Last values of the potentially multivariate series.
Returns
-------
numpy.ndarray
The last values of every component of this deterministic time series
"""
self._assert_deterministic()
return self._values[-1, :, 0].copy()
def values(self, copy: bool = True, sample: int = 0) -> np.ndarray:
"""Return a 2-D array of shape (time, component), containing the series' values for one `sample`.
Parameters
----------
copy
Whether to return a copy of the values, otherwise returns a view.
Leave it to True unless you know what you are doing.
sample
For stochastic series, the sample for which to return values. Default: 0 (first sample).
Returns
-------
numpy.ndarray
The values composing the time series.
"""
if self.is_deterministic and sample != 0:
raise_log(
ValueError(
"This series contains one sample only (deterministic),"
"so only sample=0 is accepted.",
),
logger=logger,
)
values = self._values[:, :, sample]
if copy:
values = values.copy()
return values
def random_component_values(self, copy: bool = True) -> np.array:
"""Return a 2-D array of shape (time, component), containing the series' values for one sample taken uniformly
at random from all samples.
Parameters
----------
copy
Whether to return a copy of the values, otherwise returns a view.
Leave it to True unless you know what you are doing.
Returns
-------
numpy.ndarray
The values composing one sample taken at random from the time series.
"""
sample = np.random.randint(low=0, high=self.n_samples)
values = self._values[:, :, sample]
if copy:
values = values.copy()
return values
def all_values(self, copy: bool = True) -> np.ndarray:
"""Return a 3-D array of dimension (time, component, sample) containing the series' values for all samples.
Parameters
----------
copy
Whether to return a copy of the values, otherwise returns a view.
Leave it to True unless you know what you are doing.
Returns
-------
numpy.ndarray
The values composing the time series.
"""
values = self._values
if copy:
values = values.copy()
return values
def univariate_values(self, copy: bool = True, sample: int = 0) -> np.ndarray:
"""Return a 1-D Numpy array of shape (time,) containing the univariate series' values for one `sample`.
Parameters
----------
copy
Whether to return a copy of the values. Leave it to True unless you know what you are doing.
sample
For stochastic series, the sample for which to return values. Default: 0 (first sample).
Returns
-------
numpy.ndarray
The values composing the time series guaranteed to be univariate.
"""
self._assert_univariate()
values = self._values[:, 0, sample]
if copy:
values = values.copy()
return values
def static_covariates_values(self, copy: bool = True) -> Optional[np.ndarray]:
"""Return a 2-D array of dimension (component, static variable) containing the series' static covariate values.
Parameters
----------
copy
Whether to return a copy of the values, otherwise returns a view.
Can only return a view if all values have the same dtype.
Leave it to True unless you know what you are doing.
Returns
-------
Optional[numpy.ndarray]
The static covariate values if the series has static covariates, else `None`.
"""
return (
self.static_covariates.to_numpy(copy=copy)
if self.has_static_covariates
else self.static_covariates
)
def head(
self, size: Optional[int] = 5, axis: Optional[Union[int, str]] = 0
) -> Self:
"""Return a new series with the first `size` points.
Parameters
----------
size : int, default 5
number of points to retain
axis : str or int, optional, default: 0
axis along which to slice the series
Returns
-------
TimeSeries
The series made of the first `size` points along the desired `axis`.
"""
axis = self._get_axis(axis)
display_n = min(size, self.shape[axis])
if axis == TIME_AX:
return self[:display_n]
elif axis == COMP_AX:
return self[self.components.tolist()[:display_n]]
else:
return self.__class__(
times=self._time_index,
values=self._values[:, :, :display_n],
components=self.components,
**self._attrs,
)
def tail(
self, size: Optional[int] = 5, axis: Optional[Union[int, str]] = 0
) -> Self:
"""Return a new series with the last `size` points.
Parameters
----------
size : int, default: 5
number of points to retain
axis : str or int, optional, default: 0 (time dimension)
axis along which to slice the series
Returns
-------
TimeSeries
The series made of the last `size` points along the desired `axis`.
"""
axis = self._get_axis(axis)
display_n = min(size, self.shape[axis])
if axis == TIME_AX:
return self[-display_n:]
elif axis == COMP_AX:
return self[self.components.tolist()[-display_n:]]
else:
return self.__class__(
times=self._time_index,
values=self._values[:, :, -display_n:],
components=self.components,
**self._attrs,
)
def concatenate(
self,
other: Self,
axis: Optional[Union[str, int]] = 0,
ignore_time_axis: Optional[bool] = False,
ignore_static_covariates: bool = False,
drop_hierarchy: bool = True,
drop_metadata: bool = False,
) -> Self:
"""Return a new series where this series is concatenated with the `other` series along a given `axis`.
Parameters
----------
other : TimeSeries
another timeseries to concatenate to this one
axis : str or int
axis along which timeseries will be concatenated. ['time', 'component' or 'sample'; Default: 0 (time)]
ignore_time_axis : bool, default False
Ignore errors when time axis varies for some timeseries. Note that this may yield unexpected results
ignore_static_covariates : bool
whether to ignore all requirements for static covariate concatenation and only transfer the
static covariates of the current (`self`) timeseries to the concatenated timeseries.
Only effective when `axis=1`.
drop_hierarchy : bool
When `axis=1`, whether to drop hierarchy information. True by default.
When False, the hierarchies will be "concatenated" as well
(by merging the hierarchy dictionaries), which may cause issues if the component
names of the resulting series and that of the merged hierarchy do not match.
When `axis=0` or `axis=2`, the hierarchy of the first series is always kept.
drop_metadata : bool
Whether to drop the metadata information of the concatenated timeseries. False by default.
When False, the concatenated series will inherit the metadata from the current (`self`) timeseries.
Returns
-------
TimeSeries
The concatenated series.
See Also
--------
concatenate : a function to concatenate multiple series along a given axis.
Notes
-----
When concatenating along the `time` dimension, the current series marks the start date of
the resulting series, and the other series will have its time index ignored.
"""
return concatenate(
series=[self, other],
axis=axis,
ignore_time_axis=ignore_time_axis,
ignore_static_covariates=ignore_static_covariates,
drop_hierarchy=drop_hierarchy,
drop_metadata=drop_metadata,
)
"""
Other methods
=============
"""
def gaps(self, mode: Literal["all", "any"] = "all") -> pd.DataFrame:
"""Compute and return gaps in the series.
Works only on deterministic time series (1 sample).
Parameters
----------
mode
Only relevant for multivariate time series. The mode defines how gaps are detected. Set to
'any' if a NaN value in any column should be considered a gap. 'all' will only
consider periods where all columns' values are NaN. Defaults to 'all'.
Returns
-------
pandas.DataFrame
A pandas.DataFrame containing a row for every gap (rows with NaN values, according to `mode`, in the
underlying DataFrame) in this time series. The DataFrame contains three columns that include the start
and end time stamps of the gap and the integer length of the gap (in `self.freq` units if the series is
indexed by a DatetimeIndex).
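Examples
--------
A minimal sketch with hypothetical values and a two-step NaN gap:
>>> import numpy as np
>>> import pandas as pd
>>> from darts import TimeSeries
>>> times = pd.date_range("2020-01-01", periods=5, freq="D")
>>> vals = np.array([1.0, np.nan, np.nan, 4.0, 5.0])
>>> series = TimeSeries.from_times_and_values(times=times, values=vals)
>>> series.gaps()["gap_size"].tolist()
[2]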
"""
df = self.to_dataframe()
if mode == "all":
is_nan_series = df.isna().all(axis=1).astype(int)
elif mode == "any":
is_nan_series = df.isna().any(axis=1).astype(int)
else:
raise_log(
ValueError(
f"Keyword mode accepts only 'any' or 'all'. Provided {mode}"
),
logger,
)
diff = pd.Series(np.diff(is_nan_series.values), index=is_nan_series.index[:-1])
gap_starts = diff[diff == 1].index + self._freq
gap_ends = diff[diff == -1].index
if is_nan_series.iloc[0] == 1:
gap_starts = gap_starts.insert(0, self.start_time())
if is_nan_series.iloc[-1] == 1:
gap_ends = gap_ends.insert(len(gap_ends), self.end_time())
gap_df = pd.DataFrame(columns=["gap_start", "gap_end"])
if gap_starts.size == 0:
return gap_df
else:
def intvl(start, end):
if self._has_datetime_index:
return pd.date_range(start=start, end=end, freq=self._freq).size
else:
return int((end - start) / self._freq) + 1
gap_df["gap_start"] = gap_starts
gap_df["gap_end"] = gap_ends
gap_df["gap_size"] = gap_df.apply(
lambda row: intvl(start=row.gap_start, end=row.gap_end), axis=1
)
return gap_df
def copy(self) -> Self:
"""Create a copy of the series.
Returns
-------
TimeSeries
A copy of the series.
"""
# the data will be copied in the TimeSeries constructor.
return self.__class__(
times=self._time_index,
values=self._values,
components=self.components,
copy=True,
**self._attrs,
)
def get_index_at_point(
self, point: Union[pd.Timestamp, float, int], after=True
) -> int:
"""Convert a point along the time index into an integer index ranging from (0, len(series)-1) inclusive.
Parameters
----------
point
This parameter supports 3 different data types: ``pandas.Timestamp``, ``float`` and ``int``.
``pandas.Timestamp`` works only on series that are indexed with a ``pandas.DatetimeIndex``. In such cases,
the returned point will be the index of this timestamp if it is present in the series time index.
If it's not present in the time index, the index of the next timestamp is returned if `after=True`
(if it exists in the series), otherwise the index of the previous timestamp is returned
(if it exists in the series).
In case of a ``float``, the parameter will be treated as the proportion of the time series
that should lie before the point.
If an ``int`` and the series is datetime-indexed, the value of `point` is returned (it is treated as a
positional index). If an ``int`` and the series is integer-indexed, the index position of `point` in the
RangeIndex is returned (accounting for steps).
after
If the provided pandas Timestamp is not in the time series index, whether to return the index of the
next timestamp or the index of the previous one.
Returns
-------
int
The index position corresponding to the provided point in the series.
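Examples
--------
A minimal sketch on a daily series with hypothetical values:
>>> import numpy as np
>>> import pandas as pd
>>> from darts import TimeSeries
>>> times = pd.date_range("2020-01-01", periods=5, freq="D")
>>> series = TimeSeries.from_times_and_values(times=times, values=np.arange(5))
>>> series.get_index_at_point(pd.Timestamp("2020-01-03"))
2
>>> series.get_index_at_point(0.5)  # the proportion of the series lying before the point
2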
"""
point_index = -1
if isinstance(point, float):
if not 0.0 <= point <= 1.0:
raise_log(
ValueError("point (float) should be between 0.0 and 1.0."), logger
)
point_index = int((len(self) - 1) * point)
elif isinstance(point, (int, np.int64)):
if self.has_datetime_index or (self.start_time() == 0 and self.freq == 1):
point_index = point
else:
point_index_float = (point - self.start_time()) / self.freq
point_index = int(point_index_float)
if point_index != point_index_float:
raise_log(
ValueError(
"The provided point is not a valid index for this series."
),
logger,
)
if not 0 <= point_index < len(self):
raise_log(
ValueError(
f"The index corresponding to the provided point ({point}) should be a valid index in series"
),
logger,
)
elif isinstance(point, pd.Timestamp):
if not self._has_datetime_index:
raise_log(
ValueError(
"A Timestamp has been provided, but this series is not time-indexed."
),
logger,
)
self._raise_if_not_within(point)
if point in self:
point_index = self._time_index.get_loc(point)
else:
point_index = self._time_index.get_loc(
self._get_first_timestamp_after(point)
if after
else self._get_last_timestamp_before(point)
)
else:
raise_log(
TypeError(
"`point` needs to be either `float`, `int` or `pandas.Timestamp`"
),
logger,
)
return point_index
def get_timestamp_at_point(
self, point: Union[pd.Timestamp, float, int]
) -> Union[pd.Timestamp, int]:
"""Convert a point into a ``pandas.Timestamp`` (if datetime-indexed) or integer (if integer-indexed).
Parameters
----------
point
This parameter supports 3 different data types: `float`, `int` and `pandas.Timestamp`.
In case of a `float`, the parameter will be treated as the proportion of the time series
that should lie before the point.
In case of an `int`, the parameter will be treated as an integer index to the time index of
the series. Will raise a ValueError if not a valid index in the series.
In case of a `pandas.Timestamp`, point will be returned as is provided that the timestamp
is present in the series time index, otherwise will raise a ValueError.
Returns
-------
Union[pandas.Timestamp, int]
The index value corresponding to the provided point in the series.
If the series is indexed by a `pandas.DatetimeIndex`, returns a `pandas.Timestamp`.
If the series is indexed by a `pandas.RangeIndex`, returns an integer.
"""
idx = self.get_index_at_point(point)
return self._time_index[idx]
def _split_at(
self, split_point: Union[pd.Timestamp, float, int], after: bool = True
) -> tuple[Self, Self]:
# Get index with not after in order to avoid moving twice if split_point is not in self
point_index = self.get_index_at_point(split_point, not after)
return (
self[: point_index + (1 if after else 0)],
self[point_index + (1 if after else 0) :],
)
def split_after(
self, split_point: Union[pd.Timestamp, float, int]
) -> tuple[Self, Self]:
"""Split the series in two, after a provided `split_point`.
Parameters
----------
split_point
A timestamp, float or integer. If float, represents the proportion of the series to include in the
first TimeSeries (must be between 0.0 and 1.0). If integer, represents the index position after
which the split is performed. A pandas.Timestamp can be provided for TimeSeries that are indexed by a
pandas.DatetimeIndex. In such cases, the timestamp will be contained in the first TimeSeries, but not
in the second one. The timestamp itself does not have to appear in the original TimeSeries index.
Returns
-------
Tuple[TimeSeries, TimeSeries]
A tuple of two series. The first time series contains the first entries up to the `split_point` (inclusive),
and the second contains the remaining ones.
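Examples
--------
A minimal sketch, splitting after 80% of the series (the float is converted to index `int(9 * 0.8) = 7`,
so the first part keeps 8 points):
>>> from darts.utils.timeseries_generation import linear_timeseries
>>> series = linear_timeseries(length=10)
>>> train, val = series.split_after(0.8)
>>> len(train), len(val)
(8, 2)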
"""
return self._split_at(split_point, after=True)
def split_before(
self, split_point: Union[pd.Timestamp, float, int]
) -> tuple[Self, Self]:
"""Split the series in two, before a provided `split_point`.
Parameters
----------
split_point
A timestamp, float or integer. If float, represents the proportion of the series to include in the
first TimeSeries (must be between 0.0 and 1.0). If integer, represents the index position before
which the split is performed. A pandas.Timestamp can be provided for TimeSeries that are indexed by a
pandas.DatetimeIndex. In such cases, the timestamp will be contained in the second TimeSeries, but not
in the first one. The timestamp itself does not have to appear in the original TimeSeries index.
Returns
-------
Tuple[TimeSeries, TimeSeries]
A tuple of two series. The first time series contains the first entries up to the `split_point` (exclusive),
and the second contains the remaining ones.
"""
return self._split_at(split_point, after=False)
def drop_after(self, split_point: Union[pd.Timestamp, float, int]):
"""Return a new series where everything after and including the provided time `split_point` was dropped.
The timestamp may not be in the series. If it is, the timestamp will be dropped.
Parameters
----------
split_point
The timestamp that indicates cut-off time.
Returns
-------
TimeSeries
A series that contains all entries until `split_point` (exclusive).
"""
return self[: self.get_index_at_point(split_point, after=True)]
def drop_before(self, split_point: Union[pd.Timestamp, float, int]):
"""Return a new series where everything before and including the provided time `split_point` was dropped.
The timestamp may not be in the series. If it is, the timestamp will be dropped.
Parameters
----------
split_point
The timestamp that indicates cut-off time.
Returns
-------
TimeSeries
A series that contains all entries starting after `split_point` (exclusive).
"""
return self[self.get_index_at_point(split_point, after=False) + 1 :]
def slice(
self, start_ts: Union[pd.Timestamp, int], end_ts: Union[pd.Timestamp, int]
):
"""Return a slice of the series starting at `start_ts` and ending before `end_ts`.
For series having DatetimeIndex, this is inclusive on both ends. For series having a RangeIndex,
`end_ts` is exclusive.
`start_ts` and `end_ts` don't have to be in the series.
Parameters
----------
start_ts
The timestamp that indicates the left cut-off.
end_ts
The timestamp that indicates the right cut-off.
Returns
-------
TimeSeries
A new series, with indices greater than or equal to `start_ts` and smaller than or equal to `end_ts`.
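Examples
--------
A minimal sketch on a daily series with hypothetical values (both bounds are in the index, so the
slice is inclusive on both ends):
>>> import numpy as np
>>> import pandas as pd
>>> from darts import TimeSeries
>>> times = pd.date_range("2020-01-01", periods=5, freq="D")
>>> series = TimeSeries.from_times_and_values(times=times, values=np.arange(5))
>>> len(series.slice(pd.Timestamp("2020-01-02"), pd.Timestamp("2020-01-04")))
3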
"""
if type(start_ts) is not type(end_ts):
raise_log(
ValueError(
"The two timestamps provided to slice() have to be of the same type."
),
logger,
)
if isinstance(start_ts, pd.Timestamp):
if not self._has_datetime_index:
raise_log(
ValueError(
"Timestamps have been provided to slice(), but the series is "
"indexed using an integer-based RangeIndex."
),
logger,
)
if start_ts in self._time_index and end_ts in self._time_index:
return self[
start_ts:end_ts
] # we assume this is faster than the filtering below
else:
idx = self._time_index[
(start_ts <= self._time_index) & (self._time_index <= end_ts)
]
return self[idx]
else:
if self._has_datetime_index:
raise_log(
ValueError(
"start and end times have been provided as integers to slice(), but "
"the series is indexed with a DatetimeIndex."
),
logger,
)
# get closest timestamp if either start or end are not in the index
effective_start_ts = (
min(self._time_index, key=lambda t: abs(t - start_ts))
if start_ts not in self._time_index
else start_ts
)
if effective_start_ts < start_ts:
# if the requested start_ts is smaller than the start argument,
# we have to increase it to be consistent with the docstring
effective_start_ts += self.freq
effective_end_ts = (
min(self._time_index, key=lambda t: abs(t - end_ts))
if end_ts not in self._time_index
else end_ts
)
if end_ts >= effective_end_ts + self.freq:
# if the requested end_ts is further off from the end of the time series,
# we have to increase effective_end_ts to make the last timestamp inclusive.
effective_end_ts += self.freq
idx = pd.RangeIndex(effective_start_ts, effective_end_ts, step=self.freq)
return self[idx]
def slice_n_points_after(self, start_ts: Union[pd.Timestamp, int], n: int) -> Self:
"""Return a slice of the series starting at `start_ts` (inclusive) and having at most `n` points.
Parameters
----------
start_ts
The timestamp or index that indicates the splitting time.
n
The maximal length of the new TimeSeries.
Returns
-------
TimeSeries
A new series, with length at most `n`, starting at `start_ts`.
"""
if n <= 0:
raise_log(ValueError("n should be a positive integer."), logger)
self._raise_if_not_within(start_ts)
if isinstance(start_ts, (int, np.int64)):
return self[pd.RangeIndex(start=start_ts, stop=start_ts + n)]
elif isinstance(start_ts, pd.Timestamp):
# get first timestamp greater or equal to start_ts
tss = self._get_first_timestamp_after(start_ts)
point_index = self.get_index_at_point(tss)
return self[point_index : point_index + n]
else:
raise_log(
ValueError("start_ts must be an int or a pandas Timestamp."), logger
)
def slice_n_points_before(self, end_ts: Union[pd.Timestamp, int], n: int) -> Self:
"""Return a slice of the series ending at `end_ts` (inclusive) and having at most `n` points.
Parameters
----------
end_ts
The timestamp or index that indicates the splitting time.
n
The maximal length of the new TimeSeries.
Returns
-------
TimeSeries
A new series, with length at most `n`, ending at `end_ts`.
"""
if n <= 0:
raise_log(ValueError("n should be a positive integer."), logger)
self._raise_if_not_within(end_ts)
if isinstance(end_ts, (int, np.int64)):
return self[pd.RangeIndex(start=end_ts - n + 1, stop=end_ts + 1)]
elif isinstance(end_ts, pd.Timestamp):
# get last timestamp smaller or equal to end_ts
tss = self._get_last_timestamp_before(end_ts)
point_index = self.get_index_at_point(tss)
return self[max(0, point_index - n + 1) : point_index + 1]
else:
raise_log(
ValueError("start_ts must be an int or a pandas Timestamp."), logger
)
def slice_intersect(self, other: Self) -> Self:
"""Return a slice of the series where the time index was intersected with the `other` series.
This method is in general *not* symmetric.
Parameters
----------
other
the other time series
Returns
-------
TimeSeries
A new series, containing the values of this series, over the time-span common to both series.
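Examples
--------
A minimal sketch, intersecting a series with a sub-slice of itself:
>>> from darts.utils.timeseries_generation import linear_timeseries
>>> s1 = linear_timeseries(length=10)
>>> s2 = s1[3:8]
>>> len(s1.slice_intersect(s2))
5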
"""
if other.has_same_time_as(self):
return self.copy()
elif other.freq == self.freq and len(self) and len(other):
start, end = self._slice_intersect_bounds(other)
return self[start:end]
else:
time_index = self.time_index.intersection(other.time_index)
return self[time_index]
def slice_intersect_values(self, other: Self, copy: bool = False) -> np.ndarray:
"""Return the sliced values of the series where the time index was intersected with the `other` series.
This method is in general *not* symmetric.
Parameters
----------
other
The other time series
copy
Whether to return a copy of the values, otherwise returns a view. Note that, unlike most other
methods, the default here is `False` (a view is returned).
Returns
-------
numpy.ndarray
The values of this series, over the time-span common to both series.
"""
vals = self.all_values(copy=copy)
if other.has_same_time_as(self):
return vals
if other.freq == self.freq:
start, end = self._slice_intersect_bounds(other)
return vals[start:end]
else:
return vals[self._time_index.isin(other._time_index)]
def slice_intersect_times(
self, other: Self, copy: bool = True
) -> Union[pd.DatetimeIndex, pd.RangeIndex]:
"""Return the time index of the series where the time index was intersected with the `other` series.
This method is in general *not* symmetric.
Parameters
----------
other
The other time series
copy
Whether to return a copy of the time index, otherwise returns a view. Leave it to True unless you know
what you are doing.
Returns
-------
Union[pandas.DatetimeIndex, pandas.RangeIndex]
The time index of this series, over the time-span common to both series.
"""
time_index = self.time_index if copy else self._time_index
if other.has_same_time_as(self):
return time_index
if other.freq == self.freq:
start, end = self._slice_intersect_bounds(other)
return time_index[start:end]
else:
return time_index[time_index.isin(other._time_index)]
def _slice_intersect_bounds(self, other: Self) -> tuple[int, int]:
"""Find the start (absolute index) and end (index relative to the end) indices that represent the time
intersection from `self` and `other`."""
shift_start = n_steps_between(
other.start_time(), self.start_time(), freq=self.freq
)
shift_end = len(other) - (len(self) - shift_start)
shift_start = shift_start if shift_start >= 0 else 0
shift_end = shift_end if shift_end < 0 else None
return shift_start, shift_end
def strip(self, how: str = "all") -> Self:
"""Return a slice of the deterministic time series where NaN-containing entries at the beginning and the end
were removed.
Entries between the first and the last non-NaN entries (both inclusive) are kept; only the
NaN-containing entries before the first and after the last non-NaN entry are removed.
This method is only applicable to deterministic series (i.e., having 1 sample).
Parameters
----------
how
Define if the entries containing `NaN` in all the components ('all') or in any of the components ('any')
should be stripped. Default: 'all'
Returns
-------
TimeSeries
A new series where NaN-containing entries at start and end were removed.
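Examples
--------
A minimal sketch with hypothetical values and NaNs at both ends:
>>> import numpy as np
>>> from darts import TimeSeries
>>> series = TimeSeries.from_values(np.array([np.nan, 1.0, 2.0, np.nan]))
>>> series.strip().values().flatten().tolist()
[1.0, 2.0]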
"""
if self.is_probabilistic:
raise_log(
ValueError("`strip` cannot be applied to stochastic TimeSeries"), logger
)
first_finite_row, last_finite_row = _finite_rows_boundaries(
self.values(copy=False), how=how
)
return self.__class__(
times=self._time_index[first_finite_row : last_finite_row + 1],
values=self._values[first_finite_row : last_finite_row + 1],
components=self.components,
**self._attrs,
)
def longest_contiguous_slice(
self, max_gap_size: int = 0, mode: str = "all"
) -> Self:
"""Return the largest slice of the deterministic series without any gaps (contiguous all-NaN value entries)
larger than `max_gap_size`.
This method is only applicable to deterministic series (i.e., having 1 sample).
Parameters
----------
max_gap_size
Indicate the maximum gap size that the series can contain.
mode
Only relevant for multivariate time series. The mode defines how gaps are detected. Set to
'any' if a NaN value in any column should be considered a gap. 'all' will only
consider periods where all columns' values are NaN. Defaults to 'all'.
Returns
-------
TimeSeries
A new series with the largest slice of the original that has no gaps longer than `max_gap_size`.
See Also
--------
TimeSeries.gaps : return the gaps in the TimeSeries
"""
if not (np.isnan(self._values)).any():
return self.copy()
stripped_series = self.strip()
gaps = stripped_series.gaps(mode=mode)
relevant_gaps = gaps[gaps["gap_size"] > max_gap_size]
curr_slice_start = stripped_series.start_time()
max_size = pd.Timedelta(days=0) if self._has_datetime_index else 0
max_slice_start = None
max_slice_end = None
for index, row in relevant_gaps.iterrows():
# evaluate size of the current slice. the slice ends one time step before row['gap_start']
curr_slice_end = row["gap_start"] - self.freq
size = curr_slice_end - curr_slice_start
if size > max_size:
max_size = size
max_slice_start = curr_slice_start
max_slice_end = row["gap_start"] - self._freq
curr_slice_start = row["gap_end"] + self._freq
if stripped_series.end_time() - curr_slice_start > max_size:
max_slice_start = curr_slice_start
max_slice_end = self.end_time()
return stripped_series[max_slice_start:max_slice_end]
def rescale_with_value(self, value_at_first_step: float) -> Self:
"""Return a new series, which is a multiple of this series such that the first value is `value_at_first_step`.
Note: Numerical errors can appear with `value_at_first_step > 1e+24`.
Parameters
----------
value_at_first_step
The new value for the first entry of the TimeSeries.
Returns
-------
TimeSeries
A new series, where the first value is `value_at_first_step` and other values have been scaled accordingly.
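Examples
--------
A minimal sketch with hypothetical values, rescaling the series to start at `1.0`:
>>> import numpy as np
>>> from darts import TimeSeries
>>> series = TimeSeries.from_values(np.array([2.0, 4.0, 6.0]))
>>> series.rescale_with_value(1.0).values().flatten().tolist()
[1.0, 2.0, 3.0]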
"""
if (self._values[0, :, :] == 0).any():
raise_log(ValueError("Cannot rescale with first value `0`."), logger)
coef = value_at_first_step / self._values[:1]
return self.__class__(
times=self._time_index,
values=self._values * coef,
components=self.components,
**self._attrs,
)
def shift(self, n: int) -> Self:
"""Return a new series where the time index was shifted by `n` steps.
If :math:`n > 0`, shifts into the future. If :math:`n < 0`, shifts into the past.
For example, with :math:`n=2` and `freq='M'`, March 2013 becomes May 2013.
With :math:`n=-2`, March 2013 becomes Jan 2013.
Parameters
----------
n
The number of time steps (in self.freq unit) to shift by. Can be negative.
Returns
-------
TimeSeries
A new series, with a shifted time index.
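Examples
--------
A minimal sketch on a daily series with hypothetical values:
>>> import numpy as np
>>> import pandas as pd
>>> from darts import TimeSeries
>>> times = pd.date_range("2020-01-01", periods=3, freq="D")
>>> series = TimeSeries.from_times_and_values(times=times, values=np.arange(3))
>>> series.shift(2).start_time()
Timestamp('2020-01-03 00:00:00')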
"""
if not isinstance(n, (int, np.int64)):
logger.warning(
f"TimeSeries.shift(): converting n to int from {n} to {int(n)}"
)
n = int(n)
try:
self._time_index[-1] + n * self.freq
except pd.errors.OutOfBoundsDatetime:
raise_log(
OverflowError(
f"the add operation between {n * self.freq} and {self.time_index[-1]} will "
"overflow"
),
logger,
)
if self.has_range_index:
new_time_index = self._time_index + n * self.freq
else:
new_time_index = self._time_index.map(lambda ts: ts + n * self.freq)
if new_time_index.freq is None:
new_time_index.freq = self.freq
return self.__class__(
times=new_time_index,
values=self._values,
components=self.components,
**self._attrs,
)
def diff(
self,
n: Optional[int] = 1,
periods: Optional[int] = 1,
dropna: Optional[bool] = True,
) -> Self:
"""Return a new series with differenced values.
This is often used to make a time series stationary.
Parameters
----------
n
Optionally, a positive integer indicating the number of differencing steps (default = 1).
For instance, n=2 computes the second order differences.
periods
Optionally, periods to shift for calculating difference. For instance, periods=12 computes the
difference between values at time `t` and times `t-12`.
dropna
Whether to drop the missing values after each differencing steps. If set to `False`, the corresponding
first `periods` time steps will be filled with NaNs.
Returns
-------
TimeSeries
A new series, with the differenced values.
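Examples
--------
A minimal sketch of first-order differencing with hypothetical values:
>>> import numpy as np
>>> from darts import TimeSeries
>>> series = TimeSeries.from_values(np.array([1.0, 2.0, 4.0, 7.0]))
>>> series.diff().values().flatten().tolist()
[1.0, 2.0, 3.0]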
"""
if not isinstance(n, int) or n < 1:
raise_log(ValueError("'n' must be a positive integer >= 1."), logger)
if not isinstance(periods, int) or periods < 1:
raise_log(ValueError("'periods' must be an integer >= 1."), logger)
def _compute_diff(values_: np.ndarray, times_):
if not dropna:
# in this case, the new array keeps the same length; the first `periods` entries are filled with NaNs
values_diff = values_.copy()
values_diff[:periods, :, :] = np.nan
values_diff[periods:, :, :] = (
values_[periods:, :, :] - values_[:-periods, :, :]
)
else:
# in this case, the new array is shorter by `periods` time steps
times_ = times_[periods:]
values_diff = values_[periods:, :, :] - values_[:-periods, :, :]
return values_diff, times_
values, times = _compute_diff(self._values, self._time_index)
for _ in range(n - 1):
values, times = _compute_diff(values, times)
return self.__class__(
times=times,
values=values,
components=self.components,
**self._attrs,
)
def cumsum(self) -> Self:
"""Return a new series with the cumulative sum along the time axis.
Returns
-------
TimeSeries
A new series, with the cumulatively summed values.
"""
return self.__class__(
times=self._time_index,
values=self._values.cumsum(axis=0),
components=self.components,
**self._attrs,
)
def has_same_time_as(self, other: Self) -> bool:
"""Whether the series has the same time index as the `other` series.
Parameters
----------
other
the other series
Returns
-------
bool
`True` if both series have the same index, `False` otherwise.
"""
if len(other) != len(self):
return False
elif other.freq != self.freq:
return False
elif other.start_time() != self.start_time():
return False
else:
return True
def append(self, other: Self) -> Self:
"""Return a new series with the `other` series appended to this series along the time axis (added to the end).
Parameters
----------
other
A second TimeSeries.
Returns
-------
TimeSeries
A new series, obtained by appending the second series to the first.
See Also
--------
TimeSeries.concatenate : concatenate another series along a given axis.
TimeSeries.prepend : prepend another series along the time axis.
"""
if other.has_datetime_index != self.has_datetime_index:
raise_log(
ValueError(
"Both series must have the same type of time index (either DatetimeIndex or RangeIndex)."
),
logger,
)
if other.freq != self.freq:
raise_log(ValueError("Both series must have the same frequency."), logger)
if other.n_components != self.n_components:
raise_log(
ValueError("Both series must have the same number of components."),
logger,
)
if other.n_samples != self.n_samples:
raise_log(
ValueError("Both series must have the same number of samples."), logger
)
if len(self) > 0 and len(other) > 0:
if other.start_time() != self.end_time() + self.freq:
raise_log(
ValueError(
"Appended TimeSeries must start one (time) step after current one."
),
logger,
)
values = np.concatenate((self._values, other._values), axis=0)
times = self._time_index.append(other._time_index)
return self.__class__(
times=times,
values=values,
components=self.components,
**self._attrs,
)
def append_values(self, values: np.ndarray) -> Self:
"""Return a new series with `values` appended to this series along the time axis (added to the end).
This adds time steps to the end of the new series.
Parameters
----------
values
An array with the values to append.
Returns
-------
TimeSeries
A new series with the new values appended.
See Also
--------
TimeSeries.prepend_values : prepend an array of values along the time axis.
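Examples
--------
A minimal sketch on an integer-indexed series with hypothetical values:
>>> import numpy as np
>>> from darts import TimeSeries
>>> series = TimeSeries.from_values(np.array([1.0, 2.0]))
>>> series.append_values(np.array([3.0])).values().flatten().tolist()
[1.0, 2.0, 3.0]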
"""
if len(values) == 0:
return self.copy()
values = np.array(values) if not isinstance(values, np.ndarray) else values
values = expand_arr(values, ndim=len(DIMS))
if not values.shape[1:] == self.shape[1:]:
raise_log(
ValueError(
f"The (expanded) values must have the same number of components and samples "
f"(second and third dims) as the series to append to. "
f"Received shape: {values.shape}, expected: {self.shape}"
),
logger=logger,
)
idx = generate_index(
start=self.end_time() + self.freq,
length=len(values),
freq=self.freq,
name=self._time_index.name,
)
return self.append(
self.__class__(
values=values, times=idx, components=self.components, **self._attrs
)
)
def prepend(self, other: Self) -> Self:
"""Return a new series with the `other` series prepended to this series along the time axis (added to the
beginning).
Parameters
----------
other
A second TimeSeries.
Returns
-------
TimeSeries
A new series, obtained by prepending the second series to the first.
See Also
--------
TimeSeries.concatenate : concatenate another series along a given axis.
TimeSeries.append : append another series along the time axis.
"""
if not isinstance(other, self.__class__):
raise_log(
ValueError(
f"`other` to prepend must be a {self.__class__.__name__} object."
),
logger,
)
return other.append(self)
def prepend_values(self, values: np.ndarray) -> Self:
"""Return a new series with `values` prepended to this series along the time axis (added to the beginning).
This adds time steps to the beginning of the new series.
Parameters
----------
values
An array with the values to prepend to the start.
Returns
-------
TimeSeries
A new series with the new values prepended.
See Also
--------
TimeSeries.append_values : append an array of values along the time axis.
"""
if len(values) == 0:
return self.copy()
values = np.array(values) if not isinstance(values, np.ndarray) else values
values = expand_arr(values, ndim=len(DIMS))
if not values.shape[1:] == self._values.shape[1:]:
raise_log(
ValueError(
f"The (expanded) values must have the same number of components and samples "
f"(second and third dims) as the series to prepend to. "
f"Received shape: {values.shape}, expected: {self._values.shape}"
),
logger=logger,
)
idx = generate_index(
end=self.start_time() - self.freq,
length=len(values),
freq=self.freq,
name=self._time_index.name,
)
return self.prepend(
self.__class__(
times=idx,
values=values,
components=self.columns,
**self._attrs,
)
)
def with_times_and_values(
self,
times: Union[pd.DatetimeIndex, pd.RangeIndex, pd.Index],
values: np.ndarray,
fill_missing_dates: Optional[bool] = False,
freq: Optional[Union[str, int]] = None,
fillna_value: Optional[float] = None,
) -> Self:
"""Return a new series similar to this one but with new `times` and `values`.
Parameters
----------
times
A pandas DateTimeIndex, RangeIndex (or Index that can be converted to a RangeIndex) representing the new
time axis for the time series. It is better if the index has no holes; alternatively setting
`fill_missing_dates` can in some cases solve these issues (filling holes with NaN, or with the provided
`fillna_value` numeric value, if any).
values
A Numpy array with new values. It must have the same number of time steps as `times` and the same
number of components as this series, but may contain a different number of samples.
fill_missing_dates
Optionally, a boolean value indicating whether to fill missing dates (or indices in case of integer index)
with NaN values. This requires either a provided `freq` or the possibility to infer the frequency from the
provided timestamps. See :meth:`_fill_missing_dates() <TimeSeries._fill_missing_dates>` for more info.
freq
Optionally, a string or integer representing the frequency of the underlying index. This is useful in order
to fill in missing values if some dates are missing and `fill_missing_dates` is set to `True`.
If a string, represents the frequency of the pandas DatetimeIndex (see `offset aliases
<https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#offset-aliases>`_ for more info on
supported frequencies).
If an integer, represents the step size of the pandas Index or pandas RangeIndex.
fillna_value
Optionally, a numeric value to fill missing values (NaNs) with.
Returns
-------
TimeSeries
A new series with the new time index and values but identical static covariates and hierarchy.
"""
values = np.array(values) if not isinstance(values, np.ndarray) else values
values = expand_arr(values, ndim=len(DIMS))
if values.shape[1] != self.shape[1]:
raise_log(
ValueError(
"The new values must have the same number of components as the present series. "
f"Received: {values.shape[1]}, expected: {self.shape[1]}"
),
logger,
)
return self.__class__(
times=times,
values=values,
fill_missing_dates=fill_missing_dates,
freq=freq,
components=self.components,
fillna_value=fillna_value,
**self._attrs,
)
def with_values(self, values: np.ndarray) -> Self:
"""Return a new series similar to this one but with new `values`.
Parameters
----------
values
A Numpy array with new values. It must have the dimensions for time
and components, but may contain a different number of samples.
Returns
-------
TimeSeries
A new series with the new values but same index, static covariates and hierarchy
"""
values = np.array(values) if not isinstance(values, np.ndarray) else values
values = expand_arr(values, ndim=len(DIMS))
if values.shape[:2] != self.shape[:2]:
raise_log(
ValueError(
"The new values must have the same shape (time, components) as the present series. "
f"Received: {values.shape[:2]}, expected: {self.shape[:2]}"
),
logger,
)
return self.__class__(
times=self._time_index,
values=values,
components=self.components,
**self._attrs,
)
def with_static_covariates(
self, covariates: Optional[Union[pd.Series, pd.DataFrame]]
) -> Self:
"""Return a new series with added static covariates.
Static covariates hold information / data about the time series which does not vary over time.
Parameters
----------
covariates
Optionally, a set of static covariates to be added to the TimeSeries. Either a pandas Series, a pandas
DataFrame, or `None`. If `None`, will set the static covariates to `None`. If a Series, the index
represents the static variables. The covariates are then globally 'applied' to all components of the
TimeSeries. If a DataFrame, the columns represent the static variables and the rows represent the
components of the uni/multivariate TimeSeries. If a single-row DataFrame, the covariates are globally
'applied' to all components of the TimeSeries. If a multi-row DataFrame, the number of rows must match the
number of components of the TimeSeries. This adds component-specific static covariates.
Returns
-------
TimeSeries
A new series with the given static covariates.
Notes
-----
If there is a large number of static covariate variables (i.e., the static covariates have a very large
dimension), there might be a noticeable performance penalty for creating ``TimeSeries``, unless the covariates
already have the same ``dtype`` as the series data.
Examples
--------
>>> import pandas as pd
>>> from darts.utils.timeseries_generation import linear_timeseries
>>> # add global static covariates
>>> static_covs = pd.Series([0., 1.], index=["static_cov_1", "static_cov_2"])
>>> series = linear_timeseries(length=3)
>>> series_new1 = series.with_static_covariates(static_covs)
>>> series_new1.static_covariates
static_cov_1 static_cov_2
component
linear 0.0 1.0
>>> # add component specific static covariates
>>> static_covs_multi = pd.DataFrame([[0., 1.], [2., 3.]], columns=["static_cov_1", "static_cov_2"])
>>> series_multi = series.stack(series)
>>> series_new2 = series_multi.with_static_covariates(static_covs_multi)
>>> series_new2.static_covariates
static_cov_1 static_cov_2
component
linear 0.0 1.0
linear_1 2.0 3.0
"""
return self.__class__(
times=self._time_index,
values=self._values,
components=self.components,
static_covariates=covariates,
hierarchy=self.hierarchy,
metadata=self.metadata,
)
def with_hierarchy(self, hierarchy: dict[str, Union[str, list[str]]]) -> Self:
"""Return a new series with added hierarchy.
Parameters
----------
hierarchy
A dictionary mapping components to a list of their parent(s) in the hierarchy.
Single parents may be specified as string or list containing one string.
For example, assume the series contains the components
``["total", "a", "b", "x", "y", "ax", "ay", "bx", "by"]``,
the following dictionary would encode the groupings shown on
`this figure <https://otexts.com/fpp3/hts.html#fig:GroupTree>`_:
.. highlight:: python
.. code-block:: python
hierarchy = {'ax': ['a', 'x'],
'ay': ['a', 'y'],
'bx': ['b', 'x'],
'by': ['b', 'y'],
'a': ['total'],
'b': ['total'],
'x': 'total', # or use a single string
'y': 'total'}
..
Returns
-------
TimeSeries
A new series with the given hierarchy.
"""
return self.__class__(
times=self._time_index,
values=self._values,
components=self.components,
static_covariates=self.static_covariates,
hierarchy=hierarchy,
metadata=self.metadata,
)
def stack(self, other: Self) -> Self:
"""Return a new series with the `other` series stacked to this series along the component axis.
The resulting TimeSeries will have the same name for its time dimension as this TimeSeries, and the
same number of samples.
Parameters
----------
other
A TimeSeries instance with the same index and the same number of samples as the current one.
Returns
-------
TimeSeries
A new series with the components of the other series added to the original.
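Examples
--------
A minimal sketch, stacking two univariate series into one bivariate series:
>>> from darts.utils.timeseries_generation import linear_timeseries, sine_timeseries
>>> s1 = linear_timeseries(length=3)
>>> s2 = sine_timeseries(length=3)
>>> s1.stack(s2).n_components
2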
"""
return concatenate([self, other], axis=1)
def drop_columns(self, col_names: Union[list[str], str]) -> Self:
"""Return a new series with dropped components (columns).
Parameters
----------
col_names
String or list of strings corresponding to the columns to be dropped.
Returns
-------
TimeSeries
A new series with the specified columns dropped.
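Examples
--------
A minimal sketch with two hypothetical components "a" and "b":
>>> import numpy as np
>>> from darts import TimeSeries
>>> series = TimeSeries.from_values(np.zeros((3, 2)), columns=["a", "b"])
>>> series.drop_columns("a").components.tolist()
['b']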
"""
if isinstance(col_names, str):
col_names = [col_names]
comp_list = self.components.tolist()
if not all([x in comp_list for x in col_names]):
raise_log(
ValueError(
"Some column names in `col_names` don't exist in the time series."
),
logger,
)
indexer = []
for idx, col in enumerate(comp_list):
if col not in col_names:
indexer.append(idx)
return self.__class__(
times=self._time_index,
values=self._values[:, indexer],
components=self.components[indexer],
static_covariates=(
self.static_covariates.iloc[indexer]
if self.static_covariates is not None
else None
),
hierarchy=None,
metadata=self.metadata,
)
def univariate_component(self, index: Union[str, int]) -> Self:
"""Return a new univariate series with a selected component.
This drops the hierarchy (if any), and retains only the relevant static covariates column.
Parameters
----------
index
If a string, the name of the component to retrieve. If an integer, the positional index of the component.
Returns
-------
TimeSeries
A new series with a selected component.
"""
return self[index if isinstance(index, str) else self.components[index]]
def add_datetime_attribute(
self,
attribute,
one_hot: bool = False,
cyclic: bool = False,
tz: Optional[str] = None,
) -> Self:
"""Return a new series with one (or more) additional component(s) that contain an attribute of the series' time
index.
The additional components are specified with `attribute`, such as 'weekday', 'day' or 'month'.
This works only for deterministic time series (i.e., made of 1 sample).
Notes
-----
0-indexing is enforced across all the encodings, see
:meth:`datetime_attribute_timeseries() <darts.utils.timeseries_generation.datetime_attribute_timeseries>`
for more information.
Parameters
----------
attribute
A ``pandas.DatetimeIndex`` attribute which will serve as the basis of the new column(s).
one_hot
Boolean value indicating whether to add the specified attribute as a one hot encoding
(results in more columns).
cyclic
Boolean value indicating whether to add the specified attribute as a cyclic encoding.
Alternative to one_hot encoding, enable only one of the two.
(adds 2 columns, corresponding to sin and cos transformation).
tz
Optionally, a time zone to convert the time index to before computing the attributes.
Returns
-------
TimeSeries
A new series with an added datetime attribute component(s).
"""
self._assert_deterministic()
from darts.utils import timeseries_generation as tg
return self.stack(
tg.datetime_attribute_timeseries(
self.time_index,
attribute=attribute,
one_hot=one_hot,
cyclic=cyclic,
tz=tz,
)
)
def add_holidays(
self,
country_code: str,
prov: str = None,
state: str = None,
tz: Optional[str] = None,
) -> Self:
"""Return a new series with an added holiday component.
The holiday component is binary where `1` corresponds to a time step falling on a holiday.
Available countries can be found `here <https://github.com/dr-prodigy/python-holidays#available-countries>`_.
This works only for deterministic time series (i.e., made of 1 sample).
Parameters
----------
country_code
The country ISO code
prov
The province
state
The state
tz
Optionally, a time zone to convert the time index to before computing the attributes.
Returns
-------
TimeSeries
A new series with an added holiday component.
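Examples
--------
An illustrative sketch (assuming "US" is an available country code in the `holidays` package):
>>> import numpy as np
>>> import pandas as pd
>>> from darts import TimeSeries
>>> times = pd.date_range("2020-12-24", periods=3, freq="D")
>>> series = TimeSeries.from_times_and_values(times, np.zeros((3, 1)))
>>> with_holidays = series.add_holidays("US")  # appends one binary holiday component
>>> with_holidays.n_components
2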
"""
self._assert_deterministic()
from darts.utils import timeseries_generation as tg
return self.stack(
tg.holidays_timeseries(
self.time_index,
country_code=country_code,
prov=prov,
state=state,
tz=tz,
)
)
def resample(
self,
freq: Union[str, pd.DateOffset],
method: str = "pad",
method_kwargs: Optional[dict[str, Any]] = None,
**kwargs,
) -> Self:
"""Return a new series where the time index and values were resampled with a given frequency.
The provided `method` is used to aggregate/fill holes in the resampled series, by default 'pad'.
Parameters
----------
freq
The new time difference between two adjacent entries in the returned TimeSeries.
Expects a `pandas.DateOffset` or a frequency alias (string).
method
A method to either aggregate grouped values (for down-sampling) or fill holes (for up-sampling)
in the reindexed TimeSeries. For more information, see the `xarray DataArrayResample documentation
<https://docs.xarray.dev/en/stable/generated/xarray.core.resample.DataArrayResample.html>`_.
Supported methods: ["all", "any", "asfreq", "backfill", "bfill", "count", "ffill", "first", "interpolate",
"last", "max", "mean", "median", "min", "nearest", "pad", "prod", "quantile", "reduce", "std", "sum",
"var"].
method_kwargs
Additional keyword arguments for the specified `method`. Some methods require additional arguments.
Xarray's errors will be raised on invalid keyword arguments.
kwargs
Some keyword arguments for the `xarray.resample` method, notably `offset` or `base` to indicate where
to start the resampling and avoid NaN at the first value of the resampled TimeSeries.
For more information, see the `xarray resample() documentation
<https://docs.xarray.dev/en/stable/generated/xarray.DataArray.resample.html>`_.
Returns
-------
TimeSeries
A resampled series with given frequency.
Examples
--------
>>> times = pd.date_range(start=pd.Timestamp("20200101233000"), periods=6, freq="15min")
>>> pd_series = pd.Series(range(6), index=times)
>>> ts = TimeSeries.from_series(pd_series)
>>> print(ts.time_index)
DatetimeIndex(['2020-01-01 23:30:00', '2020-01-01 23:45:00',
'2020-01-02 00:00:00', '2020-01-02 00:15:00',
'2020-01-02 00:30:00', '2020-01-02 00:45:00'],
dtype='datetime64[ns]', name='time', freq='15T')
>>> resampled_nokwargs_ts = ts.resample(freq="1h")
>>> print(resampled_nokwargs_ts.time_index)
DatetimeIndex(['2020-01-01 23:00:00', '2020-01-02 00:00:00'],
dtype='datetime64[ns]', name='time', freq='H')
>>> print(resampled_nokwargs_ts.values())
[[nan]
[ 2.]]
>>> resampled_ts = ts.resample(freq="1h", offset=pd.Timedelta("30min"))
>>> print(resampled_ts.time_index)
DatetimeIndex(['2020-01-01 23:30:00', '2020-01-02 00:30:00'],
dtype='datetime64[ns]', name='time', freq='H')
>>> print(resampled_ts.values())
[[0.]
[4.]]
>>> downsampled_mean_ts = ts.resample(freq="30min", method="mean")
>>> print(downsampled_mean_ts.values())
[[0.5]
[2.5]
[4.5]]
>>> downsampled_reduce_ts = ts.resample(freq="30min", method="reduce", method_kwargs={"func": np.mean})
>>> print(downsampled_reduce_ts.values())
[[0.5]
[2.5]
[4.5]]
"""
method_kwargs = method_kwargs or {}
if isinstance(freq, pd.DateOffset):
freq = freq.freqstr
resample = self.data_array(copy=False).resample(
indexer={self._time_dim: freq},
**kwargs,
)
if method in SUPPORTED_RESAMPLE_METHODS:
applied_method = getattr(xr.core.resample.DataArrayResample, method)
new_xa = applied_method(resample, **method_kwargs)
# Convert boolean to int as TimeSeries must contain numeric values only
# method: "all", "any"
if new_xa.dtype == "bool":
new_xa = new_xa.astype(int)
else:
raise_log(ValueError(f"Unknown method: {method}"), logger)
return self.__class__.from_xarray(new_xa)
def is_within_range(self, ts: Union[pd.Timestamp, int]) -> bool:
"""Whether the given timestamp or integer is within the time interval of the series.
`ts` does not need to be an element of the series' time index.
Parameters
----------
ts
The `pandas.Timestamp` (if indexed with DatetimeIndex) or integer (if indexed with RangeIndex) to check.
Returns
-------
bool
Whether `ts` is contained within the interval of this series.
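Examples
--------
A small sketch; note that `ts` does not need to fall on the series' frequency grid:
>>> import numpy as np
>>> import pandas as pd
>>> from darts import TimeSeries
>>> times = pd.date_range("2020-01-01", periods=3, freq="D")
>>> series = TimeSeries.from_times_and_values(times, np.zeros((3, 1)))
>>> series.is_within_range(pd.Timestamp("2020-01-02 12:00"))
True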
"""
return self.time_index[0] <= ts <= self.time_index[-1]
def map(
self,
fn: Union[
Callable[[np.number], np.number],
Callable[[Union[pd.Timestamp, int], np.number], np.number],
],
) -> Self: # noqa: E501
"""Return a new series with the function `fn` applied to the values of this series.
If `fn` takes 1 argument it is simply applied on the values array of shape `(time, n_components, n_samples)`.
If `fn` takes 2 arguments, it is applied repeatedly on the `(ts, value[ts])` tuples, where `ts` denotes a
timestamp value, and `value[ts]` denotes the array of values at this timestamp, of shape
`(n_components, n_samples)`.
Parameters
----------
fn
Either a function which takes a NumPy array and returns a NumPy array of same shape;
e.g., `lambda x: x ** 2`, `lambda x: x / x.shape[0]` or `np.log`.
It can also be a function which takes a timestamp and array, and returns a new array of same shape;
e.g., `lambda ts, x: x / ts.days_in_month`.
The type of `ts` is either `pandas.Timestamp` (if the series is indexed with a DatetimeIndex),
or an integer (if the series is indexed with a RangeIndex).
Returns
-------
TimeSeries
A new series with the function `fn` applied to the values.
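Examples
--------
A minimal sketch of both call signatures (on this integer-indexed series, `ts` is an integer):
>>> import numpy as np
>>> from darts import TimeSeries
>>> series = TimeSeries.from_values(np.array([1.0, 2.0, 3.0]))
>>> series.map(lambda x: x ** 2).values().tolist()
[[1.0], [4.0], [9.0]]
>>> series.map(lambda ts, x: x + ts).values().tolist()
[[1.0], [3.0], [5.0]]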
"""
if not isinstance(fn, Callable):
raise_log(TypeError("fn should be callable"), logger)
if isinstance(fn, np.ufunc):
if fn.nin == 1 and fn.nout == 1:
num_args = 1
elif fn.nin == 2 and fn.nout == 1:
num_args = 2
else:
raise_log(
ValueError(
"fn must have either one or two arguments and return a single value"
),
logger,
)
else:
try:
num_args = len(signature(fn).parameters)
except ValueError:
raise_log(
ValueError(
"inspect.signature(fn) failed. Try wrapping fn in a lambda, e.g. lambda x: fn(x)"
),
logger,
)
if num_args == 1: # apply fn on values directly
values = fn(self._values)
elif num_args == 2: # map function uses timestamp f(timestamp, x)
# go over shortest amount of iterations, either over time steps or components and samples
if self.n_timesteps <= self.n_components * self.n_samples:
new_vals = np.vstack([
np.expand_dims(
fn(self.time_index[i], self._values[i, :, :]), axis=0
)
for i in range(self.n_timesteps)
])
else:
new_vals = np.stack(
[
np.column_stack([
fn(self.time_index, self._values[:, i, j])
for j in range(self.n_samples)
])
for i in range(self.n_components)
],
axis=1,
)
values = new_vals
else:
raise_log(ValueError("fn must have either one or two arguments"), logger)
return self.__class__(
times=self._time_index,
values=values,
components=self.components,
**self._attrs,
)
def to_json(self) -> str:
"""Return a JSON string representation of the deterministic series.
At the moment this function works only on deterministic time series (i.e., made of 1 sample).
Notes
-----
Static covariates are not returned in the JSON string. When using `TimeSeries.from_json()`, the static
covariates can be added with input argument `static_covariates`.
Returns
-------
str
A JSON String representing the series
"""
return self.to_dataframe().to_json(orient="split", date_format="iso")
def to_csv(self, *args, **kwargs):
"""Write the deterministic series to a CSV file.
For a list of parameters, refer to the documentation of :func:`pandas.DataFrame.to_csv()` [1]_.
References
----------
.. [1] https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_csv.html?highlight=to_csv
"""
if not self.is_deterministic:
raise_log(
ValueError(
"Writing to csv is only supported for deterministic time series "
"(a series with only one sample per time and component)."
),
logger,
)
self.to_dataframe().to_csv(*args, **kwargs)
def to_pickle(self, path: str, protocol: int = pickle.HIGHEST_PROTOCOL):
"""Save the series in pickle format.
Parameters
----------
path : string
Path to the file where the current object will be pickled.
protocol : integer, default highest
Pickling protocol. The default is best in most cases; only change it if you run into backward-compatibility issues.
"""
with open(path, "wb") as fh:
pickle.dump(self, fh, protocol=protocol)
def plot(
self,
new_plot: bool = False,
central_quantile: Union[float, str] = 0.5,
low_quantile: Optional[float] = 0.05,
high_quantile: Optional[float] = 0.95,
default_formatting: bool = True,
title: Optional[str] = None,
label: Optional[Union[str, Sequence[str]]] = "",
max_nr_components: int = 10,
ax: Optional[matplotlib.axes.Axes] = None,
alpha: Optional[float] = None,
color: Optional[Union[str, tuple, Sequence[Union[str, tuple]]]] = None,
c: Optional[Union[str, tuple, Sequence[Union[str, tuple]]]] = None,
*args,
**kwargs,
) -> matplotlib.axes.Axes:
"""Plot the series.
This is a wrapper method around :func:`xarray.DataArray.plot()`.
Parameters
----------
new_plot
Whether to spawn a new axis to plot on. See also parameter `ax`.
central_quantile
The quantile (between 0 and 1) to plot as a "central" value, if the series is stochastic (i.e., if
it has multiple samples). This will be applied on each component separately (i.e., to display quantiles
of the components' marginal distributions). For instance, setting `central_quantile=0.5` will plot the
median of each component. `central_quantile` can also be set to 'mean'.
low_quantile
The quantile to use for the lower bound of the plotted confidence interval. Similar to `central_quantile`,
this is applied to each component separately (i.e., displaying marginal distributions). No confidence
interval is shown if `low_quantile` is None (default 0.05).
high_quantile
The quantile to use for the upper bound of the plotted confidence interval. Similar to `central_quantile`,
this is applied to each component separately (i.e., displaying marginal distributions). No confidence
interval is shown if `high_quantile` is None (default 0.95).
default_formatting
Whether to use the darts default scheme.
title
Optionally, a custom plot title. If `None`, will use the name of the underlying `xarray.DataArray`.
label
Can either be a string or list of strings. If a string and the series only has a single component, it is
used as the label for that component. If a string and the series has multiple components, it is used as
a prefix for each component name. If a list of strings with length equal to the number of components in
the series, the labels will be mapped to the components in order.
max_nr_components
The maximum number of components of a series to plot. -1 means all components will be plotted.
ax
Optionally, an axis to plot on. If `None`, and `new_plot=False`, will use the current axis. If
`new_plot=True`, will create a new axis.
alpha
Optionally, set the line alpha for deterministic series, or the confidence interval alpha for
probabilistic series.
color
Can either be a single color or list of colors. Any matplotlib color is accepted (string, hex string,
RGB/RGBA tuple). If a single color and the series only has a single component, it is used as the color
for that component. If a single color and the series has multiple components, it is used as the color
for each component. If a list of colors with length equal to the number of components in the series, the
colors will be mapped to the components in order.
c
An alias for `color`.
args
some positional arguments for the `plot()` method
kwargs
some keyword arguments for the `plot()` method
Returns
-------
matplotlib.axes.Axes
Either the passed `ax` axis, a newly created one if `new_plot=True`, or the existing one.
"""
alpha_confidence_intvls = 0.25
if central_quantile != "mean":
if not (
isinstance(central_quantile, float) and 0.0 <= central_quantile <= 1.0
):
raise_log(
ValueError(
'central_quantile must be either "mean", or a float between 0 and 1.'
),
logger,
)
if high_quantile is not None and low_quantile is not None:
if not (0.0 <= low_quantile <= 1.0 and 0.0 <= high_quantile <= 1.0):
raise_log(
ValueError(
"confidence interval low and high quantiles must be between 0 and 1.",
),
logger,
)
if max_nr_components == -1:
n_components_to_plot = self.n_components
else:
n_components_to_plot = min(self.n_components, max_nr_components)
if self.n_components > n_components_to_plot:
logger.warning(
f"Number of series components ({self.n_components}) is larger than the maximum number of "
f"components to plot ({max_nr_components}). Plotting only the first `{max_nr_components}` "
f"components. You can adjust the number of components to plot using `max_nr_components`."
)
if not isinstance(label, str) and isinstance(label, Sequence):
if len(label) != self.n_components and len(label) != n_components_to_plot:
raise_log(
ValueError(
f"The `label` sequence must have the same length as the number of series components "
f"({self.n_components}) or as the number of plotted components ({n_components_to_plot}). "
f"Received length `{len(label)}`."
),
logger,
)
custom_labels = True
else:
custom_labels = False
if color and c:
raise_log(
ValueError(
"`color` and `c` must not be used simultaneously, use one or the other."
),
logger,
)
color = color or c
if not isinstance(color, (str, tuple)) and isinstance(color, Sequence):
if len(color) != self.n_components and len(color) != n_components_to_plot:
raise_log(
ValueError(
f"The `color` sequence must have the same length as the number of series components "
f"({self.n_components}) or as the number of plotted components ({n_components_to_plot}). "
f"Received length `{len(label)}`."
),
logger,
)
custom_colors = True
else:
custom_colors = False
kwargs["alpha"] = alpha
if not any(lw in kwargs for lw in ["lw", "linewidth"]):
kwargs["lw"] = 2
if new_plot:
fig, ax = plt.subplots()
else:
if ax is None:
ax = plt.gca()
# TODO: migrate from xarray plotting to something else
data_array = self.data_array(copy=False)
for i, c in enumerate(data_array.component[:n_components_to_plot]):
comp_name = str(c.values)
comp = data_array.sel(component=c)
if comp.sample.size > 1:
if central_quantile == "mean":
central_series = comp.mean(dim=DIMS[2])
else:
central_series = comp.quantile(q=central_quantile, dim=DIMS[2])
else:
central_series = comp.mean(dim=DIMS[2])
if custom_labels:
label_to_use = label[i]
else:
if label == "":
label_to_use = comp_name
elif len(self.components) == 1:
label_to_use = label
else:
label_to_use = f"{label}_{comp_name}"
kwargs["label"] = label_to_use
kwargs["c"] = color[i] if custom_colors else color
kwargs_central = deepcopy(kwargs)
if not self.is_deterministic:
kwargs_central["alpha"] = 1
if central_series.shape[0] > 1:
p = central_series.plot(*args, ax=ax, **kwargs_central)
# empty TimeSeries
elif central_series.shape[0] == 0:
p = ax.plot(
[],
[],
*args,
**kwargs_central,
)
else:
p = ax.plot(
[self.start_time()],
central_series.values[0],
"o",
*args,
**kwargs_central,
)
ax.set_xlabel(self.time_dim)
color_used = p[0].get_color() if default_formatting else None
# Optionally show confidence intervals
if (
comp.sample.size > 1
and low_quantile is not None
and high_quantile is not None
):
low_series = comp.quantile(q=low_quantile, dim=DIMS[2])
high_series = comp.quantile(q=high_quantile, dim=DIMS[2])
if low_series.shape[0] > 1:
ax.fill_between(
self.time_index,
low_series,
high_series,
color=color_used,
alpha=(alpha if alpha is not None else alpha_confidence_intvls),
)
else:
ax.plot(
[self.start_time(), self.start_time()],
[low_series.values[0], high_series.values[0]],
"-+",
color=color_used,
lw=2,
)
ax.legend()
ax.set_title(title if title is not None else data_array.name)
return ax
def with_columns_renamed(
self, col_names: Union[list[str], str], col_names_new: Union[list[str], str]
) -> Self:
"""Return a new series with new columns/components names.
It also adapts the names in the hierarchy, if any.
Parameters
----------
col_names
String or list of strings corresponding to the column names to be changed.
col_names_new
String or list of strings corresponding to the new column names. Must be the same length as col_names.
Returns
-------
TimeSeries
A new series with renamed columns.
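Examples
--------
A minimal sketch (column names are arbitrary):
>>> import numpy as np
>>> from darts import TimeSeries
>>> series = TimeSeries.from_values(np.zeros((3, 2)), columns=["a", "b"])
>>> series.with_columns_renamed("a", "a_new").components.tolist()
['a_new', 'b']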
"""
if isinstance(col_names, str):
col_names = [col_names]
if isinstance(col_names_new, str):
col_names_new = [col_names_new]
if not all([(x in self.components.to_list()) for x in col_names]):
raise_log(
ValueError(
"Some column names in col_names don't exist in the time series."
),
logger,
)
if len(col_names) != len(col_names_new):
raise_log(
ValueError(
"Length of col_names_new list should be equal to the length of col_names list."
),
logger,
)
old2new = {old: new for (old, new) in zip(col_names, col_names_new)}
# update component names
cols = [old2new[old] if old in old2new else old for old in self.components]
# update hierarchy names
if self.hierarchy is not None:
hierarchy = {
(old2new[key] if key in old2new else key): [
old2new[old] if old in old2new else old
for old in self.hierarchy[key]
]
for key in self.hierarchy
}
else:
hierarchy = None
return self.__class__(
times=self._time_index,
values=self._values,
components=pd.Index(cols),
static_covariates=self.static_covariates,
hierarchy=hierarchy,
metadata=self.metadata,
)
"""
Simple statistics and aggregation functions. Calculate various statistics over the samples of stochastic time series
or aggregate over components/time for deterministic series.
"""
def mean(self, axis: int = 2) -> Self:
"""Return a new series with the mean computed over the specified `axis`.
If we reduce over time (``axis=0``), the series will have length one and will use the first entry of the
original ``time_index``. If we perform the calculation over the components (``axis=1``), the resulting single
component will be renamed to "components_mean". When applied to the samples (``axis=2``), a deterministic
series is returned.
If ``axis=1``, the static covariates and the hierarchy are discarded from the series.
Parameters
----------
axis
The axis to reduce over. The default is to calculate over samples, i.e. axis=2.
Returns
-------
TimeSeries
A new series with mean applied to the indicated axis.
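Examples
--------
A small sketch of the axis semantics on a deterministic series:
>>> import numpy as np
>>> from darts import TimeSeries
>>> series = TimeSeries.from_values(np.arange(6.0).reshape(3, 2))
>>> series.mean(axis=1).values().tolist()  # average over the two components
[[0.5], [2.5], [4.5]]
>>> series.mean(axis=0).values().tolist()  # average over time
[[2.0, 3.0]]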
"""
values = self._values.mean(axis=axis, keepdims=True)
times, components = self._get_agg_dims("components_mean", axis)
return self.__class__(
times=times,
values=values,
components=components,
**(self._attrs if axis != 1 else dict()),
)
def sum(self, axis: int = 2) -> Self:
"""Return a new series with the sum computed over the specified `axis`.
If we reduce over time (``axis=0``), the series will have length one and will use the first entry of the
original ``time_index``. If we perform the calculation over the components (``axis=1``), the resulting single
component will be renamed to "components_sum". When applied to the samples (``axis=2``), a deterministic
series is returned.
If ``axis=1``, the static covariates and the hierarchy are discarded from the series.
Parameters
----------
axis
The axis to reduce over. The default is to calculate over samples, i.e. axis=2.
Returns
-------
TimeSeries
A new series with sum applied to the indicated axis.
"""
values = self._values.sum(axis=axis, keepdims=True)
times, components = self._get_agg_dims("components_sum", axis)
return self.__class__(
times=times,
values=values,
components=components,
**(self._attrs if axis != 1 else dict()),
)
def min(self, axis: int = 2) -> Self:
"""Return a new series with the minimum computed over the specified `axis`.
If we reduce over time (``axis=0``), the series will have length one and will use the first entry of the
original ``time_index``. If we perform the calculation over the components (``axis=1``), the resulting single
component will be renamed to "components_min". When applied to the samples (``axis=2``), a deterministic
series is returned.
If ``axis=1``, the static covariates and the hierarchy are discarded from the series.
Parameters
----------
axis
The axis to reduce over. The default is to calculate over samples, i.e. axis=2.
Returns
-------
TimeSeries
A new series with min applied to the indicated axis.
"""
values = self._values.min(axis=axis, keepdims=True)
times, components = self._get_agg_dims("components_min", axis)
return self.__class__(
times=times,
values=values,
components=components,
**(self._attrs if axis != 1 else dict()),
)
def max(self, axis: int = 2) -> Self:
"""Return a new series with the maximum computed over the specified `axis`.
If we reduce over time (``axis=0``), the series will have length one and will use the first entry of the
original ``time_index``. If we perform the calculation over the components (``axis=1``), the resulting single
component will be renamed to "components_max". When applied to the samples (``axis=2``), a deterministic
series is returned.
If ``axis=1``, the static covariates and the hierarchy are discarded from the series.
Parameters
----------
axis
The axis to reduce over. The default is to calculate over samples, i.e. axis=2.
Returns
-------
TimeSeries
A new series with max applied to the indicated axis.
"""
values = self._values.max(axis=axis, keepdims=True)
times, components = self._get_agg_dims("components_max", axis)
return self.__class__(
times=times,
values=values,
components=components,
**(self._attrs if axis != 1 else dict()),
)
def quantile(self, q: Union[float, Sequence[float]] = 0.5, **kwargs) -> Self:
"""Return a deterministic series with the desired quantile(s) `q` of each component computed over the samples
of the stochastic series.
The component quantiles in the new series are named "<component>_q<quantile>", where "<component>" is the
column name, and "<quantile>" is the quantile value.
The order of the component quantiles is: `[<c_1>_q<q_1>, ... <c_1>_q<q_2>, ..., <c_n>_q<q_n>]`.
This works only on stochastic series (i.e., with more than 1 sample).
Parameters
----------
q
The desired quantile value or sequence of quantile values. Each value must be between 0. and 1. inclusive.
For instance, `0.5` will return a TimeSeries containing the median of the (marginal) distribution of each
component.
kwargs
Other keyword arguments are passed down to `numpy.quantile()`.
Returns
-------
TimeSeries
A new series containing the desired quantile(s) of each component.
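Examples
--------
A sketch on a small stochastic series (the default component name here is "0"):
>>> import numpy as np
>>> from darts import TimeSeries
>>> vals = np.arange(12.0).reshape(2, 1, 6)  # (times, components, samples)
>>> series = TimeSeries.from_values(vals)
>>> series.quantile([0.0, 1.0]).components.tolist()
['0_q0.00', '0_q1.00']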
"""
self._assert_stochastic()
if isinstance(q, float):
q = [q]
if not all([0 <= q_i <= 1 for q_i in q]):
raise_log(
ValueError(
"The quantile values must be expressed as fraction (between 0 and 1 inclusive)."
),
logger,
)
# component names
cnames = [f"{comp}_q{q_i:.2f}" for comp in self.components for q_i in q]
# get quantiles of shape (n quantiles, n times, n components)
new_data = np.quantile(self._values, q=q, axis=2, **kwargs)
# transpose and reshape into (n times, n components * n quantiles, 1)
new_data = new_data.transpose((1, 2, 0)).reshape(len(self), len(cnames), 1)
# only add static covariates and hierarchy if the number of output components matches the input components
return self.__class__(
times=self._time_index,
values=new_data,
components=cnames,
static_covariates=self.static_covariates if len(q) == 1 else None,
hierarchy=self.hierarchy if len(q) == 1 else None,
metadata=self.metadata,
copy=False,
)
def var(self, ddof: int = 1) -> Self:
"""Return a deterministic series with the variance of each component computed over the samples of the
stochastic series.
This works only on stochastic series (i.e., with more than 1 sample)
Parameters
----------
ddof
"Delta Degrees of Freedom": the divisor used in the calculation is N - ddof where N represents the
number of elements. By default, ddof is 1.
Returns
-------
TimeSeries
A new series containing the variance of each component.
"""
self._assert_stochastic()
vals = self._values.var(axis=2, ddof=ddof, keepdims=True)
return self.with_values(vals)
def std(self, ddof: int = 1) -> Self:
"""Return a deterministic series with the standard deviation of each component computed over the samples of the
stochastic series.
This works only on stochastic series (i.e., with more than 1 sample)
Parameters
----------
ddof
"Delta Degrees of Freedom": the divisor used in the calculation is N - ddof where N represents the
number of elements. By default, ddof is 1.
Returns
-------
TimeSeries
A new series containing the standard deviation of each component.
"""
self._assert_stochastic()
vals = self._values.std(axis=2, ddof=ddof, keepdims=True)
return self.with_values(vals)
def skew(self, **kwargs) -> Self:
"""Return a deterministic series with the skew of each component computed over the samples of the
stochastic series.
This works only on stochastic series (i.e., with more than 1 sample)
Parameters
----------
kwargs
Other keyword arguments are passed down to `scipy.stats.skew()`
Returns
-------
TimeSeries
A new series containing the skew of each component.
"""
self._assert_stochastic()
vals = np.expand_dims(skew(self._values, axis=2, **kwargs), axis=2)
return self.with_values(vals)
def kurtosis(self, **kwargs) -> Self:
"""Return a deterministic series with the kurtosis of each component computed over the samples of the
stochastic series.
This works only on stochastic series (i.e., with more than 1 sample)
Parameters
----------
kwargs
Other keyword arguments are passed down to `scipy.stats.kurtosis()`
Returns
-------
TimeSeries
A new series containing the kurtosis of each component.
"""
self._assert_stochastic()
vals = np.expand_dims(kurtosis(self._values, axis=2, **kwargs), axis=2)
return self.with_values(vals)
"""
Dunder methods
"""
def _extract_values(
self,
other: Union[Self, xr.DataArray, np.ndarray],
) -> np.ndarray:
"""Extract values from another series or array and check for compatible shapes."""
if isinstance(other, TimeSeries):
other_vals = other._values
elif isinstance(other, xr.DataArray):
other_vals = other.values
else:
other_vals = other
t, c, s = self.shape
other_shape = other_vals.shape
if not (
# can combine arrays if shapes are equal (t, c, s)
other_shape == (t, c, s)
# or broadcast [t, 1, 1] onto [t, c, s]
or other_shape == (t, 1, 1)
# or broadcast [t, c, 1] onto [t, c, s]
or other_shape == (t, c, 1)
# or broadcast [t, 1, s] onto [t, c, s]
or other_shape == (t, 1, s)
):
raise_log(
ValueError(
"Attempted to perform operation on two TimeSeries of unequal shapes."
),
logger=logger,
)
return other_vals
@classmethod
def _fill_missing_dates(
cls,
times: Union[pd.DatetimeIndex, pd.RangeIndex, pd.Index],
values: np.ndarray,
freq: Optional[Union[str, int]] = None,
) -> tuple[Union[pd.DatetimeIndex, pd.RangeIndex], np.ndarray]:
"""Return the time index and values with missing dates inserted.
This requires either a provided `freq` or the possibility to infer a unique frequency from `times` (see
`offset aliases <https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#offset-aliases>`_
for more info on supported frequencies).
Parameters
----------
times
The time index.
values
The values.
freq
Optionally, the target frequency to fill in the missing times. A pandas frequency offset (string) if
`times` is a `pandas.DatetimeIndex`, or a step size if `times` is an integer index.
It must represent a target frequency that allows to maintain all dates / integers from `times`.
Raises
------
ValueError
If `times` contains less than 3 elements,
if no unique frequency can be inferred from `times`,
if the resampled index does not contain all dates from `times`.
Returns
-------
tuple[Union[pandas.DatetimeIndex, pandas.RangeIndex], numpy.ndarray]
The `times` with inserted missing dates and `values` with `numpy.nan` for the newly inserted dates.
"""
if freq is not None:
return cls._restore_from_frequency(times=times, values=values, freq=freq)
if len(times) <= 2:
raise_log(
ValueError(
"Input time series must be of (length>=3) when fill_missing_dates=True and freq=None."
),
logger,
)
times, values = cls._sort_index(times=times, values=values)
if isinstance(times, pd.DatetimeIndex):
has_datetime_index = True
observed_freqs = cls._observed_freq_datetime_index(times)
else: # integer index (non RangeIndex)
has_datetime_index = False
observed_freqs = cls._observed_freq_integer_index(times)
if not len(observed_freqs) == 1:
offset_alias_info = (
(
" For more information about frequency aliases, read "
"https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#offset-aliases"
)
if has_datetime_index
else ""
)
raise_log(
ValueError(
f"Could not observe an inferred frequency. An explicit frequency must be evident over a span of "
f"at least 3 consecutive time stamps in the input data. {offset_alias_info}"
if not len(observed_freqs)
else f"Could not find a unique inferred frequency (not constant). Observed frequencies: "
f"{observed_freqs}. If any of those is the actual frequency, try passing it with "
f"`fill_missing_dates=True` and `freq=your_frequency`.{offset_alias_info}"
),
logger,
)
freq = observed_freqs.pop()
return cls._restore_from_frequency(times=times, values=values, freq=freq)
@staticmethod
def _sort_index(
times: Union[pd.DatetimeIndex, pd.RangeIndex, pd.Index],
values: np.ndarray,
) -> tuple[Union[pd.DatetimeIndex, pd.RangeIndex], np.ndarray]:
"""Sort `times` and `values` by ascending dates.
Only performed if `times` is not already monotonically increasing.
"""
if times.is_monotonic_increasing:
return times, values
times, idx_sorted = times.sort_values(return_indexer=True)
return times, values[idx_sorted]
@staticmethod
def _observed_freq_datetime_index(index: pd.DatetimeIndex) -> set:
"""Return all observed/inferred frequencies of a `pandas.DatetimeIndex`.
The frequencies are inferred from all combinations of three consecutive time steps.
Assumes that `index` is sorted in ascending order.
"""
# find unique time deltas indices from three consecutive time stamps
_, unq_td_index = np.unique(
np.stack([(index[1:-1] - index[:-2]), (index[2:] - index[1:-1])]),
return_index=True,
axis=1,
)
# for each unique index, take one example including the left time stamp, and one including the right
steps = np.column_stack([index[unq_td_index + i] for i in range(3)])
# find all unique inferred frequencies
observed_freqs = {pd.infer_freq(step) for step in steps}
observed_freqs.discard(None)
return observed_freqs
@staticmethod
def _observed_freq_integer_index(index: pd.Index) -> set:
"""Return all observed/inferred frequencies of a ``pandas.Index`` (an integer-valued index).
The inferred frequencies are given by all unique differences between two consecutive elements.
Assumes that `index` is sorted in ascending order.
"""
return set(index[1:] - index[:-1])
@classmethod
def _restore_range_indexed(
cls,
times: pd.Index,
values: np.ndarray,
) -> tuple[Union[pd.DatetimeIndex, pd.RangeIndex], np.ndarray]:
"""Return `times` re-indexed into a `pandas.RangeIndex` and `values` in the re-indexed order.
An integer `pandas.Index` can be converted to a `pandas.RangeIndex`, if the sorted index has a constant step
size. Raises a `ValueError` otherwise.
"""
times, values = cls._sort_index(times=times, values=values)
observed_freqs = cls._observed_freq_integer_index(times)
if len(observed_freqs) != 1:
raise_log(
ValueError(
f"Could not convert integer index to a `pandas.RangeIndex`. Found non-unique step "
f"sizes/frequencies: `{observed_freqs}`. If any of those is the actual frequency, "
f"try passing it with `fill_missing_dates=True` and `freq=your_frequency`."
),
logger=logger,
)
freq = observed_freqs.pop()
times = pd.RangeIndex(
start=min(times),
stop=max(times) + freq,
step=freq,
name=times.name,
)
return times, values
@classmethod
def _restore_from_frequency(
cls,
times: Union[pd.DatetimeIndex, pd.RangeIndex, pd.Index],
values: np.ndarray,
freq: Union[str, int],
) -> tuple[Union[pd.DatetimeIndex, pd.RangeIndex], np.ndarray]:
"""Return `times` resampled with frequency `freq` and values with `np.nan` for the newly inserted dates.
The frequency `freq` must represent a target frequency that allows to maintain all dates from `times`.
Parameters
----------
times
The time index.
values
The values.
freq
The target frequency to fill in the missing times. A pandas frequency offset (string) if
`times` is a `pandas.DatetimeIndex`, or a step size if `times` is an integer index.
It must represent a target frequency that allows to maintain all dates / integers from `times`.
Raises
------
ValueError
If the resampled/re-indexed DateTimeIndex/RangeIndex does not contain all dates from `times`.
Returns
-------
tuple[Union[pandas.DatetimeIndex, pandas.RangeIndex], numpy.ndarray]
The resampled `times` with frequency `freq` and `values` with `numpy.nan` for the newly inserted dates.
"""
times, values = cls._sort_index(times=times, values=values)
resampled_times = pd.Series(index=times, dtype="object")
if isinstance(times, pd.DatetimeIndex):
has_datetime_index = True
resampled_times = resampled_times.asfreq(freq)
else: # integer index (non RangeIndex) -> resampled to RangeIndex
has_datetime_index = False
resampled_times = resampled_times.reindex(
range(min(times), max(times) + freq, freq)
)
# check if new time index with inferred frequency contains all input data
contains_all_data = times.isin(resampled_times.index).all()
if not contains_all_data:
offset_alias_info = (
(
" For more information about frequency aliases, read "
"https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#offset-aliases"
)
if has_datetime_index
else ""
)
raise_log(
ValueError(
f"Could not correctly fill missing {'dates' if has_datetime_index else 'indices'} with the "
f"observed/passed {'frequency' if has_datetime_index else 'step size'} `freq='{freq}'`. "
f"Not all input {'time stamps' if has_datetime_index else 'indices'} contained in the newly "
f"created TimeSeries.{offset_alias_info}"
),
logger,
)
# convert to float as for instance integer arrays cannot accept nans
dtype = (
values.dtype
if (
np.issubdtype(values.dtype, np.float32)
or np.issubdtype(values.dtype, np.float64)
)
else np.float64
)
resampled_values = np.empty(
shape=((len(resampled_times),) + values.shape[1:]), dtype=dtype
)
resampled_values[:] = np.nan
resampled_values[resampled_times.index.isin(times)] = values
return resampled_times.index, resampled_values
@staticmethod
def _get_axis(axis: Union[int, str]) -> int:
"""Convert different `axis` types to an integer axis."""
if isinstance(axis, int):
if not 0 <= axis <= 2:
raise_log(
ValueError("If `axis` is an integer it must be between 0 and 2."),
logger,
)
return axis
else:
if axis not in DIMS:
raise_log(
ValueError(
f"`axis` must be a known dimension of this series: {DIMS}"
),
logger,
)
return DIMS.index(axis)
def _get_agg_dims(
self, new_cname: str, axis: int
) -> tuple[Union[pd.DatetimeIndex, pd.RangeIndex], pd.Index]:
"""Get output time index and components based on a aggregation `axis` and potential new column name
`new_cname`.
"""
if axis == 0: # set time_index to first day
return self._time_index[0:1], self.components
elif axis == 1: # rename components
return self._time_index, pd.Index([new_cname])
elif axis == 2: # do nothing
return self._time_index, self.components
else:
raise_log(
ValueError(f"Invalid `axis={axis}`. Must be one of `(1, 2, 3)`."),
logger,
)
def _get_first_timestamp_after(self, ts: pd.Timestamp) -> Union[pd.Timestamp, int]:
return next(filter(lambda t: t >= ts, self._time_index))
def _get_last_timestamp_before(self, ts: pd.Timestamp) -> Union[pd.Timestamp, int]:
return next(filter(lambda t: t <= ts, self._time_index[::-1]))
def _assert_univariate(self):
if not self.is_univariate:
raise_log(
AssertionError(
"Only univariate TimeSeries instances support this method"
),
logger,
)
def _assert_deterministic(self):
if not self.is_deterministic:
raise_log(
AssertionError(
"Only deterministic TimeSeries (with 1 sample) instances support this method"
),
logger,
)
def _assert_stochastic(self):
if not self.is_stochastic:
raise_log(
AssertionError(
"Only non-deterministic TimeSeries (with more than 1 samples) "
"instances support this method"
),
logger,
)
def _raise_if_not_within(self, ts: Union[pd.Timestamp, int]):
if isinstance(ts, pd.Timestamp):
# Note that the converse doesn't apply (a time-indexed series can be called with an integer)
if not self._has_datetime_index:
raise_log(
ValueError(
"Function called with a timestamp, but series not time-indexed."
),
logger,
)
is_inside = self.start_time() <= ts <= self.end_time()
else:
if self._has_datetime_index:
is_inside = 0 <= ts <= len(self)
else:
is_inside = self.start_time() <= ts <= self.end_time()
if not is_inside:
raise_log(
ValueError(
f"Timestamp must be between {self.start_time()} and {self.end_time()}"
),
logger,
)
def __eq__(self, other):
if not isinstance(other, TimeSeries):
return False
if self.shape != other.shape:
return False
if not self._time_index.equals(other._time_index):
return False
if not np.array_equal(self._values, other._values, equal_nan=True):
return False
if not self.components.equals(other.components):
return False
sc_self, sc_other = self.static_covariates, other.static_covariates
if (sc_self is not None) != (sc_other is not None):
return False
elif isinstance(sc_self, pd.DataFrame) and not sc_self.equals(sc_other):
return False
hr_self, hr_other = self.hierarchy, other.hierarchy
if (hr_self is not None) != (hr_other is not None):
return False
elif isinstance(hr_self, dict) and hr_self != hr_other:
return False
md_self, md_other = self.metadata, other.metadata
if (md_self is not None) != (md_other is not None):
return False
elif isinstance(md_self, dict) and md_self != md_other:
return False
return True
def __ne__(self, other):
return not self.__eq__(other)
def __len__(self):
return len(self._values)
def __add__(self, other):
if isinstance(other, (TimeSeries, xr.DataArray, np.ndarray)):
other = self._extract_values(other)
elif not isinstance(other, (int, float, np.integer)):
raise_log(
TypeError(
f"unsupported operand type(s) for + or add(): '{type(self).__name__}' and '{type(other).__name__}'."
),
logger,
)
ts = self.copy()
np.add(ts._values, other, out=ts._values)
return ts
def __radd__(self, other):
return self + other
def __sub__(self, other):
if isinstance(other, (TimeSeries, xr.DataArray, np.ndarray)):
other = self._extract_values(other)
elif not isinstance(other, (int, float, np.integer)):
raise_log(
TypeError(
f"unsupported operand type(s) for - or sub(): '{type(self).__name__}' and '{type(other).__name__}'."
),
logger,
)
ts = self.copy()
np.subtract(ts._values, other, out=ts._values)
return ts
def __rsub__(self, other):
return other + (-self)
def __mul__(self, other):
if isinstance(other, (TimeSeries, xr.DataArray, np.ndarray)):
other = self._extract_values(other)
elif not isinstance(other, (int, float, np.integer)):
raise_log(
TypeError(
f"unsupported operand type(s) for * or mul(): '{type(self).__name__}' and '{type(other).__name__}'."
),
logger,
)
ts = self.copy()
np.multiply(ts._values, other, out=ts._values)
return ts
def __rmul__(self, other):
return self * other
def __pow__(self, n):
if isinstance(n, (int, float, np.integer)):
if n < 0:
raise_log(
ValueError("Attempted to raise a series to a negative power."),
logger,
)
n = float(n)
elif isinstance(n, (TimeSeries, xr.DataArray, np.ndarray)):
n = self._extract_values(n) # elementwise power
else:
raise_log(
TypeError(
f"unsupported operand type(s) for ** or pow(): '{type(self).__name__}' and '{type(n).__name__}'."
),
logger,
)
ts = self.copy()
np.power(ts._values, n, out=ts._values)
return ts
def __truediv__(self, other):
if isinstance(other, (int, float, np.integer)):
if other == 0:
raise_log(ZeroDivisionError("Cannot divide by 0."), logger)
elif isinstance(other, (TimeSeries, xr.DataArray, np.ndarray)):
other = self._extract_values(other)
if (other == 0).any():
raise_log(
ZeroDivisionError("Cannot divide by a TimeSeries with a value 0."),
logger,
)
else:
raise_log(
TypeError(
"unsupported operand type(s) for / or truediv():"
f" '{type(self).__name__}' and '{type(other).__name__}'."
),
logger,
)
ts = self.copy()
np.divide(ts._values, other, out=ts._values)
return ts
def __rtruediv__(self, n):
return n * (self ** (-1))
def __abs__(self):
ts = self.copy()
np.absolute(ts._values, out=ts._values)
return ts
def __neg__(self):
ts = self.copy()
np.negative(ts._values, out=ts._values)
return ts
def __contains__(self, ts: Union[int, pd.Timestamp]) -> bool:
return ts in self.time_index
def __round__(self, n=None):
ts = self.copy()
np.round(ts._values, n, out=ts._values)
return ts
def __lt__(self, other) -> np.ndarray:
if isinstance(other, (TimeSeries, xr.DataArray, np.ndarray)):
other = self._extract_values(other)
elif not isinstance(other, (int, float, np.integer)):
raise_log(
TypeError(
f"unsupported operand type(s) for < : '{type(self).__name__}' and '{type(other).__name__}'."
),
logger,
)
return np.less(self._values, other)
def __gt__(self, other) -> np.ndarray:
if isinstance(other, (TimeSeries, xr.DataArray, np.ndarray)):
other = self._extract_values(other)
elif not isinstance(other, (int, float, np.integer)):
raise_log(
TypeError(
f"unsupported operand type(s) for > : '{type(self).__name__}' and '{type(other).__name__}'."
),
logger,
)
return np.greater(self._values, other)
def __le__(self, other) -> np.ndarray:
if isinstance(other, (TimeSeries, xr.DataArray, np.ndarray)):
other = self._extract_values(other)
elif not isinstance(other, (int, float, np.integer)):
raise_log(
TypeError(
f"unsupported operand type(s) for <= : '{type(self).__name__}' and '{type(other).__name__}'."
),
logger,
)
return np.less_equal(self._values, other)
def __ge__(self, other) -> np.ndarray:
if isinstance(other, (TimeSeries, xr.DataArray, np.ndarray)):
other = self._extract_values(other)
elif not isinstance(other, (int, float, np.integer)):
raise_log(
TypeError(
f"unsupported operand type(s) for >= : '{type(self).__name__}' and '{type(other).__name__}'."
),
logger,
)
return np.greater_equal(self._values, other)
def __str__(self):
return str(self.data_array(copy=False)).replace(
"xarray.DataArray", "TimeSeries (DataArray)"
)
def __repr__(self):
return (
self.data_array(copy=False)
.__repr__()
.replace("xarray.DataArray", "TimeSeries")
)
def _repr_html_(self):
return (
self.data_array(copy=False)
._repr_html_()
.replace("xarray.DataArray", "TimeSeries")
)
def __copy__(self, deep: bool = True):
return self.copy()
def __deepcopy__(self, memo):
return self.__class__(
times=deepcopy(self._time_index, memo),
values=deepcopy(self._values, memo),
components=deepcopy(self.components, memo),
copy=False,
**deepcopy(self._attrs, memo),
)
def __getitem__(
self,
key: Union[
pd.DatetimeIndex,
pd.RangeIndex,
list[str],
list[int],
list[pd.Timestamp],
str,
int,
pd.Timestamp,
Any,
],
) -> Self:
"""Return a new series with elements selected by `key`.
The supported index types are the following base types as a single value, a list or a slice:
- pandas.Timestamp -> return a TimeSeries corresponding to the value(s) at the given timestamp(s).
- str -> return a TimeSeries including the column(s) (components) specified as str.
- int -> return a TimeSeries with the value(s) at the given row (time) index.
`pandas.DatetimeIndex` and `pandas.RangeIndex` are also supported and will return the corresponding value(s)
at the provided time indices.
.. warning::
slices use pandas convention of including both ends of the slice.
Notes
-----
For integer-indexed series, integers or slices of integers act as positional indices. For example,
passing `series[i]` will return the ``i``-th value along the series, which is not necessarily the value where
the time index is equal to ``i`` (if the time index does not start at 0 and / or has a step size > 1). In
contrast, calling this method with a ``pandas.RangeIndex`` returns the values where the time index matches the
provided range index.
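Examples
--------
A few illustrative lookups (the column name "a" is arbitrary):
>>> import numpy as np
>>> from darts import TimeSeries
>>> series = TimeSeries.from_values(np.arange(4.0), columns=["a"])
>>> len(series[:2])  # positional slice over time
2
>>> series["a"].components.tolist()  # component selection by name
['a']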
"""
def _check_dt():
if not self._has_datetime_index:
raise_log(
ValueError(
"Attempted indexing a series with a DatetimeIndex or a timestamp, "
"but the series uses a RangeIndex."
),
logger,
)
def _check_range():
if self._has_datetime_index:
raise_log(
ValueError(
"Attempted indexing a series with a RangeIndex, "
"but the series uses a DatetimeIndex."
),
logger,
)
adapt_covs_on_component = (
self.has_static_covariates and len(self.static_covariates) > 1
)
# handle DatetimeIndex and RangeIndex:
if isinstance(key, (pd.DatetimeIndex, pd.RangeIndex)):
is_dti = isinstance(key, pd.DatetimeIndex)
_check_dt() if is_dti else _check_range()
times = self._time_index
if len(key) == 0:
# keep original frequency in case of empty index
times = self._time_index[:0]
values = self._values[:0]
else:
idx = times.get_indexer(key)
if (idx < 0).any():
raise_log(KeyError("Not all indices found in time index."), logger)
times = self._time_index[idx]
values = self._values[idx]
# make sure the frequency is transferred
if is_dti:
times.freq = key.freq
else:
# `get_indexer()` converts `RangeIndex` into regular `Index`
times = pd.RangeIndex(
start=times[0], stop=times[-1] + key.step, step=key.step
)
return self.__class__(
times=times, values=values, components=self.components, **self._attrs
)
# handle slices:
elif isinstance(key, slice):
if key.start is None and key.stop is None:
if key.step is not None and key.step <= 0:
raise_log(
ValueError(
"Indexing a `TimeSeries` with a `slice` of `step<=0` (reverse) is not "
"possible since `TimeSeries` must have a monotonically increasing time index."
),
logger=logger,
)
else:
return self.__class__(
times=self._time_index[key],
values=self._values[key],
components=self.components,
**self._attrs,
)
elif isinstance(key.start, str) or isinstance(key.stop, str):
# selecting components discards the hierarchy, if any
idx = self.components.get_indexer(pd.Index([key.start, key.stop]))
if (idx < 0).any():
raise_log(
KeyError("Not all components found in time index."), logger
)
indexer = slice(idx[0], idx[-1] + 1)
values = self._values[:, indexer]
components = self.components[indexer]
static_covariates = self.static_covariates
return self.__class__(
times=self._time_index,
values=values,
components=components,
static_covariates=(
static_covariates[indexer]
if adapt_covs_on_component
else static_covariates
),
hierarchy=None,
metadata=self.metadata,
)
elif isinstance(key.start, (int, np.int64)) or isinstance(
key.stop, (int, np.int64)
):
return self.__class__(
times=self._time_index[key],
values=self._values[key],
components=self.components,
**self._attrs,
)
elif isinstance(key.start, pd.Timestamp) or isinstance(
key.stop, pd.Timestamp
):
if key.step is not None and key.step <= 0:
raise_log(
ValueError(
"Indexing a `TimeSeries` with a `slice` of `step<=0` (reverse) is not "
"possible since `TimeSeries` must have a monotonically increasing time index."
),
logger=logger,
)
_check_dt()
start_time = self.start_time()
if key.start is not None:
start = n_steps_between(
end=key.start, start=start_time, freq=self.freq
)
if start < 0:
# shift start by a round multiple of `step` ahead until it lies within the index
start = 0 if key.step is None else start % key.step
else:
start = 0
if key.stop is not None:
end = n_steps_between(
end=key.stop, start=start_time, freq=self.freq
)
else:
end = len(self) - 1
key = slice(start, end + 1, key.step)
return self.__class__(
times=self._time_index[key],
values=self._values[key],
components=self.components,
**self._attrs,
)
# handle simple types:
elif isinstance(key, str):
col_idx = self.components.get_loc(key)
static_covariates = self.static_covariates
return self.__class__(
times=self._time_index,
values=self._values[:, col_idx : col_idx + 1],
components=self.components[col_idx : col_idx + 1],
static_covariates=(
static_covariates.loc[[key]]
if adapt_covs_on_component
else static_covariates
),
hierarchy=None,
metadata=self.metadata,
)
elif isinstance(key, (int, np.int64)):
key = slice(key, key + 1 if key != -1 else None)
ts = self.__class__(
times=self._time_index[key],
values=self._values[key],
components=self.components,
**self._attrs,
)
if len(ts) == 0:
raise_log(IndexError("Integer index out of range."), logger)
return ts
elif isinstance(key, pd.Timestamp):
_check_dt()
key = self._time_index.get_loc(key)
key = slice(key, key + 1)
return self.__class__(
times=self._time_index[key],
values=self._values[key],
components=self.components,
**self._attrs,
)
# handle lists:
if isinstance(key, list):
if all(isinstance(s, str) for s in key):
# when string(s) are provided, we consider it as (a list of) component(s)
indexer = self.components.get_indexer(key)
if (indexer < 0).any():
raise_log(
KeyError("Not all components found in time index."), logger
)
values = self._values[:, indexer]
components = self.components[indexer]
static_covariates = self.static_covariates
return self.__class__(
times=self._time_index,
values=values,
components=components,
static_covariates=(
static_covariates.iloc[indexer]
if adapt_covs_on_component
else static_covariates
),
hierarchy=None,
metadata=self.metadata,
)
elif all(isinstance(i, (int, np.int64)) for i in key):
return self.__class__(
times=self._time_index[key],
values=self._values[key],
components=self.components,
**self._attrs,
)
elif all(isinstance(t, pd.Timestamp) for t in key):
_check_dt()
key = self._time_index.get_indexer(key)
return self.__class__(
times=self._time_index[key],
values=self._values[key],
components=self.components,
**self._attrs,
)
raise_log(IndexError("The type of your index was not matched."), logger)
def _concat_static_covs(series: Sequence[TimeSeries]) -> Optional[pd.DataFrame]:
"""Concatenate static covariates along the component axis (rows of static covariates). Use this for stacking or
concatenating time series along component dimension (axis=1).
Some context for stacking or concatenating two or more TimeSeries with static covariates:
- Concat along axis=0 (time): Along the time dimension, we only take the static covariates of the first series (as
static covariates are time-independent).
- Concat along axis=1 (components) or stacking: Along the component dimension, we either concatenate or transfer
the static covariates of the series if one of below cases applies:
1) concatenate along component dimension (rows of static covariates) when for each series the number of static
covariate components is equal to the number of components in the series. The static variable names (columns in
series.static_covariates) must be identical across all series
2) if only the first series contains static covariates transfer only those
3) if `ignore_static_covariates=True` (with `concatenate()`), case 1) is ignored and only the static covariates
of the first series are transferred
- Concat along axis=2 (samples): Along the sample dimension, we only take the static covariates of the first series
(as the components and time don't change).
"""
if not any([ts.has_static_covariates for ts in series]):
return None
only_first = series[0].has_static_covariates and not any([
ts.has_static_covariates for ts in series[1:]
])
all_have = all([ts.has_static_covariates for ts in series])
if not (only_first or all_have):
raise_log(
ValueError(
"Either none, only the first or all TimeSeries must have `static_covariates`."
),
logger,
)
if only_first:
return series[0].static_covariates
if not (
all([len(ts.static_covariates) == ts.n_components for ts in series])
and all([
ts.static_covariates.columns.equals(series[0].static_covariates.columns)
for ts in series
])
):
raise_log(
ValueError(
"Concatenation of multiple TimeSeries with static covariates requires all `static_covariates` "
"DataFrames to have identical columns (static variable names), and the number of each TimeSeries' "
"components must match the number of corresponding static covariate components (the number of rows "
"in `series.static_covariates`)."
),
logger,
)
return pd.concat(
[ts.static_covariates for ts in series if ts.has_static_covariates], axis=0
)
def _concat_hierarchy(series: Sequence[TimeSeries]):
"""Concatenate the hierarchies of multiple series, when concatenating series along axis 1 (components). This simply
merges the hierarchy dictionaries.
"""
concat_hierarchy = dict()
for s in series:
if s.has_hierarchy:
concat_hierarchy.update(s.hierarchy)
return None if len(concat_hierarchy) == 0 else concat_hierarchy
def concatenate(
series: Sequence[TimeSeries],
axis: Union[str, int] = 0,
ignore_time_axis: bool = False,
ignore_static_covariates: bool = False,
drop_hierarchy: bool = True,
drop_metadata: bool = False,
):
"""Concatenate multiple series along a given axis.
``axis`` can be an integer in (0, 1, 2) to denote (time, component, sample) or, alternatively, a string denoting
the corresponding dimension of the underlying ``DataArray``.
Parameters
----------
series : Sequence[TimeSeries]
Sequence of ``TimeSeries`` to concatenate.
axis : Union[str, int]
Axis along which the series will be concatenated.
ignore_time_axis : bool
Allow concatenation even when some series do not have matching time axes.
When done along component or sample dimensions, concatenation will work as long as the series
have the same lengths (in this case the resulting series will have the time axis of the first
provided series). When done along time dimension, concatenation will work even if the time axes
are not contiguous (in this case, the resulting series will have a start time matching the start time
of the first provided series). Default: False.
ignore_static_covariates : bool
Whether to ignore all requirements for static covariate concatenation and only transfer the static covariates
of the first TimeSeries element in `series` to the concatenated TimeSeries. Only effective when `axis=1`.
drop_hierarchy : bool
When `axis=1`, whether to drop hierarchy information. True by default. When False, the hierarchies will be
"concatenated" as well (by merging the hierarchy dictionaries), which may cause issues if the component
names of the resulting series and that of the merged hierarchy do not match.
When `axis=0` or `axis=2`, the hierarchy of the first series is always kept.
drop_metadata : bool
Whether to drop the metadata information of the concatenated series. False by default.
When False, the concatenated series will inherit the metadata from the first TimeSeries element in `series`.
Returns
-------
TimeSeries
The concatenated series.
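Examples
--------
A minimal sketch, concatenating two single-component series along the component axis:
>>> import numpy as np
>>> from darts import TimeSeries, concatenate
>>> s1 = TimeSeries.from_values(np.zeros((3, 1)), columns=["a"])
>>> s2 = TimeSeries.from_values(np.ones((3, 1)), columns=["b"])
>>> concatenate([s1, s2], axis=1).components.tolist()
['a', 'b']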
"""
axis = TimeSeries._get_axis(axis)
vals = [ts.all_values(copy=False) for ts in series]
component_axis_equal = len({ts.shape[COMP_AX] for ts in series}) == 1
sample_axis_equal = len({ts.shape[SMPL_AX] for ts in series}) == 1
times = series[0]._time_index
components = series[0].components
static_covariates = series[0].static_covariates
hierarchy = series[0].hierarchy
metadata = None if drop_metadata else series[0].metadata
vals = np.concatenate(vals, axis=axis)
if axis == 0:
# time
if not (component_axis_equal and sample_axis_equal):
raise_log(
ValueError(
"when concatenating along time dimension, the component and sample dimensions of all "
"provided series must match."
),
logger,
)
# check if the time series are consecutive
consecutive_time_axes = True
for i in range(1, len(series)):
if series[i - 1].end_time() + series[0].freq != series[i].start_time():
consecutive_time_axes = False
break
if not consecutive_time_axes:
if not ignore_time_axis:
raise_log(
ValueError(
"When concatenating over time axis, all series need to be contiguous "
"in the time dimension. Use `ignore_time_axis=True` to override "
"this behavior and concatenate the series by extending the time axis "
"of the first series."
),
logger,
)
times = generate_index(
start=series[0].start_time(),
freq=series[0].freq,
length=len(vals),
name=times.name,
)
else:
if ignore_time_axis:
time_axes_ok = len({len(ts) for ts in series}) == 1
else:
time_axes_ok = all([
ts.has_same_time_as(ts_next)
for ts, ts_next in zip(series[0:-1], series[1:])
])
if (
(not time_axes_ok)
or (axis == 1 and not sample_axis_equal)
or (axis == 2 and not component_axis_equal)
):
raise_log(
ValueError(
"When concatenating along component or sample dimensions, all the series must have the same time "
"axes (unless `ignore_time_axis` is True), or time axes of same lengths (if `ignore_time_axis` is "
"True), and all series must have the same number of samples (if concatenating along component "
"dimension), or the same number of components (if concatenating along sample dimension)."
),
logger,
)
if axis == 1:
# When concatenating along component dimension, we have to re-create a component index
# we rely on the factory method of TimeSeries to disambiguate names later on if needed.
components = pd.Index([
c for cl in [ts.components for ts in series] for c in cl
])
static_covariates = (
_concat_static_covs(series)
if not ignore_static_covariates
else static_covariates
)
hierarchy = None if drop_hierarchy else _concat_hierarchy(series)
return series[0].__class__(
times=times,
values=vals,
components=components,
static_covariates=static_covariates,
hierarchy=hierarchy,
metadata=metadata,
)
def slice_intersect(series: Sequence[TimeSeries]) -> list[TimeSeries]:
"""Return a list of series, where all series have been intersected along the time index.
Parameters
----------
series : Sequence[TimeSeries]
Sequence of ``TimeSeries`` to intersect.
Returns
-------
list[TimeSeries]
The intersected series.
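Examples
--------
A small sketch with two overlapping daily series:
>>> import numpy as np
>>> import pandas as pd
>>> from darts import TimeSeries
>>> from darts.timeseries import slice_intersect
>>> t1 = pd.date_range("2020-01-01", periods=5, freq="D")
>>> t2 = pd.date_range("2020-01-03", periods=5, freq="D")
>>> s1 = TimeSeries.from_times_and_values(t1, np.zeros((5, 1)))
>>> s2 = TimeSeries.from_times_and_values(t2, np.ones((5, 1)))
>>> [len(s) for s in slice_intersect([s1, s2])]
[3, 3]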
"""
if not series:
return []
# find global intersection on first series
intersection = series[0]
for series_ in series[1:]:
intersection = intersection.slice_intersect(series_)
# intersect all other series
series_intersected = [intersection]
for series_ in series[1:]:
series_intersected.append(series_.slice_intersect(intersection))
return series_intersected
def _finite_rows_boundaries(
values: np.ndarray, how: str = "all"
) -> tuple[Optional[int], Optional[int]]:
"""Return the indices of the first rows containing finite values starting from the start and the end of the first
dimension of the ndarray.
Parameters
----------
values
1D, 2D or 3D numpy array where the first dimension corresponds to entries/rows, and the second to
components/columns.
how
Define if the entries containing `NaN` in all the components ('all') or in any of the components ('any')
should be stripped. Default: 'all'
"""
dims = values.shape
if len(dims) > 3:
raise_log(
ValueError(f"Expected 1D to 3D array, received {len(dims)}D array"), logger
)
finite_rows = ~np.isnan(values)
if len(dims) == 3:
finite_rows = finite_rows.all(axis=2)
if len(dims) > 1 and dims[1] > 1:
if how == "any":
finite_rows = finite_rows.all(axis=1)
elif how == "all":
finite_rows = finite_rows.any(axis=1)
else:
raise_log(
ValueError(
f"`how` parameter value not recognized, should be either 'all' or 'any', "
f"received {how}"
),
logger,
)
first_finite_row = finite_rows.argmax()
last_finite_row = len(finite_rows) - finite_rows[::-1].argmax() - 1
return first_finite_row, last_finite_row
def _clean_components(components: pd.Index) -> pd.Index:
"""Return a `pandas.Index` with unique string component / column names"""
# convert everything to string if needed
clist = [(col if isinstance(col, str) else str(col)) for col in components]
has_duplicate = len(set(clist)) != len(clist)
while has_duplicate:
# we may have to loop several times (e.g. we could have components ["0", "0_1", "0"] and not
# notice when renaming the last "0" into "0_1" that "0_1" already exists...)
name_to_occurence = defaultdict(int)
for i in range(len(clist)):
name_to_occurence[clist[i]] += 1
if name_to_occurence[clist[i]] > 1:
clist[i] = clist[i] + f"_{name_to_occurence[clist[i]] - 1}"
has_duplicate = len(set(clist)) != len(clist)
return pd.Index(clist)