
"""
Mixed-data sampling (MIDAS) Transformer
---------------------------------------
"""

from typing import Any, Dict, List, Mapping, Optional, Sequence, Union

import numpy as np
import pandas as pd

from darts import TimeSeries
from darts.dataprocessing.transformers import (
    FittableDataTransformer,
    InvertibleDataTransformer,
)
from darts.logging import get_logger, raise_log
from darts.timeseries import _finite_rows_boundaries
from darts.utils.utils import generate_index

logger = get_logger(__name__)


class MIDAS(FittableDataTransformer, InvertibleDataTransformer):
    def __init__(
        self,
        low_freq: str,
        strip: bool = True,
        drop_static_covariates: bool = False,
        name: str = "MIDAS",
        n_jobs: int = 1,
        verbose: bool = False,
    ):
        """Mixed-data sampling transformer.

        A transformer that converts higher frequency time series to lower frequency using mixed-data sampling; see
        [1]_ for further details. This allows higher frequency covariates to be used whilst forecasting a lower
        frequency target series. For example, using monthly inputs to forecast a quarterly target.

        Notes
        -----
        The high input frequency must always relate to the low target frequency at the same rate. For example,
        there are always three months in a quarter. The number of days in a month, however, varies per month. In the
        latter case a MIDAS transformation does not work and the transformer will raise an error.

        For anchored low frequencies, the transformed series must contain at least 2 samples in order to be able to
        retrieve the original time index.

        Parameters
        ----------
        low_freq
            The pd.DateOffset string alias corresponding to the target low frequency [2]_. Passed on to the `rule`
            parameter of pandas.DataFrame.resample().
        strip
            Whether to remove the NaNs from the start and the end of the transformed series.
        drop_static_covariates
            If set to `True`, the static covariates of the input series won't be transferred to the output.
            This might be useful for multivariate series with component-specific static covariates.
        name
            A specific name for the transformer.
        n_jobs
            The number of jobs to run in parallel. Parallel jobs are created only when a ``Sequence[TimeSeries]`` is
            passed as input to a method, parallelising operations regarding different ``TimeSeries``. Defaults to `1`
            (sequential). Setting the parameter to `-1` means using all the available processors.
            Note: for a small amount of data, the parallelisation overhead could end up increasing the total required
            amount of time.
        verbose
            Optionally, whether to print operations progress.

        Examples
        --------
        >>> from darts.datasets import AirPassengersDataset
        >>> from darts.dataprocessing.transformers import MIDAS
        >>> monthly_series = AirPassengersDataset().load()
        >>> print(monthly_series.time_index[:4])
        DatetimeIndex(['1949-01-01', '1949-02-01', '1949-03-01', '1949-04-01'],
        dtype='datetime64[ns]', name='Month', freq='MS')
        >>> print(monthly_series.values()[:4])
        [[112.], [118.], [132.], [129.]]
        >>> midas = MIDAS(low_freq="QS")
        >>> quarterly_series = midas.fit_transform(monthly_series)
        >>> print(quarterly_series.time_index[:3])
        DatetimeIndex(['1949-01-01', '1949-04-01', '1949-07-01'],
        dtype='datetime64[ns]', name='Month', freq='QS-JAN')
        >>> print(quarterly_series.values()[:3])
        [[112. 118. 132.], [129. 121. 135.], [148. 148. 136.]]
        >>> inverse_quarterly = midas.inverse_transform(quarterly_series)
        >>> print(inverse_quarterly.time_index[:4])
        DatetimeIndex(['1949-01-01', '1949-02-01', '1949-03-01', '1949-04-01'],
        dtype='datetime64[ns]', name='time', freq='MS')
        >>> print(inverse_quarterly.values()[:4])
        [[112.], [118.], [132.], [129.]]

        References
        ----------
        .. [1] https://en.wikipedia.org/wiki/Mixed-data_sampling
        .. [2] https://pandas.pydata.org/docs/user_guide/timeseries.html#dateoffset-objects
        """
        if pd.tseries.frequencies.get_period_alias(low_freq) is None:
            raise_log(
                ValueError(
                    f"Cannot infer period alias for `low_freq={low_freq}`. "
                    f"Is it a valid pandas offset/frequency alias?"
                ),
                logger=logger,
            )
        self._low_freq = pd.tseries.frequencies.to_offset(low_freq).freqstr
        self._strip = strip
        self._drop_static_covariates = drop_static_covariates
        self._sep = "_midas_"
        # the original high frequency is fitted on each TimeSeries independently
        super().__init__(name=name, n_jobs=n_jobs, verbose=verbose, global_fit=False)
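
    # A minimal sketch (comments only, not part of the class) of the `low_freq`
    # validation performed in `__init__` above; exact return values assume a recent
    # pandas version:
    #
    # >>> import pandas as pd
    # >>> pd.tseries.frequencies.get_period_alias("QS")  # valid offset alias
    # 'Q'
    # >>> pd.tseries.frequencies.get_period_alias("not-a-freq") is None  # -> __init__ raises
    # True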
    @staticmethod
    def ts_fit(
        series: Union[TimeSeries, Sequence[TimeSeries]],
        params: Mapping[str, Any],
        *args,
        **kwargs,
    ) -> Union[Dict[str, Any], List[Dict[str, Any]]]:
        """MIDAS needs the high frequency period name in order to easily inverse-transform a TimeSeries; the
        parallelization is handled by `transform` and/or `inverse_transform` (see the
        `InvertibleDataTransformer.__init__()` docstring).
        """
        is_single_series = isinstance(series, TimeSeries)
        if is_single_series:
            series = [series]

        fitted_params = []
        low_freq = params["fixed"]["_low_freq"]
        for idx, ts in enumerate(series):
            high_freq = ts.freq_str
            if not pd.tseries.frequencies.is_subperiod(
                pd.tseries.frequencies.get_period_alias(high_freq),
                pd.tseries.frequencies.get_period_alias(low_freq),
            ):
                raise_log(
                    ValueError(
                        f"The frequency string of the series at index={idx} must be higher than the "
                        f"`low_freq` set at MIDAS creation. "
                        f"Received series frequency {high_freq} against `low_freq={low_freq}`."
                    ),
                    logger=logger,
                )
            fitted_params.append(
                {
                    "high_freq": high_freq,
                    "start": ts.start_time(),
                    "end": ts.end_time(),
                }
            )
        return fitted_params[0] if is_single_series else fitted_params
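
    # A minimal sketch of the sub-period check in `ts_fit` above, using pandas period
    # aliases ("M" monthly, "Q" quarterly); exact outputs assume a recent pandas:
    #
    # >>> import pandas as pd
    # >>> pd.tseries.frequencies.is_subperiod("M", "Q")  # months nest into quarters
    # True
    # >>> pd.tseries.frequencies.is_subperiod("Q", "M")  # the reverse does not hold
    # False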
    @staticmethod
    def ts_transform(series: TimeSeries, params: Mapping[str, Any]) -> TimeSeries:
        """
        Transforms series from high to low frequency using a mixed-data sampling approach. Uses and relies on
        pandas.DataFrame.resample. When converting to/from an anchorable offset [1]_, the index is rolled backward if
        the series does not start on the anchor date, to preserve all the values.

        Steps:
            (1) Transform series to pd.DataFrame and get the frequency string for the PeriodIndex
            (2) Downsample series and then upsample it again
            (3) Replace input series by the upsampled series if it's not 'full'
            (4) Transform every column of the high frequency series into multiple columns for the low frequency series
            (5) Transform the low frequency series back into a TimeSeries

        References
        ----------
        .. [1] https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#anchored-offsets
        """
        low_freq = params["fixed"]["_low_freq"]
        strip = params["fixed"]["_strip"]
        drop_static_covariates = params["fixed"]["_drop_static_covariates"]
        feature_sep = params["fixed"]["_sep"]
        high_freq = params["fitted"]["high_freq"]
        MIDAS._verify_series(series, high_freq=high_freq)

        # TimeSeries to pd.DataFrame
        df = pd.DataFrame(index=series.time_index)
        # get a high frequency string that's suitable for a PeriodIndex
        high_freq_period = pd.tseries.frequencies.get_period_alias(series.freq_str)

        # downsample
        resampled = df.resample(low_freq)
        low_freq_df = resampled.last()

        def up_sample(low_df: pd.DataFrame, high_period):
            """upsample a single-index DataFrame to a higher frequency"""
            low_df = low_df.copy(deep=True)
            low_df.index = low_df.index.to_period()
            return low_df.resample(rule=high_period).last()

        # the first and last groups can be shorter than an entire lower frequency period;
        # we upsample them from the low to the high frequency to get the expected number of
        # higher frequency time steps per lower frequency period
        first_up_sampled = up_sample(low_freq_df.iloc[:1], high_freq_period)
        last_up_sampled = up_sample(low_freq_df.iloc[-1:], high_freq_period)

        # find unique sizes from: first group size + unique sizes of center groups + last group size
        sizes = np.unique(
            [len(first_up_sampled)]
            + resampled.size()[1:-1].unique().tolist()
            + [len(last_up_sampled)]
        )

        # MIDAS requires the high freq to be a round multiple of the low freq -> sizes must be identical
        if not len(sizes) == 1:
            raise_log(
                ValueError(
                    "The frequency of the input series should be an exact multiple of the targeted "
                    f"lower frequency output. Received series frequency `{high_freq}`, and lower frequency "
                    f"{low_freq}. E.g., a valid conversion would be from a monthly (high) to a quarterly "
                    f"(low) frequency."
                ),
                logger=logger,
            )

        # the max size is the number of higher frequency time steps per lower frequency period
        max_size = sizes[0]
        n_samples = series.n_samples
        n_cols_in = series.n_components
        n_cols_out = max_size * n_cols_in
        series_size = len(series)

        # how many input time steps are in each down-sampled lower frequency period
        group_sizes = resampled.size()
        n_groups = len(group_sizes)

        arr = series.all_values(copy=False)
        time_index = low_freq_df.index
        if series_size <= max_size:
            # we can't apply windowing when the series is shorter than `max_size`;
            # we have at least one group, at most two
            first_idx = first_up_sampled.index.get_loc(df.index[0])
            last_idx = first_idx + series_size - 1

            start_chunk = np.empty((first_idx, 1, 1))
            start_chunk.fill(np.nan)
            end_chunk = np.empty((max_size - 1 - last_idx, 1, 1))
            end_chunk.fill(np.nan)

            arr = np.concatenate([start_chunk, arr, end_chunk])
            # arr has shape (n time steps, n components, n samples);
            # reshape to (1 time step, n midas components, n samples)
            arr = arr.reshape(1, n_cols_out, n_samples)
            if strip:
                # results in an empty series
                arr = arr[0:0]
                time_index = time_index[0:0]
        else:
            # we are guaranteed at least two groups since the series is longer than `max_size`;
            # extract rows from the higher frequency and convert them to columns in the lower
            # frequency. we can achieve this by extracting all windows with a size of `max_size`;
            # later on we stride to get only the relevant windows, each `max_size` steps

            # create the maximum possible output array
            arr_out = np.empty((n_groups, n_cols_out, n_samples))
            arr_out.fill(np.nan)

            arr = np.lib.stride_tricks.sliding_window_view(
                arr, window_shape=(max_size, n_cols_in, n_samples)
            )
            arr = arr.reshape((-1, n_cols_out, n_samples))

            # the first resampled index might not contain all dates from the higher frequency
            size_group_first = group_sizes.iloc[0]
            size_group_first = 0 if size_group_first == max_size else size_group_first
            components_group_first = size_group_first * n_cols_in
            if components_group_first and not strip:
                arr_out[0, n_cols_out - components_group_first :, :] = arr[
                    0, :components_group_first
                ]
            center_start_idx = 0 if not size_group_first else 1

            # the last resampled index might not contain all dates from the higher frequency
            size_group_last = group_sizes.iloc[-1]
            size_group_last = 0 if size_group_last == max_size else size_group_last
            components_group_last = size_group_last * n_cols_in
            if components_group_last and not strip:
                arr_out[-1, :components_group_last, :] = arr[
                    -1, -components_group_last:
                ]

            # get the center resampled indices
            center_end_idx = None if not size_group_last else -1
            arr_out[center_start_idx:center_end_idx, :, :] = arr[
                size_group_first::max_size
            ]

            # potentially strip the first and last groups
            if strip:
                first_idx = None if not size_group_first else 1
                last_idx = None if not size_group_last else -1
                arr_out = arr_out[first_idx:last_idx]
                time_index = time_index[first_idx:last_idx]
            arr = arr_out

        ts = MIDAS._create_midas_df(
            series=series,
            arr=arr,
            time_index=time_index,
            n_midas=max_size,
            drop_static_covariates=drop_static_covariates,
            inverse_transform=False,
            feature_sep=feature_sep,
        )
        return ts
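
    # A minimal sketch (illustrative only) of the windowing trick in `ts_transform`
    # above: `sliding_window_view` plus striding turns every `max_size` consecutive
    # high-frequency rows into one wide low-frequency row. Shapes are assumptions
    # (6 monthly steps, 1 component, 1 sample, quarters of `max_size=3`):
    #
    # >>> import numpy as np
    # >>> arr = np.arange(6).reshape(6, 1, 1)  # (time, components, samples)
    # >>> win = np.lib.stride_tricks.sliding_window_view(arr, window_shape=(3, 1, 1))
    # >>> win = win.reshape(-1, 3, 1)  # all possible 3-month windows
    # >>> quarters = win[::3]  # keep every 3rd window -> aligned quarters
    # >>> quarters.shape, quarters[:, :, 0].tolist()
    # ((2, 3, 1), [[0, 1, 2], [3, 4, 5]])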
    @staticmethod
    def ts_inverse_transform(
        series: TimeSeries,
        params: Mapping[str, Any],
    ) -> TimeSeries:
        """
        Transforms the series back to high frequency by retrieving the original high frequency and reshaping the
        values. When converting to/from an anchorable offset [1]_, the index is rolled backward if the series does
        not start on the anchor date, to preserve all the values.

        Steps:
            (1) Reshape the values to flatten the components introduced by the transform
            (2) Eliminate the rows filled with NaNs, to facilitate the time index adjustment
            (3) Retrieve the original components' names
            (4) When applicable, shift the time index start back in time
            (5) Generate a new time index with the high frequency

        References
        ----------
        .. [1] https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#anchored-offsets
        """
        low_freq = params["fixed"]["_low_freq"]
        drop_static_covariates = params["fixed"]["_drop_static_covariates"]
        feature_sep = params["fixed"]["_sep"]
        high_freq = params["fitted"]["high_freq"]
        orig_ts_start_time = params["fitted"]["start"]
        orig_ts_end_time = params["fitted"]["end"]
        MIDAS._verify_series(series, low_freq=low_freq)

        # retrieve the number of components introduced by midas
        n_midas_components = int(series.components[-1].split(feature_sep)[-1]) + 1
        series_n_components = series.n_components
        n_orig_components = series_n_components // n_midas_components

        if len(series) == 0:
            # placeholders for an empty series
            start_time = pd.Timestamp("2020-01-01")
            shift = 0
            series_values = np.empty((0, n_orig_components, series.n_samples))
        else:
            series_values = series.all_values(copy=False).reshape(
                -1, n_orig_components, series.n_samples
            )
            # remove the rows containing only NaNs at the extremities of the array,
            # necessary to adjust the time index
            first_finite_row, last_finite_row = _finite_rows_boundaries(
                series_values, how="all"
            )
            # adding one to make the end bound inclusive
            series_values = series_values[first_finite_row : last_finite_row + 1]

            start_time = series.start_time()
            shift = 0
            # adjust the start if it was shifted due to the frequency change
            if len(series.time_index) > 1:
                low_freq_timedelta = series.time_index[1] - series.time_index[0]
                start_to_start_shift = series.time_index[0] - orig_ts_start_time
                start_to_end_shift = series.time_index[0] - orig_ts_end_time
                # shift is caused by the low frequency anchoring; the fitted and
                # inverse-transformed ts have the same start
                if np.abs(start_to_start_shift) <= low_freq_timedelta:
                    start_time = orig_ts_start_time
                # shift is caused by the low frequency anchoring; the inverse-transformed ts
                # starts after the end of the fitted ts
                elif pd.Timedelta(0) < start_to_end_shift <= low_freq_timedelta:
                    start_time = orig_ts_end_time
                    shift = 1

        time_index = generate_index(
            start=start_time,
            length=len(series_values) + shift,
            freq=high_freq,
            name=series.time_index.name,
        )[shift:]

        inversed_midas_ts = MIDAS._create_midas_df(
            series=series,
            arr=series_values,
            time_index=time_index,
            n_midas=n_midas_components,
            drop_static_covariates=drop_static_covariates,
            inverse_transform=True,
            feature_sep=feature_sep,
        )
        return inversed_midas_ts
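
    # A minimal sketch (illustrative only) of the inverse reshape in
    # `ts_inverse_transform` above: a (2 quarters, 3 midas components, 1 sample) array
    # flattens back to 6 monthly rows in the original order, and `generate_index`
    # rebuilds the high-frequency index. The start date is an assumption:
    #
    # >>> import numpy as np
    # >>> import pandas as pd
    # >>> from darts.utils.utils import generate_index
    # >>> arr = np.arange(6).reshape(2, 3, 1)
    # >>> arr.reshape(-1, 1, 1)[:, 0, 0].tolist()  # row-major flatten keeps time order
    # [0, 1, 2, 3, 4, 5]
    # >>> generate_index(start=pd.Timestamp("2000-01-01"), length=6, freq="MS")
    # # -> monthly DatetimeIndex from 2000-01-01 through 2000-06-01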
    @staticmethod
    def _verify_series(
        series: TimeSeries,
        high_freq: Optional[str] = None,
        low_freq: Optional[str] = None,
    ):
        """Sanity checks on the input; the `high_freq` and `low_freq` arguments are mutually exclusive."""
        if not isinstance(series.time_index, pd.DatetimeIndex):
            raise_log(
                ValueError("MIDAS input series must have a pd.DatetimeIndex"),
                logger,
            )
        series_freq_str = series.freq_str
        input_freq = [series_freq_str]
        # flexibility on anchoring
        if "-" in series_freq_str:
            input_freq.append(series_freq_str.split("-")[0])

        if high_freq is not None and high_freq not in input_freq:
            raise_log(
                ValueError(
                    f"The frequency string of the series to transform must be identical to the fitted one, expected "
                    f"{high_freq} but received {series_freq_str}."
                ),
                logger=logger,
            )

        if low_freq is not None and low_freq not in input_freq:
            raise_log(
                ValueError(
                    f"The frequency string of the series to inverse-transform must be identical to the fitted one, "
                    f"expected {low_freq} but received {series_freq_str}."
                ),
                logger=logger,
            )

    @staticmethod
    def _process_static_covariates(
        series: TimeSeries,
        n_midas: int,
        drop_static_covariates: bool,
        inverse_transform: bool,
    ) -> Optional[Union[pd.Series, pd.DataFrame]]:
        """
        If the static covariates are component-specific, they must be reshaped appropriately.
        """
        static_covariates = series.static_covariates
        if drop_static_covariates:
            return None
        elif (
            static_covariates is not None
            and static_covariates.index.name == "component"
        ):
            if inverse_transform:
                cols_orig = series.n_components // n_midas
                return static_covariates[:cols_orig]
            else:
                return pd.concat([static_covariates] * n_midas)
        else:
            return static_covariates

    @staticmethod
    def _create_midas_df(
        series: TimeSeries,
        arr: np.ndarray,
        time_index: Union[pd.DatetimeIndex, pd.RangeIndex],
        n_midas: int,
        drop_static_covariates: bool,
        inverse_transform: bool,
        feature_sep: str,
    ) -> TimeSeries:
        """
        Creates the lower frequency TimeSeries out of a higher frequency one (or the reverse, for the
        inverse transform).
        """
        if not inverse_transform:
            cols = [
                f"{col}{feature_sep}{i}"
                for i in range(n_midas)
                for col in series.columns
            ]
        else:
            cols_orig = series.n_components // n_midas
            cols = series.components[:cols_orig].str.split(feature_sep).str[0].tolist()

        static_covariates = MIDAS._process_static_covariates(
            series=series,
            n_midas=n_midas,
            drop_static_covariates=drop_static_covariates,
            inverse_transform=inverse_transform,
        )
        return TimeSeries.from_times_and_values(
            times=time_index,
            values=arr,
            columns=cols,
            static_covariates=static_covariates,
        )
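

# A minimal usage sketch (assumptions: the `linear_timeseries` helper and a monthly
# series starting mid-quarter) showing the effect of `strip=False`: the partial first
# quarter is kept and left-padded with NaNs, and the round trip recovers the original:
#
# >>> import pandas as pd
# >>> from darts.utils.timeseries_generation import linear_timeseries
# >>> series = linear_timeseries(start=pd.Timestamp("2000-02-01"), length=5, freq="MS")
# >>> midas = MIDAS(low_freq="QS", strip=False)
# >>> quarterly = midas.fit_transform(series)  # first row: [NaN, value_0, value_1]
# >>> monthly = midas.inverse_transform(quarterly)  # NaN padding is stripped again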