From aaaa234ad422d45a660135f2ca9877271c2c9c8b Mon Sep 17 00:00:00 2001 From: madtoinou Date: Wed, 18 Dec 2024 14:26:13 +0100 Subject: [PATCH 01/18] feat: adding stride to shifted_dataset --- darts/utils/data/shifted_dataset.py | 43 +++++++++++++++++++++++++---- 1 file changed, 37 insertions(+), 6 deletions(-) diff --git a/darts/utils/data/shifted_dataset.py b/darts/utils/data/shifted_dataset.py index d7e537dc4b..c1f2160516 100644 --- a/darts/utils/data/shifted_dataset.py +++ b/darts/utils/data/shifted_dataset.py @@ -31,6 +31,7 @@ def __init__( covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None, length: int = 12, shift: int = 1, + stride: int = 1, max_samples_per_ts: Optional[int] = None, use_static_covariates: bool = True, sample_weight: Optional[Union[TimeSeries, Sequence[TimeSeries], str]] = None, @@ -65,6 +66,8 @@ def __init__( The length of the emitted past and future series. shift The number of time steps by which to shift the output chunks relative to the start of the input chunks. + stride + The number of time steps between consecutive entries. max_samples_per_ts This is an upper bound on the number of tuples that can be produced per time series. It can be used in order to have an upper bound on the total size of the dataset and @@ -93,6 +96,7 @@ def __init__( input_chunk_length=length, output_chunk_length=length, shift=shift, + stride=stride, shift_covariates=False, max_samples_per_ts=max_samples_per_ts, covariate_type=CovariateType.PAST, @@ -122,6 +126,7 @@ def __init__( covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None, length: int = 12, shift: int = 1, + stride: int = 1, max_samples_per_ts: Optional[int] = None, use_static_covariates: bool = True, sample_weight: Optional[Union[TimeSeries, Sequence[TimeSeries], str]] = None, @@ -158,6 +163,8 @@ def __init__( The length of the emitted past and future series. shift The number of time steps by which to shift the output chunks relative to the start of the input chunks. + stride + The number of time steps between consecutive entries. max_samples_per_ts This is an upper bound on the number of tuples that can be produced per time series. It can be used in order to have an upper bound on the total size of the dataset and @@ -187,6 +194,7 @@ def __init__( input_chunk_length=length, output_chunk_length=length, shift=shift, + stride=stride, shift_covariates=True, max_samples_per_ts=max_samples_per_ts, covariate_type=CovariateType.FUTURE, @@ -216,6 +224,7 @@ def __init__( covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None, length: int = 12, shift: int = 1, + stride: int = 1, max_samples_per_ts: Optional[int] = None, use_static_covariates: bool = True, sample_weight: Optional[Union[TimeSeries, Sequence[TimeSeries], str]] = None, @@ -254,6 +263,8 @@ def __init__( The length of the emitted past and future series. shift The number of time steps by which to shift the output chunks relative to the start of the input chunks. + stride + The number of time steps between consecutive entries. max_samples_per_ts This is an upper bound on the number of tuples that can be produced per time series. It can be used in order to have an upper bound on the total size of the dataset and @@ -284,6 +295,7 @@ def __init__( input_chunk_length=length, output_chunk_length=length, shift=shift, + stride=stride, shift_covariates=False, max_samples_per_ts=max_samples_per_ts, covariate_type=CovariateType.HISTORIC_FUTURE, @@ -298,6 +310,7 @@ def __init__( input_chunk_length=length, output_chunk_length=length, shift=shift, + stride=stride, shift_covariates=True, max_samples_per_ts=max_samples_per_ts, covariate_type=CovariateType.FUTURE, @@ -339,6 +352,7 @@ def __init__( future_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None, length: int = 12, shift: int = 1, + stride: int = 1, max_samples_per_ts: Optional[int] = None, use_static_covariates: bool = True, sample_weight: Optional[Union[TimeSeries, Sequence[TimeSeries], str]] = None, @@ -377,6 +391,8 @@ def __init__( The length of the emitted past and future series. shift The number of time steps by which to shift the output chunks relative to the start of the input chunks. + stride + The number of time steps between consecutive entries. max_samples_per_ts This is an upper bound on the number of tuples that can be produced per time series. It can be used in order to have an upper bound on the total size of the dataset and @@ -406,6 +422,7 @@ def __init__( input_chunk_length=length, output_chunk_length=length, shift=shift, + stride=stride, shift_covariates=False, max_samples_per_ts=max_samples_per_ts, covariate_type=CovariateType.PAST, @@ -419,6 +436,7 @@ def __init__( covariates=future_covariates, length=length, shift=shift, + stride=stride, max_samples_per_ts=max_samples_per_ts, use_static_covariates=use_static_covariates, ) @@ -460,6 +478,7 @@ def __init__( future_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None, length: int = 12, shift: int = 1, + stride: int = 1, max_samples_per_ts: Optional[int] = None, use_static_covariates: bool = True, sample_weight: Optional[Union[TimeSeries, Sequence[TimeSeries], str]] = None, @@ -498,6 +517,8 @@ def __init__( The length of the emitted past and future series. shift The number of time steps by which to shift the output chunks relative to the start of the input chunks. + stride + The number of time steps between consecutive entries. max_samples_per_ts This is an upper bound on the number of tuples that can be produced per time series. It can be used in order to have an upper bound on the total size of the dataset and @@ -528,6 +549,7 @@ def __init__( input_chunk_length=length, output_chunk_length=length, shift=shift, + stride=stride, shift_covariates=False, max_samples_per_ts=max_samples_per_ts, covariate_type=CovariateType.PAST, @@ -542,6 +564,7 @@ def __init__( input_chunk_length=length, output_chunk_length=length, shift=shift, + stride=stride, shift_covariates=True, max_samples_per_ts=max_samples_per_ts, covariate_type=CovariateType.FUTURE, @@ -584,6 +607,7 @@ def __init__( output_chunk_length: int = 1, shift: int = 1, shift_covariates: bool = False, + stride: int = 1, max_samples_per_ts: Optional[int] = None, covariate_type: CovariateType = CovariateType.NONE, use_static_covariates: bool = True, @@ -613,9 +637,11 @@ def __init__( shift_covariates Whether to shift the covariates forward the same way as the target. FutureCovariatesModel's require this set to True, while PastCovariatesModel's require this set to False. + stride + The number of time steps between consecutive entries. max_samples_per_ts - This is an upper bound on the number of (input, output, input_covariates) tuples that can be produced - per time series. It can be used in order to have an upper bound on the total size of the dataset and + This is an upper bound on the number of tuples that can be produced per time series. + It can be used in order to have an upper bound on the total size of the dataset and ensure proper sampling. If `None`, it will read all of the individual time series in advance (at dataset creation) to know their sizes, which might be expensive on big datasets. If some series turn out to have a length that would allow more than `max_samples_per_ts`, only the @@ -642,6 +668,7 @@ def __init__( self.input_chunk_length = input_chunk_length self.output_chunk_length = output_chunk_length self.shift = shift + self.stride = stride self.max_samples_per_ts = max_samples_per_ts self.size_of_both_chunks = max( self.input_chunk_length, self.shift + self.output_chunk_length @@ -693,8 +720,8 @@ def __init__( if self.max_samples_per_ts is None: # read all time series to get the maximum size self.max_samples_per_ts = ( - max(len(ts) for ts in self.target_series) - self.size_of_both_chunks + 1 - ) + max(len(ts) for ts in self.target_series) - self.size_of_both_chunks + ) // self.stride + 1 self.ideal_nr_samples = len(self.target_series) * self.max_samples_per_ts def __len__(self): @@ -715,7 +742,9 @@ def __getitem__( target_vals = target_series.random_component_values(copy=False) # determine the actual number of possible samples in this time series - n_samples_in_ts = len(target_vals) - self.size_of_both_chunks + 1 + n_samples_in_ts = ( + len(target_vals) - self.size_of_both_chunks + ) // self.stride + 1 if n_samples_in_ts < 1: raise_log( @@ -731,7 +760,9 @@ def __getitem__( # it is originally in [0, self.max_samples_per_ts), so we use a modulo to have it in [0, n_samples_in_ts) end_of_output_idx = ( len(target_series) - - (idx - (target_idx * self.max_samples_per_ts)) % n_samples_in_ts + - (idx - (target_idx * self.max_samples_per_ts)) + % n_samples_in_ts + * self.stride ) # optionally, load covariates From dec1636e2d8ba1298aa76c174f4ae7000cda7b88 Mon Sep 17 00:00:00 2001 From: madtoinou Date: Wed, 18 Dec 2024 14:34:10 +0100 Subject: [PATCH 02/18] feat: adding stride to the sequential dataset --- darts/utils/data/sequential_dataset.py | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/darts/utils/data/sequential_dataset.py b/darts/utils/data/sequential_dataset.py index 0ebd68a1cb..216cf0994c 100644 --- a/darts/utils/data/sequential_dataset.py +++ b/darts/utils/data/sequential_dataset.py @@ -28,6 +28,7 @@ def __init__( input_chunk_length: int = 12, output_chunk_length: int = 1, output_chunk_shift: int = 0, + stride: int = 1, max_samples_per_ts: Optional[int] = None, use_static_covariates: bool = True, sample_weight: Optional[Union[TimeSeries, Sequence[TimeSeries], str]] = None, @@ -64,6 +65,8 @@ def __init__( The length of the emitted future series. output_chunk_shift Optionally, the number of steps to shift the start of the output chunk into the future. + stride + The number of time steps between consecutive entries. max_samples_per_ts This is an upper bound on the number of tuples that can be produced per time series. It can be used in order to have an upper bound on the total size of the dataset and @@ -94,6 +97,7 @@ def __init__( output_chunk_length=output_chunk_length, shift=shift, shift_covariates=False, + stride=stride, max_samples_per_ts=max_samples_per_ts, covariate_type=CovariateType.PAST, use_static_covariates=use_static_covariates, @@ -123,6 +127,7 @@ def __init__( input_chunk_length: int = 12, output_chunk_length: int = 1, output_chunk_shift: int = 0, + stride: int = 1, max_samples_per_ts: Optional[int] = None, use_static_covariates: bool = True, sample_weight: Optional[Union[TimeSeries, Sequence[TimeSeries], str]] = None, @@ -159,6 +164,8 @@ def __init__( The length of the emitted future series. output_chunk_shift Optionally, the number of steps to shift the start of the output chunk into the future. + stride + The number of time steps between consecutive entries. max_samples_per_ts This is an upper bound on the number of tuples that can be produced per time series. It can be used in order to have an upper bound on the total size of the dataset and @@ -189,6 +196,7 @@ def __init__( output_chunk_length=output_chunk_length, shift=shift, shift_covariates=True, + stride=stride, max_samples_per_ts=max_samples_per_ts, covariate_type=CovariateType.FUTURE, use_static_covariates=use_static_covariates, @@ -218,6 +226,7 @@ def __init__( input_chunk_length: int = 12, output_chunk_length: int = 1, output_chunk_shift: int = 0, + stride: int = 1, max_samples_per_ts: Optional[int] = None, use_static_covariates: bool = True, sample_weight: Optional[Union[TimeSeries, Sequence[TimeSeries], str]] = None, @@ -255,6 +264,8 @@ def __init__( The length of the emitted future series. output_chunk_shift Optionally, the number of steps to shift the start of the output chunk into the future. + stride + The number of time steps between consecutive entries. max_samples_per_ts This is an upper bound on the number of tuples that can be produced per time series. It can be used in order to have an upper bound on the total size of the dataset and @@ -286,6 +297,7 @@ def __init__( output_chunk_length=output_chunk_length, shift=shift, shift_covariates=False, + stride=stride, max_samples_per_ts=max_samples_per_ts, covariate_type=CovariateType.HISTORIC_FUTURE, use_static_covariates=use_static_covariates, @@ -300,6 +312,7 @@ def __init__( output_chunk_length=output_chunk_length, shift=shift, shift_covariates=True, + stride=stride, max_samples_per_ts=max_samples_per_ts, covariate_type=CovariateType.FUTURE, use_static_covariates=use_static_covariates, @@ -341,6 +354,7 @@ def __init__( input_chunk_length: int = 12, output_chunk_length: int = 1, output_chunk_shift: int = 0, + stride: int = 1, max_samples_per_ts: Optional[int] = None, use_static_covariates: bool = True, sample_weight: Optional[Union[TimeSeries, Sequence[TimeSeries], str]] = None, @@ -381,6 +395,8 @@ def __init__( The length of the emitted future series. output_chunk_shift Optionally, the number of steps to shift the start of the output chunk into the future. + stride + The number of time steps between consecutive entries. max_samples_per_ts This is an upper bound on the number of tuples that can be produced per time series. It can be used in order to have an upper bound on the total size of the dataset and @@ -412,6 +428,7 @@ def __init__( output_chunk_length=output_chunk_length, shift=shift, shift_covariates=False, + stride=stride, max_samples_per_ts=max_samples_per_ts, covariate_type=CovariateType.PAST, use_static_covariates=use_static_covariates, @@ -425,6 +442,7 @@ def __init__( input_chunk_length=input_chunk_length, output_chunk_length=output_chunk_length, output_chunk_shift=output_chunk_shift, + stride=stride, max_samples_per_ts=max_samples_per_ts, use_static_covariates=use_static_covariates, ) @@ -467,6 +485,7 @@ def __init__( input_chunk_length: int = 12, output_chunk_length: int = 1, output_chunk_shift: int = 0, + stride: int = 1, max_samples_per_ts: Optional[int] = None, use_static_covariates: bool = True, sample_weight: Optional[Union[TimeSeries, Sequence[TimeSeries], str]] = None, @@ -506,6 +525,8 @@ def __init__( The length of the emitted future series. output_chunk_shift Optionally, the number of steps to shift the start of the output chunk into the future. + stride + The number of time steps between consecutive entries. max_samples_per_ts This is an upper bound on the number of tuples that can be produced per time series. It can be used in order to have an upper bound on the total size of the dataset and @@ -536,6 +557,7 @@ def __init__( output_chunk_length=output_chunk_length, shift=shift, shift_covariates=False, + stride=stride, max_samples_per_ts=max_samples_per_ts, covariate_type=CovariateType.PAST, use_static_covariates=use_static_covariates, @@ -550,6 +572,7 @@ def __init__( output_chunk_length=output_chunk_length, shift=shift, shift_covariates=True, + stride=stride, max_samples_per_ts=max_samples_per_ts, covariate_type=CovariateType.FUTURE, use_static_covariates=use_static_covariates, From 56b53ed3906309d553630fc3f1caf960aeddc14d Mon Sep 17 00:00:00 2001 From: madtoinou Date: Wed, 18 Dec 2024 15:05:43 +0100 Subject: [PATCH 03/18] feat: add striding to tabularization --- darts/utils/data/tabularization.py | 36 ++++++++++++++++++++++++++++-- 1 file changed, 34 insertions(+), 2 deletions(-) diff --git a/darts/utils/data/tabularization.py b/darts/utils/data/tabularization.py index 1742f7ccd1..3348c0fdff 100644 --- a/darts/utils/data/tabularization.py +++ b/darts/utils/data/tabularization.py @@ -38,6 +38,7 @@ def create_lagged_data( is_training: bool = True, concatenate: bool = True, sample_weight: Optional[Union[str, TimeSeries, Sequence[TimeSeries]]] = None, + stride: int = 1, show_warnings: bool = True, ) -> tuple[ ArrayOrArraySequence, @@ -220,6 +221,8 @@ def create_lagged_data( `"linear"` or `"exponential"` decay - the further in the past, the lower the weight. The weights are computed globally based on the length of the longest series in `series`. Then for each series, the weights are extracted from the end of the global weights. This gives a common time weighting across all series. + stride + The number of time steps between consecutive entries. show_warnings Whether to show warnings. @@ -357,6 +360,7 @@ def create_lagged_data( multi_models=multi_models, check_inputs=check_inputs, is_training=is_training, + stride=stride, show_warnings=show_warnings, ) else: @@ -374,6 +378,7 @@ def create_lagged_data( multi_models=multi_models, check_inputs=check_inputs, is_training=is_training, + stride=stride, show_warnings=show_warnings, ) X_i, last_static_covariates_shape = add_static_covariates_to_lagged_data( @@ -418,6 +423,7 @@ def create_lagged_training_data( check_inputs: bool = True, use_moving_windows: bool = True, concatenate: bool = True, + stride: int = 1, sample_weight: Optional[Union[TimeSeries, str]] = None, ) -> tuple[ ArrayOrArraySequence, @@ -497,6 +503,8 @@ def create_lagged_training_data( when `Sequence[TimeSeries]` are provided, then `X` and `y` will be arrays created by concatenating all feature/label arrays formed by each `TimeSeries` along the `0`th axis. Note that `times` is still returned as `Sequence[pd.Index]`, even when `concatenate = True`. + stride + The number of time steps between consecutive entries. sample_weight Optionally, some sample weights to apply to the target `series` labels. They are applied per observation, per label (each step in `output_chunk_length`), and per component. @@ -559,6 +567,7 @@ def create_lagged_training_data( use_moving_windows=use_moving_windows, is_training=True, concatenate=concatenate, + stride=stride, sample_weight=sample_weight, ) @@ -576,6 +585,7 @@ def create_lagged_prediction_data( check_inputs: bool = True, use_moving_windows: bool = True, concatenate: bool = True, + stride: int = 1, show_warnings: bool = True, ) -> tuple[ArrayOrArraySequence, Sequence[pd.Index]]: """ @@ -641,6 +651,8 @@ def create_lagged_prediction_data( `Sequence[TimeSeries]` are provided, then `X` will be an array created by concatenating all feature arrays formed by each `TimeSeries` along the `0`th axis. Note that `times` is still returned as `Sequence[pd.Index]`, even when `concatenate = True`. + stride + The number of time steps between consecutive entries. show_warnings Whether to show warnings. @@ -683,6 +695,7 @@ def create_lagged_prediction_data( use_moving_windows=use_moving_windows, is_training=False, concatenate=concatenate, + stride=stride, show_warnings=show_warnings, ) return X, times @@ -971,6 +984,7 @@ def _create_lagged_data_by_moving_window( multi_models: bool, check_inputs: bool, is_training: bool, + stride: int, show_warnings: bool = True, ) -> tuple[np.ndarray, Optional[np.ndarray], pd.Index, Optional[np.ndarray]]: """ @@ -1008,6 +1022,11 @@ def _create_lagged_data_by_moving_window( raise_log( ValueError("Must specify at least one series-lags pair."), logger=logger ) + if not isinstance(stride, int) and stride < 1: + raise_log( + ValueError("`stride` must be a positive integer greater than 1."), + logger=logger, + ) sample_weight_vals = _extract_sample_weight(sample_weight, target_series) time_bounds = get_shared_times_bounds(*feature_times) @@ -1091,7 +1110,7 @@ def _create_lagged_data_by_moving_window( first_window_start_idx : first_window_end_idx + num_samples - 1, :, : ] windows = strided_moving_window( - x=vals, window_len=window_len, stride=1, axis=0, check_inputs=False + x=vals, window_len=window_len, stride=stride, axis=0, check_inputs=False ) # Within each window, the `-1` indexed value (i.e. the value at the very end of @@ -1135,7 +1154,7 @@ def _create_lagged_data_by_moving_window( windows = strided_moving_window( x=vals, window_len=output_chunk_length, - stride=1, + stride=stride, axis=0, check_inputs=False, ) @@ -1207,6 +1226,7 @@ def _create_lagged_data_by_intersecting_times( multi_models: bool, check_inputs: bool, is_training: bool, + stride: int, show_warnings: bool = True, ) -> tuple[ np.ndarray, @@ -1243,6 +1263,11 @@ def _create_lagged_data_by_intersecting_times( raise_log( ValueError("Must specify at least one series-lags pair."), logger=logger ) + if not isinstance(stride, int) and stride < 1: + raise_log( + ValueError("`stride` must be a positive integer greater than 1."), + logger=logger, + ) sample_weight_vals = _extract_sample_weight(sample_weight, target_series) shared_times = get_shared_times(*feature_times, sort=True) if shared_times is None: @@ -1293,6 +1318,9 @@ def _create_lagged_data_by_intersecting_times( ) if series_and_lags_specified: idx_to_get = shared_time_idx + np.array(lags_i, dtype=int) + # apply the stride to the indexes + if stride > 1: + idx_to_get = idx_to_get[::stride] # Before reshaping: lagged_vals.shape = (n_observations, num_lags, n_components, n_samples) lagged_vals = series_i.all_values(copy=False)[idx_to_get, :, :] # After reshaping: lagged_vals.shape = (n_observations, num_lags*n_components, n_samples) @@ -1318,6 +1346,10 @@ def _create_lagged_data_by_intersecting_times( label_shared_time_idx + output_chunk_length + output_chunk_shift - 1 ) + # apply the stride to the indexes + if stride > 1: + idx_to_get = idx_to_get[::stride] + # extract target labels and sample weights y_and_weights = [] for vals in [target_series.all_values(copy=False), sample_weight_vals]: From 9847469d40dfe8195e5126c7a5517478060df91c Mon Sep 17 00:00:00 2001 From: madtoinou Date: Wed, 18 Dec 2024 15:09:17 +0100 Subject: [PATCH 04/18] feat: add stride to RegressioModel fit API --- darts/models/forecasting/regression_model.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/darts/models/forecasting/regression_model.py b/darts/models/forecasting/regression_model.py index 5302dc0ab1..d3a6bb238b 100644 --- a/darts/models/forecasting/regression_model.py +++ b/darts/models/forecasting/regression_model.py @@ -600,6 +600,7 @@ def _create_lagged_data( future_covariates: Sequence[TimeSeries], max_samples_per_ts: int, sample_weight: Optional[Union[TimeSeries, str]] = None, + stride: int = 1, last_static_covariates_shape: Optional[tuple[int, int]] = None, ): ( @@ -624,6 +625,7 @@ def _create_lagged_data( check_inputs=False, concatenate=False, sample_weight=sample_weight, + stride=stride, ) expected_nb_feat = ( @@ -675,6 +677,7 @@ def _fit_model( future_covariates: Sequence[TimeSeries], max_samples_per_ts: int, sample_weight: Optional[Union[Sequence[TimeSeries], str]], + stride: int, val_series: Optional[Sequence[TimeSeries]] = None, val_past_covariates: Optional[Sequence[TimeSeries]] = None, val_future_covariates: Optional[Sequence[TimeSeries]] = None, @@ -692,6 +695,7 @@ def _fit_model( max_samples_per_ts=max_samples_per_ts, sample_weight=sample_weight, last_static_covariates_shape=None, + stride=stride, ) if self.supports_val_set and val_series is not None: @@ -741,6 +745,7 @@ def fit( max_samples_per_ts: Optional[int] = None, n_jobs_multioutput_wrapper: Optional[int] = None, sample_weight: Optional[Union[TimeSeries, Sequence[TimeSeries], str]] = None, + stride: int = 1, **kwargs, ): """ @@ -774,6 +779,8 @@ def fit( `"linear"` or `"exponential"` decay - the further in the past, the lower the weight. The weights are computed globally based on the length of the longest series in `series`. Then for each series, the weights are extracted from the end of the global weights. This gives a common time weighting across all series. + stride + The number of time steps between consecutive entries. **kwargs Additional keyword arguments passed to the `fit` method of the model. """ @@ -952,6 +959,7 @@ def fit( sample_weight=sample_weight, val_sample_weight=val_sample_weight, max_samples_per_ts=max_samples_per_ts, + stride=stride, **kwargs, ) return self From d0558240cc1dbc402e916788134837c30640f73d Mon Sep 17 00:00:00 2001 From: madtoinou Date: Wed, 18 Dec 2024 16:56:48 +0100 Subject: [PATCH 05/18] fix: bug in striding implementation --- darts/utils/data/tabularization.py | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/darts/utils/data/tabularization.py b/darts/utils/data/tabularization.py index 3348c0fdff..d29f3ac299 100644 --- a/darts/utils/data/tabularization.py +++ b/darts/utils/data/tabularization.py @@ -1045,6 +1045,9 @@ def _create_lagged_data_by_moving_window( ) else: times = pd.date_range(start=time_bounds[0], end=time_bounds[1], freq=freq) + + if stride > 1: + times = times[::stride] num_samples = len(times) if num_samples > max_samples_per_ts: times = times[-max_samples_per_ts:] @@ -1103,11 +1106,15 @@ def _create_lagged_data_by_moving_window( first_window_start_idx = start_time_idx - max_lag_i first_window_end_idx = first_window_start_idx + window_len # Other windows are formed by sequentially shifting first window forward - # by 1 index position each time; to create `(num_samples - 1)` more windows + # by stride position each time; to create `(num_samples - 1)` more windows # in addition to the first window, need to take `(num_samples - 1)` values # after `first_window_end_idx`: vals = series_i.all_values(copy=False)[ - first_window_start_idx : first_window_end_idx + num_samples - 1, :, : + first_window_start_idx : first_window_end_idx + + num_samples * stride + - 1, + :, + :, ] windows = strided_moving_window( x=vals, window_len=window_len, stride=stride, axis=0, check_inputs=False @@ -1147,7 +1154,9 @@ def _create_lagged_data_by_moving_window( # To create `(num_samples - 1)` other windows in addition to first window, # must take `(num_samples - 1)` values ahead of `first_window_end_idx` vals = vals[ - first_window_start_idx : first_window_end_idx + num_samples - 1, + first_window_start_idx : first_window_end_idx + + num_samples * stride + - 1, :, :, ] @@ -1277,6 +1286,8 @@ def _create_lagged_data_by_intersecting_times( ), logger=logger, ) + if stride > 1: + shared_times = shared_times[::stride] if len(shared_times) > max_samples_per_ts: shared_times = shared_times[-max_samples_per_ts:] X = [] @@ -1318,9 +1329,6 @@ def _create_lagged_data_by_intersecting_times( ) if series_and_lags_specified: idx_to_get = shared_time_idx + np.array(lags_i, dtype=int) - # apply the stride to the indexes - if stride > 1: - idx_to_get = idx_to_get[::stride] # Before reshaping: lagged_vals.shape = (n_observations, num_lags, n_components, n_samples) lagged_vals = series_i.all_values(copy=False)[idx_to_get, :, :] # After reshaping: lagged_vals.shape = (n_observations, num_lags*n_components, n_samples) @@ -1346,10 +1354,6 @@ def _create_lagged_data_by_intersecting_times( label_shared_time_idx + output_chunk_length + output_chunk_shift - 1 ) - # apply the stride to the indexes - if stride > 1: - idx_to_get = idx_to_get[::stride] - # extract target labels and sample weights y_and_weights = [] for vals in [target_series.all_values(copy=False), sample_weight_vals]: From 67fd55044178023f8b93ef6a0c21b2172a3fa907 Mon Sep 17 00:00:00 2001 From: madtoinou Date: Thu, 19 Dec 2024 15:44:29 +0100 Subject: [PATCH 06/18] feat: updating the tabularization tests --- .../test_create_lagged_training_data.py | 176 ++++++++++++++---- darts/utils/data/tabularization.py | 3 +- 2 files changed, 142 insertions(+), 37 deletions(-) diff --git a/darts/tests/utils/tabularization/test_create_lagged_training_data.py b/darts/tests/utils/tabularization/test_create_lagged_training_data.py index cd4f32f1e9..b1efa3ea0e 100644 --- a/darts/tests/utils/tabularization/test_create_lagged_training_data.py +++ b/darts/tests/utils/tabularization/test_create_lagged_training_data.py @@ -72,7 +72,8 @@ def get_feature_times( output_chunk_length: Optional[int], max_samples_per_ts: Optional[int], output_chunk_shift: int, - ): + stride: int, + ) -> pd.Index: """ Helper function that returns the times shared by all specified series that can be used to create features and labels. This is performed by using the helper functions @@ -101,6 +102,9 @@ def get_feature_times( future, lags_future ) times = times.intersection(future_times) + # Apply stride + if stride > 1: + times = times[::stride] # Take most recent `max_samples_per_ts` samples if requested: if (max_samples_per_ts is not None) and (len(times) > max_samples_per_ts): times = times[-max_samples_per_ts:] @@ -433,6 +437,7 @@ def helper_create_expected_lagged_data( output_chunk_shift: int, multi_models: bool, max_samples_per_ts: Optional[int], + stride: int, ) -> tuple[np.ndarray, np.ndarray, Any]: """Helper function to create the X and y arrays by building them block by block (one per covariates).""" feats_times = self.get_feature_times( @@ -445,6 +450,7 @@ def helper_create_expected_lagged_data( output_chunk_length, max_samples_per_ts, output_chunk_shift, + stride, ) # Construct `X` by constructing each block, then concatenate these # blocks together along component axis: @@ -487,6 +493,7 @@ def helper_check_lagged_data( max_samples_per_ts: Optional[int], use_moving_windows: bool, concatenate: bool, + stride: int, **kwargs, ): """Helper function to call the `create_lagged_training_data()` method with lags argument either in the list @@ -537,6 +544,7 @@ def helper_check_lagged_data( use_moving_windows=use_moving_windows, output_chunk_shift=output_chunk_shift, concatenate=concatenate, + stride=stride, ) # should have the exact same number of indexes assert len(times) == len(expected_times_x) == len(expected_times_y) @@ -642,10 +650,13 @@ def helper_check_lagged_data( min_n_ts = 8 + max(output_chunk_shift_combos) @pytest.mark.parametrize( - "series_type", - ["datetime", "integer"], + "params", + product( + ["datetime", "integer"], # series_type + [1, 3], # stride + ), ) - def test_lagged_training_data_equal_freq(self, series_type: str): + def test_lagged_training_data_equal_freq(self, params): """ Tests that `create_lagged_training_data` produces `X`, `y`, and `times` outputs that are consistent with those generated by using the helper @@ -659,6 +670,7 @@ def test_lagged_training_data_equal_freq(self, series_type: str): are of the same frequency, the implementation of the 'moving window' method is being tested here. """ + series_type, stride = params # Define datetime index timeseries - each has different number of components, # different start times, different lengths, and different values, but # they're all of the same frequency: @@ -749,6 +761,7 @@ def test_lagged_training_data_equal_freq(self, series_type: str): output_chunk_shift, multi_models, max_samples_per_ts, + stride, ) ) @@ -770,6 +783,7 @@ def test_lagged_training_data_equal_freq(self, series_type: str): "max_samples_per_ts": max_samples_per_ts, "use_moving_windows": True, "concatenate": True, + "stride": stride, } self.helper_check_lagged_data(convert_lags_to_dict=False, **kwargs) @@ -777,10 +791,13 @@ def test_lagged_training_data_equal_freq(self, series_type: str): self.helper_check_lagged_data(convert_lags_to_dict=True, **kwargs) @pytest.mark.parametrize( - "series_type", - ["datetime", "integer"], + "params", + product( + ["datetime", "integer"], # series_type + [1, 3], # stride + ), ) - def test_lagged_training_data_unequal_freq(self, series_type): + def test_lagged_training_data_unequal_freq(self, params): """ Tests that `create_lagged_training_data` produces `X`, `y`, and `times` outputs that are consistent with those generated by using the helper @@ -794,6 +811,7 @@ def test_lagged_training_data_unequal_freq(self, series_type): are *not* of the same frequency, the implementation of the 'time intersection' method is being tested here. """ + series_type, stride = params # Define range index timeseries - each has different number of components, # different start times, different lengths, different values, and different # frequencies: @@ -869,6 +887,7 @@ def test_lagged_training_data_unequal_freq(self, series_type): output_chunk_shift, multi_models, max_samples_per_ts, + stride, ) ) @@ -890,6 +909,7 @@ def test_lagged_training_data_unequal_freq(self, series_type): "max_samples_per_ts": max_samples_per_ts, "use_moving_windows": False, "concatenate": True, + "stride": stride, } self.helper_check_lagged_data(convert_lags_to_dict=False, **kwargs) @@ -901,10 +921,13 @@ def test_lagged_training_data_unequal_freq(self, series_type): ) @pytest.mark.parametrize( - "series_type", - ["datetime", "integer"], + "params", + product( + ["datetime", "integer"], # series_type + [1, 3], # stride + ), ) - def test_lagged_training_data_method_consistency(self, series_type): + def test_lagged_training_data_method_consistency(self, params): """ Tests that `create_lagged_training_data` produces the same result when `use_moving_windows = False` and when `use_moving_windows = True` @@ -918,6 +941,7 @@ def test_lagged_training_data_method_consistency(self, series_type): # Define datetime index timeseries - each has different number of components, # different start times, different lengths, different values, and of # different frequencies: + series_type, stride = params if series_type == "integer": target = helper_create_multivariate_linear_timeseries( n_components=2, start_value=0, end_value=10, start=2, length=20, freq=1 @@ -991,6 +1015,7 @@ def test_lagged_training_data_method_consistency(self, series_type): multi_models=multi_models, use_moving_windows=True, output_chunk_shift=output_chunk_shift, + stride=stride, ) # Using time intersection method: X_ti, y_ti, times_ti, _, _ = create_lagged_training_data( @@ -1006,6 +1031,7 @@ def test_lagged_training_data_method_consistency(self, series_type): multi_models=multi_models, use_moving_windows=False, output_chunk_shift=output_chunk_shift, + stride=stride, ) assert np.allclose(X_mw, X_ti) assert np.allclose(y_mw, y_ti) @@ -1021,6 +1047,7 @@ def test_lagged_training_data_method_consistency(self, series_type): [0, 1, 3], [False, True], ["datetime", "integer"], + [1, 3], # stride ), ) def test_lagged_training_data_single_lag_single_component_same_series(self, config): @@ -1032,7 +1059,7 @@ def test_lagged_training_data_single_lag_single_component_same_series(self, conf same time series, and the expected `y` can be formed by taking a single slice from the `target`. """ - output_chunk_shift, use_moving_windows, series_type = config + output_chunk_shift, use_moving_windows, series_type, stride = config if series_type == "integer": series = linear_timeseries(start=0, length=15) else: @@ -1069,6 +1096,12 @@ def test_lagged_training_data_single_lag_single_component_same_series(self, conf ) expected_X = np.expand_dims(expected_X, axis=-1) + if stride > 1: + expected_X = expected_X[::stride] + expected_y = expected_y[::stride] + expected_times_x = expected_times_x[::stride] + expected_times_y = expected_times_y[::stride] + kwargs = { "expected_X": expected_X, "expected_y": expected_y, @@ -1087,6 +1120,7 @@ def test_lagged_training_data_single_lag_single_component_same_series(self, conf "max_samples_per_ts": None, "use_moving_windows": use_moving_windows, "concatenate": True, + "stride": stride, } self.helper_check_lagged_data(convert_lags_to_dict=False, **kwargs) @@ -1196,6 +1230,7 @@ def test_lagged_training_data_extend_past_and_future_covariates(self, config): "max_samples_per_ts": max_samples_per_ts, "use_moving_windows": use_moving_windows, "concatenate": True, + "stride": 1, } self.helper_check_lagged_data(convert_lags_to_dict=False, **kwargs) @@ -1211,8 +1246,8 @@ def test_lagged_training_data_extend_past_and_future_covariates(self, config): @pytest.mark.parametrize( "config", - itertools.product( - [0, 1, 3], [False, True], ["datetime", "integer"], [False, True] + product( + [0, 1, 3], [False, True], ["datetime", "integer"], [False, True], [1, 3] ), ) def test_lagged_training_data_single_point(self, config): @@ -1220,7 +1255,9 @@ def test_lagged_training_data_single_point(self, config): Tests that `create_lagged_training_data` correctly handles case where only one possible training point can be generated. """ - output_chunk_shift, use_moving_windows, series_type, multi_models = config + output_chunk_shift, use_moving_windows, series_type, multi_models, stride = ( + config + ) # Can only create feature using first value of series (i.e. `0`) # and can only create label using last value of series (i.e. `1`) if series_type == "integer": @@ -1244,6 +1281,11 @@ def test_lagged_training_data_single_point(self, config): length=1, freq=target.freq, ) + if stride > 1: + expected_X = expected_X[::stride] + expected_y = expected_y[::stride] + expected_times = expected_times[::stride] + # Test correctness for 'moving window' and for 'time intersection' methods, as well # as for different `multi_models` values: kwargs = { @@ -1264,6 +1306,7 @@ def test_lagged_training_data_single_point(self, config): "max_samples_per_ts": None, "use_moving_windows": use_moving_windows, "concatenate": True, + "stride": stride, } self.helper_check_lagged_data(convert_lags_to_dict=False, **kwargs) @@ -1280,7 +1323,7 @@ def test_lagged_training_data_single_point(self, config): @pytest.mark.parametrize( "config", itertools.product( - [0, 1, 3], [False, True], ["datetime", "integer"], [False, True] + [0, 1, 3], [False, True], ["datetime", "integer"], [False, True], [1, 3] ), ) def test_lagged_training_data_zero_lags(self, config): @@ -1295,7 +1338,9 @@ def test_lagged_training_data_zero_lags(self, config): # only possible feature that can be created using these series utilises # the value of `future` at the same time as the label (i.e. a lag # of `0` away from the only feature time): - output_chunk_shift, use_moving_windows, series_type, multi_models = config + output_chunk_shift, use_moving_windows, series_type, multi_models, stride = ( + config + ) if series_type == "integer": target = linear_timeseries( @@ -1329,6 +1374,11 @@ def test_lagged_training_data_zero_lags(self, config): length=1, freq=target.freq, ) + if stride > 1: + expected_X = expected_X[::stride] + expected_y = expected_y[::stride] + expected_times = expected_times[::stride] + # Check correctness for 'moving windows' and 'time intersection' methods, as # well as for different `multi_models` values: kwargs = { @@ -1349,6 +1399,7 @@ def test_lagged_training_data_zero_lags(self, config): "max_samples_per_ts": None, "use_moving_windows": use_moving_windows, "concatenate": True, + "stride": stride, } self.helper_check_lagged_data(convert_lags_to_dict=False, **kwargs) @@ -1364,13 +1415,14 @@ def test_lagged_training_data_zero_lags(self, config): @pytest.mark.parametrize( "config", - itertools.product( + product( [0, 1, 3], [False, True], ["datetime", "integer"], [False, True], [-1, 0, 1], [-2, 0, 2], + [1, 3], ), ) def test_lagged_training_data_no_target_lags_future_covariates(self, config): @@ -1390,6 +1442,7 @@ def test_lagged_training_data_no_target_lags_future_covariates(self, config): multi_models, cov_start_shift, cov_lag, + stride, ) = config # adapt covariate start, length, and target length so that only 1 sample can be extracted @@ -1429,6 +1482,11 @@ def test_lagged_training_data_no_target_lags_future_covariates(self, config): length=1, freq=target.freq, ) + if stride > 1: + expected_X[::stride] + expected_y[::stride] + expected_times[::stride] + # Check correctness for 'moving windows' and 'time intersection' methods, as # well as for different `multi_models` values: kwargs = { @@ -1449,6 +1507,7 @@ def test_lagged_training_data_no_target_lags_future_covariates(self, config): "max_samples_per_ts": None, "use_moving_windows": use_moving_windows, "concatenate": True, + "stride": stride, } self.helper_check_lagged_data(convert_lags_to_dict=False, **kwargs) @@ -1471,6 +1530,7 @@ def test_lagged_training_data_no_target_lags_future_covariates(self, config): [False, True], [-1, 0], [-2, -1], + [1, 3], ), ) def test_lagged_training_data_no_target_lags_past_covariates(self, config): @@ -1489,6 +1549,7 @@ def test_lagged_training_data_no_target_lags_past_covariates(self, config): multi_models, cov_start_shift, cov_lag, + stride, ) = config # adapt covariate start, length, and target length so that only 1 sample can be extracted @@ -1528,6 +1589,11 @@ def test_lagged_training_data_no_target_lags_past_covariates(self, config): length=1, freq=target.freq, ) + if stride > 1: + expected_X[::stride] + expected_y[::stride] + expected_times[::stride] + # Check correctness for 'moving windows' and 'time intersection' methods, as # well as for different `multi_models` values: kwargs = { @@ -1548,6 +1614,7 @@ def test_lagged_training_data_no_target_lags_past_covariates(self, config): "max_samples_per_ts": None, "use_moving_windows": use_moving_windows, "concatenate": True, + "stride": stride, } self.helper_check_lagged_data(convert_lags_to_dict=False, **kwargs) @@ -1564,7 +1631,7 @@ def test_lagged_training_data_no_target_lags_past_covariates(self, config): @pytest.mark.parametrize( "config", itertools.product( - [0, 1, 3], [False, True], ["datetime", "integer"], [False, True] + [0, 1, 3], [False, True], ["datetime", "integer"], [False, True], [1, 3] ), ) def test_lagged_training_data_positive_lags(self, config): @@ -1580,7 +1647,9 @@ def test_lagged_training_data_positive_lags(self, config): # only possible feature that can be created using these series utilises # the value of `future` one timestep after the time of the label (i.e. a lag # of `1` away from the only feature time): - output_chunk_shift, use_moving_windows, series_type, multi_models = config + output_chunk_shift, use_moving_windows, series_type, multi_models, stride = ( + config + ) if series_type == "integer": target = linear_timeseries( @@ -1613,6 +1682,11 @@ def test_lagged_training_data_positive_lags(self, config): length=1, freq=target.freq, ) + if stride > 1: + expected_X[::stride] + expected_y[::stride] + expected_times[::stride] + # Check correctness for 'moving windows' and 'time intersection' methods, as # well as for different `multi_models` values: kwargs = { @@ -1633,6 +1707,7 @@ def test_lagged_training_data_positive_lags(self, config): "max_samples_per_ts": None, "use_moving_windows": use_moving_windows, "concatenate": True, + "stride": stride, } self.helper_check_lagged_data(convert_lags_to_dict=False, **kwargs) @@ -1653,6 +1728,7 @@ def test_lagged_training_data_positive_lags(self, config): [1, 2], [True, False], ["datetime", "integer"], + [1, 3], ), ) def test_lagged_training_data_comp_wise_lags(self, config): @@ -1662,7 +1738,9 @@ def test_lagged_training_data_comp_wise_lags(self, config): Note that this is supported only when use_moving_window=True. """ - output_chunk_shift, output_chunk_length, multi_models, series_type = config + output_chunk_shift, output_chunk_length, multi_models, series_type, stride = ( + config + ) lags_tg = {"target_0": [-4, -1], "target_1": [-4, -1]} lags_pc = [-3] @@ -1716,6 +1794,7 @@ def test_lagged_training_data_comp_wise_lags(self, config): output_chunk_length, None, output_chunk_shift, + stride=stride, ) # reorder the features to obtain target_0_lag-4, target_1_lag-4, target_0_lag-1, target_1_lag-1 @@ -1762,6 +1841,10 @@ def test_lagged_training_data_comp_wise_lags(self, config): multi_models, output_chunk_shift, )[:, :, np.newaxis] + if stride > 1: + expected_X[::stride] + expected_y[::stride] + feats_times[::stride] # lags are already in dict format self.helper_check_lagged_data( @@ -1783,9 +1866,14 @@ def test_lagged_training_data_comp_wise_lags(self, config): max_samples_per_ts=None, use_moving_windows=True, concatenate=True, + stride=stride, ) - def test_lagged_training_data_sequence_inputs(self): + @pytest.mark.parametrize( + "stride", + [1, 3], + ) + def test_lagged_training_data_sequence_inputs(self, stride): """ Tests that `create_lagged_training_data` correctly handles being passed a sequence of `TimeSeries` inputs, as opposed to individual @@ -1806,12 +1894,14 @@ def test_lagged_training_data_sequence_inputs(self): expected_X_2 = np.concatenate( 3 * [target_2.all_values(copy=False)[:-1, :, :]], axis=1 ) - expected_X = np.concatenate([expected_X_1, expected_X_2], axis=0) - expected_y_1 = target_1.all_values(copy=False)[1:, :, :] - expected_y_2 = target_2.all_values(copy=False)[1:, :, :] + expected_X = np.concatenate( + [expected_X_1[::stride], expected_X_2[::stride]], axis=0 + ) + expected_y_1 = target_1.all_values(copy=False)[1::stride, :, :] + expected_y_2 = target_2.all_values(copy=False)[1::stride, :, :] expected_y = np.concatenate([expected_y_1, expected_y_2], axis=0) - expected_times_1 = target_1.time_index[1:] - expected_times_2 = target_2.time_index[1:] + expected_times_1 = target_1.time_index[1::stride] + expected_times_2 = target_2.time_index[1::stride] kwargs = { "expected_X": expected_X, @@ -1830,6 +1920,7 @@ def test_lagged_training_data_sequence_inputs(self): "multi_models": True, "max_samples_per_ts": None, "use_moving_windows": True, + "stride": stride, } # concatenate=True @@ -1848,7 +1939,11 @@ def test_lagged_training_data_sequence_inputs(self): convert_lags_to_dict=True, concatenate=False, **kwargs ) - def test_lagged_training_data_stochastic_series(self): + @pytest.mark.parametrize( + "stride", + [1, 3], + ) + def test_lagged_training_data_stochastic_series(self, stride): """ Tests that `create_lagged_training_data` is correctly vectorised over the sample axes of the input `TimeSeries`. @@ -1863,10 +1958,10 @@ def test_lagged_training_data_stochastic_series(self): output_chunk_length = 1 # Expected solution: expected_X = np.concatenate( - 3 * [target.all_values(copy=False)[:-1, :, :]], axis=1 + 3 * [target.all_values(copy=False)[:-1:stride, :, :]], axis=1 ) - expected_y = target.all_values(copy=False)[1:, :, :] - expected_times = target.time_index[1:] + expected_y = target.all_values(copy=False)[1::stride, :, :] + expected_times = target.time_index[1::stride] kwargs = { "expected_X": expected_X, @@ -1885,6 +1980,7 @@ def test_lagged_training_data_stochastic_series(self): "multi_models": True, "max_samples_per_ts": None, "use_moving_windows": True, + "stride": stride, } self.helper_check_lagged_data( @@ -2729,6 +2825,7 @@ def test_correct_generated_weights_exponential(self, config): ["D", "2D", 2], [True, False], [True, False], + [1, 3], ), ) def test_correct_user_weights(self, config): @@ -2751,14 +2848,18 @@ def test_correct_user_weights(self, config): freq, single_series, univar_series, + stride, ) = config + lags = [-4, -1] if not isinstance(freq, int): freq = pd.tseries.frequencies.to_offset(freq) start = pd.Timestamp("2000-01-01") else: start = 1 - train_y = linear_timeseries(start=start, length=training_size, freq=freq) + train_y = linear_timeseries( + start=start, end_value=training_size - 1, length=training_size, freq=freq + ) if not univar_series: train_y.stack(train_y) @@ -2776,13 +2877,14 @@ def test_correct_user_weights(self, config): ts_weights.stack(ts_weights + 1.0) _, y, _, _, weights = create_lagged_training_data( - lags=[-4, -1], + lags=lags, target_series=train_y if single_series else [train_y] * 2, output_chunk_length=ocl, uses_static_covariates=False, sample_weight=ts_weights if single_series else [ts_weights] * 2, output_chunk_shift=ocs, use_moving_windows=use_moving_windows, + stride=stride, ) # weights shape must match label shape, since we have one @@ -2796,11 +2898,15 @@ def test_correct_user_weights(self, config): # the weights correspond to the same sample and time index as the `y` labels expected_weights = [] - len_y_single = len(y) if single_series else int(len(y) / 2) + len_y_single = len(y) if single_series else len(y) // 2 for i in range(ocl): - mask = slice(-(i + len_y_single), -i if i else None) + # shifted by the steps required to create the first set of features + first_label_idx = -min(lags) + ocs + i + # make enough room for all the strided labels + last_label_idx = first_label_idx + len_y_single * stride + mask = slice(first_label_idx, last_label_idx, stride) expected_weights.append(weights_exact[mask]) - expected_weights = np.concatenate(expected_weights, axis=1)[:, ::-1] + expected_weights = np.concatenate(expected_weights, axis=1) if not single_series: expected_weights = np.concatenate([expected_weights] * 2, axis=0) np.testing.assert_array_almost_equal(weights[:, :, 0], expected_weights) diff --git a/darts/utils/data/tabularization.py b/darts/utils/data/tabularization.py index d29f3ac299..4785617c84 100644 --- a/darts/utils/data/tabularization.py +++ b/darts/utils/data/tabularization.py @@ -1155,8 +1155,7 @@ def _create_lagged_data_by_moving_window( # must take `(num_samples - 1)` values ahead of `first_window_end_idx` vals = vals[ first_window_start_idx : first_window_end_idx - + num_samples * stride - - 1, + + (num_samples - 1) * stride, :, :, ] From d6155520936fd6a7541fba1a3577377282013560 Mon Sep 17 00:00:00 2001 From: madtoinou Date: Thu, 19 Dec 2024 15:51:31 +0100 Subject: [PATCH 07/18] fix: bug --- darts/utils/data/tabularization.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/darts/utils/data/tabularization.py b/darts/utils/data/tabularization.py index 4785617c84..26f57bc433 100644 --- a/darts/utils/data/tabularization.py +++ b/darts/utils/data/tabularization.py @@ -1111,8 +1111,7 @@ def _create_lagged_data_by_moving_window( # after `first_window_end_idx`: vals = series_i.all_values(copy=False)[ first_window_start_idx : first_window_end_idx - + num_samples * stride - - 1, + + (num_samples - 1) * stride, :, :, ] From 9ff31a475c045e806930f41afd48a7cd9a18c5a9 Mon Sep 17 00:00:00 2001 From: madtoinou Date: Thu, 19 Dec 2024 17:24:06 +0100 Subject: [PATCH 08/18] feat: adding test for stride in torch datasets --- darts/tests/datasets/test_datasets.py | 48 +++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/darts/tests/datasets/test_datasets.py b/darts/tests/datasets/test_datasets.py index 05a517a75c..70f7020696 100644 --- a/darts/tests/datasets/test_datasets.py +++ b/darts/tests/datasets/test_datasets.py @@ -1706,6 +1706,54 @@ def test_sequential_training_dataset_invalid_weight(self, ds_cls): "2000-01-02 00:00:00 - 2000-01-04 00:00:00." ) + @pytest.mark.parametrize( + "config", + [ + # (dataset class, whether contains future, future batch index) + (PastCovariatesSequentialDataset, None), + (FutureCovariatesSequentialDataset, 1), + (DualCovariatesSequentialDataset, 2), + (MixedCovariatesSequentialDataset, 3), + (SplitCovariatesSequentialDataset, 2), + ], + ) + def test_sequential_training_dataset_stride(self, config): + ds_cls, future_idx = config + icl = 4 + ocl = 2 + nb_samples = 12 + target = self.target1[: icl + ocl + nb_samples - 1] + + ds_covs = {} + ds_init_params = set(inspect.signature(ds_cls.__init__).parameters) + for cov_type in ["covariates", "past_covariates", "future_covariates"]: + if cov_type in ds_init_params: + ds_covs[cov_type] = self.cov1 + + ds_reg = ds_cls( + target_series=target, + input_chunk_length=icl, + output_chunk_length=ocl, + stride=1, + **ds_covs, + ) + + ds_stride = ds_cls( + target_series=target, + input_chunk_length=icl, + output_chunk_length=ocl, + stride=3, + **ds_covs, + ) + assert len(ds_stride) * 3 == len(ds_reg) == nb_samples + # regular dataset with stride=1, every 3rd values should be identical to the dataset with stride=3 + for idx, batch_str in enumerate(ds_stride): + for entry_s, entry_r in zip(batch_str, ds_reg[idx * 3]): + if entry_s is not None and entry_r is not None: + np.testing.assert_almost_equal(entry_s, entry_r) + else: + assert entry_s == entry_r + def test_get_matching_index(self): from darts.utils.data.utils import _get_matching_index From 69fbc474a2f89e041c973af92934e51256d99ca2 Mon Sep 17 00:00:00 2001 From: madtoinou Date: Thu, 19 Dec 2024 17:29:07 +0100 Subject: [PATCH 09/18] update changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index ad43f4371f..a624102125 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ but cannot always guarantee backwards compatibility. Changes that may **break co - Added `data_transformers` argument to `historical_forecasts`, `backtest`, `residuals`, and `gridsearch` that allow to automatically apply `DataTransformer` and/or `Pipeline` to the input series without data-leakage (fit on historic window of input series, transform the input series, and inverse transform the forecasts). [#2529](https://github.com/unit8co/darts/pull/2529) by [Antoine Madrona](https://github.com/madtoinou) and [Jan Fidor](https://github.com/JanFidor) - Added `series_idx` argument to `DataTransformer` that allows users to use only a subset of the transformers when `global_fit=False` and severals series are used. [#2529](https://github.com/unit8co/darts/pull/2529) by [Antoine Madrona](https://github.com/madtoinou) - Updated the Documentation URL of `Statsforecast` models. [#2610](https://github.com/unit8co/darts/pull/2610) by [He Weilin](https://github.com/cnhwl). +- Added a `stride` argument to the `Dataset` classes (torch-based models) and the fitting methods of the `RegressionModels` to reduce the size of the training set or apply elaborate training approaches. [#2624](https://github.com/unit8co/darts/pull/2529) by [Antoine Madrona](https://github.com/madtoinou) **Fixed** From 911b8c5993794fb4ab32c24bbfbc17a08bf30498 Mon Sep 17 00:00:00 2001 From: madtoinou Date: Fri, 20 Dec 2024 09:15:24 +0100 Subject: [PATCH 10/18] fix: missing test and small bug --- .../test_create_lagged_training_data.py | 38 +++++++++++++++++++ darts/utils/data/tabularization.py | 8 ++-- 2 files changed, 42 insertions(+), 4 deletions(-) diff --git a/darts/tests/utils/tabularization/test_create_lagged_training_data.py b/darts/tests/utils/tabularization/test_create_lagged_training_data.py index b1efa3ea0e..731b15c271 100644 --- a/darts/tests/utils/tabularization/test_create_lagged_training_data.py +++ b/darts/tests/utils/tabularization/test_create_lagged_training_data.py @@ -2092,6 +2092,44 @@ def test_lagged_training_data_invalid_output_chunk_length_error(self): ) assert "`output_chunk_length` must be a positive `int`." == str(err.value) + def test_lagged_training_data_invalid_stride_error(self): + """ + Tests that `create_lagged_training_data` throws correct error + when `stride` is set to a non-`int` value (e.g. a + `float`) or a non-positive value (e.g. `0`). + """ + target = linear_timeseries(start=1, length=20, freq=1) + lags = [-1] + ocl = 2 + # Check error thrown by 'moving windows' method and by 'time intersection' method: + for use_moving_windows in (False, True): + with pytest.raises(ValueError) as err: + create_lagged_training_data( + target_series=target, + output_chunk_length=ocl, + lags=lags, + uses_static_covariates=False, + use_moving_windows=use_moving_windows, + output_chunk_shift=0, + stride=-1, + ) + assert "`stride` must be a positive integer greater than 0." == str( + err.value + ) + with pytest.raises(ValueError) as err: + create_lagged_training_data( + target_series=target, + output_chunk_length=ocl, + lags=lags, + uses_static_covariates=False, + use_moving_windows=use_moving_windows, + output_chunk_shift=0, + stride=1.1, + ) + assert "`stride` must be a positive integer greater than 0." == str( + err.value + ) + def test_lagged_training_data_no_lags_specified_error(self): """ Tests that `create_lagged_training_data` throws correct error diff --git a/darts/utils/data/tabularization.py b/darts/utils/data/tabularization.py index 26f57bc433..5e029fcdc3 100644 --- a/darts/utils/data/tabularization.py +++ b/darts/utils/data/tabularization.py @@ -1022,9 +1022,9 @@ def _create_lagged_data_by_moving_window( raise_log( ValueError("Must specify at least one series-lags pair."), logger=logger ) - if not isinstance(stride, int) and stride < 1: + if not (isinstance(stride, int) and stride > 0): raise_log( - ValueError("`stride` must be a positive integer greater than 1."), + ValueError("`stride` must be a positive integer greater than 0."), logger=logger, ) sample_weight_vals = _extract_sample_weight(sample_weight, target_series) @@ -1270,9 +1270,9 @@ def _create_lagged_data_by_intersecting_times( raise_log( ValueError("Must specify at least one series-lags pair."), logger=logger ) - if not isinstance(stride, int) and stride < 1: + if not (isinstance(stride, int) and stride > 0): raise_log( - ValueError("`stride` must be a positive integer greater than 1."), + ValueError("`stride` must be a positive integer greater than 0."), logger=logger, ) sample_weight_vals = _extract_sample_weight(sample_weight, target_series) From e8ac91dd1d22841f07c3147ed4f48418b114533e Mon Sep 17 00:00:00 2001 From: madtoinou Date: Mon, 30 Dec 2024 12:15:32 +0100 Subject: [PATCH 11/18] update changelog --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a5d0e84be2..791623793e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ but cannot always guarantee backwards compatibility. Changes that may **break co **Improved** - New model: `StatsForecastAutoTBATS`. This model offers the [AutoTBATS](https://nixtlaverse.nixtla.io/statsforecast/src/core/models.html#autotbats) model from Nixtla's `statsforecasts` library. [#2611](https://github.com/unit8co/darts/pull/2611) by [He Weilin](https://github.com/cnhwl). +- Added a `stride` argument to the `Dataset` classes (torch-based models) and the fitting methods of the `RegressionModels` to reduce the size of the training set or apply elaborate training approaches. [#2624](https://github.com/unit8co/darts/pull/2529) by [Antoine Madrona](https://github.com/madtoinou) **Fixed** @@ -48,7 +49,6 @@ but cannot always guarantee backwards compatibility. Changes that may **break co - Interval Non-Conformity Score for Quantile Regression `incs_qr()`, and Mean ... `mincs_qr()` (time-aggregated) ([source](https://arxiv.org/pdf/1905.03222)) - Added `series_idx` argument to `DataTransformer` that allows users to use only a subset of the transformers when `global_fit=False` and severals series are used. [#2529](https://github.com/unit8co/darts/pull/2529) by [Antoine Madrona](https://github.com/madtoinou) - Updated the Documentation URL of `Statsforecast` models. [#2610](https://github.com/unit8co/darts/pull/2610) by [He Weilin](https://github.com/cnhwl). -- Added a `stride` argument to the `Dataset` classes (torch-based models) and the fitting methods of the `RegressionModels` to reduce the size of the training set or apply elaborate training approaches. [#2624](https://github.com/unit8co/darts/pull/2529) by [Antoine Madrona](https://github.com/madtoinou) **Fixed** From 198edd5260fca257b6075c03cce83c063f5d3722 Mon Sep 17 00:00:00 2001 From: madtoinou Date: Fri, 3 Jan 2025 11:43:19 +0100 Subject: [PATCH 12/18] doc: update some docstrings --- darts/models/forecasting/regression_model.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/darts/models/forecasting/regression_model.py b/darts/models/forecasting/regression_model.py index 547c5215b0..221484babd 100644 --- a/darts/models/forecasting/regression_model.py +++ b/darts/models/forecasting/regression_model.py @@ -780,7 +780,9 @@ def fit( computed globally based on the length of the longest series in `series`. Then for each series, the weights are extracted from the end of the global weights. This gives a common time weighting across all series. stride - The number of time steps between consecutive entries. + The number of time steps between consecutive samples (windows of lagged values extracted from the target + series), applied starting from the end of the series. This should be used with caution as it might + introduce bias in the forecasts. **kwargs Additional keyword arguments passed to the `fit` method of the model. """ @@ -1001,11 +1003,11 @@ def predict( If set to `True`, the model predicts the parameters of its `likelihood` instead of the target. Only supported for probabilistic models with a likelihood, `num_samples = 1` and `n<=output_chunk_length`. Default: ``False`` + show_warnings + Optionally, control whether warnings are shown. Not effective for all models. **kwargs : dict, optional Additional keyword arguments passed to the `predict` method of the model. Only works with univariate target series. - show_warnings - Optionally, control whether warnings are shown. Not effective for all models. """ if series is None: # then there must be a single TS, and that was saved in super().fit as self.training_series From a38b9f5685561d39fa029d117693979452c62ba9 Mon Sep 17 00:00:00 2001 From: madtoinou Date: Thu, 9 Jan 2025 09:35:50 +0100 Subject: [PATCH 13/18] feat: stride is now applied from the end of the series --- darts/utils/data/tabularization.py | 29 ++++++++++++++++++++--------- 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/darts/utils/data/tabularization.py b/darts/utils/data/tabularization.py index 5e029fcdc3..4f4a208a4d 100644 --- a/darts/utils/data/tabularization.py +++ b/darts/utils/data/tabularization.py @@ -222,7 +222,9 @@ def create_lagged_data( computed globally based on the length of the longest series in `series`. Then for each series, the weights are extracted from the end of the global weights. This gives a common time weighting across all series. stride - The number of time steps between consecutive entries. + The number of time steps between consecutive samples (windows of lagged values extracted from the target + series), applied starting from the end of the series. This should be used with caution as it might + introduce bias in the forecasts. show_warnings Whether to show warnings. @@ -504,7 +506,9 @@ def create_lagged_training_data( feature/label arrays formed by each `TimeSeries` along the `0`th axis. Note that `times` is still returned as `Sequence[pd.Index]`, even when `concatenate = True`. stride - The number of time steps between consecutive entries. + The number of time steps between consecutive samples (windows of lagged values extracted from the target + series), applied starting from the end of the series. This should be used with caution as it might + introduce bias in the forecasts. sample_weight Optionally, some sample weights to apply to the target `series` labels. They are applied per observation, per label (each step in `output_chunk_length`), and per component. @@ -652,7 +656,9 @@ def create_lagged_prediction_data( arrays formed by each `TimeSeries` along the `0`th axis. Note that `times` is still returned as `Sequence[pd.Index]`, even when `concatenate = True`. stride - The number of time steps between consecutive entries. + The number of time steps between consecutive samples (windows of lagged values extracted from the target + series), applied starting from the end of the series. This should be used with caution as it will cause + gaps in the forecasts. show_warnings Whether to show warnings. @@ -1047,7 +1053,9 @@ def _create_lagged_data_by_moving_window( times = pd.date_range(start=time_bounds[0], end=time_bounds[1], freq=freq) if stride > 1: - times = times[::stride] + # calculate the starting index so that the last element is included after applying the stride + # equivalent to times[::-stride][::-1] + times = times[(len(times) - 1) % stride :: stride] num_samples = len(times) if num_samples > max_samples_per_ts: times = times[-max_samples_per_ts:] @@ -1106,9 +1114,9 @@ def _create_lagged_data_by_moving_window( first_window_start_idx = start_time_idx - max_lag_i first_window_end_idx = first_window_start_idx + window_len # Other windows are formed by sequentially shifting first window forward - # by stride position each time; to create `(num_samples - 1)` more windows - # in addition to the first window, need to take `(num_samples - 1)` values - # after `first_window_end_idx`: + # by `stride` position each time; to create `(num_samples - 1)` more windows + # in addition to the first window, need to take `(num_samples - 1) * stride` + # values after `first_window_end_idx`: vals = series_i.all_values(copy=False)[ first_window_start_idx : first_window_end_idx + (num_samples - 1) * stride, @@ -1151,7 +1159,8 @@ def _create_lagged_data_by_moving_window( continue # To create `(num_samples - 1)` other windows in addition to first window, - # must take `(num_samples - 1)` values ahead of `first_window_end_idx` + # must take `(num_samples - 1) * stride` values ahead of `first_window_end_idx` + # to also take the stride into consideration vals = vals[ first_window_start_idx : first_window_end_idx + (num_samples - 1) * stride, @@ -1285,7 +1294,9 @@ def _create_lagged_data_by_intersecting_times( logger=logger, ) if stride > 1: - shared_times = shared_times[::stride] + # calculate the starting index so that the last element is included after applying the stride + # equivalent to shared_times[::-stride][::-1] + shared_times = shared_times[(len(shared_times) - 1) % stride :: stride] if len(shared_times) > max_samples_per_ts: shared_times = shared_times[-max_samples_per_ts:] X = [] From c54699118cb0ef9855aa928d6db511bdf75df812 Mon Sep 17 00:00:00 2001 From: madtoinou Date: Thu, 9 Jan 2025 09:36:16 +0100 Subject: [PATCH 14/18] feat: added stride support to the horizon based dataset --- darts/utils/data/horizon_based_dataset.py | 21 ++++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/darts/utils/data/horizon_based_dataset.py b/darts/utils/data/horizon_based_dataset.py index 962e910df0..5325da23f1 100644 --- a/darts/utils/data/horizon_based_dataset.py +++ b/darts/utils/data/horizon_based_dataset.py @@ -4,6 +4,7 @@ """ from collections.abc import Sequence +from math import ceil from typing import Optional, Union import numpy as np @@ -25,6 +26,7 @@ def __init__( output_chunk_length: int = 12, lh: tuple[int, int] = (1, 3), lookback: int = 3, + stride: int = 1, use_static_covariates: bool = True, sample_weight: Optional[Union[TimeSeries, Sequence[TimeSeries], str]] = None, ) -> None: @@ -39,7 +41,7 @@ def __init__( Given the horizon `output_chunk_length` of a model, this dataset will compute some "past/future" splits as follows: - First a "forecast point" is selected in the the range of the last + First a "forecast point" is selected in the range of the last `(min_lh * output_chunk_length, max_lh * output_chunk_length)` points before the end of the time series. The "future" then consists in the following `output_chunk_length` points, and the "past" will be the preceding `lookback * output_chunk_length` points. @@ -72,6 +74,10 @@ def __init__( A integer interval for the length of the input in the emitted input and output splits, expressed as a multiple of `output_chunk_length`. For instance, `lookback=3` will emit "inputs" of lengths `3 * output_chunk_length`. + stride + The number of time steps between consecutive samples (windows of lagged values extracted from the target + series), applied starting from the end of the series. This should be used with caution as it might + introduce bias in the forecasts. use_static_covariates Whether to use/include static covariate data from input series. sample_weight @@ -104,8 +110,15 @@ def __init__( self.output_chunk_length = output_chunk_length self.min_lh, self.max_lh = lh self.lookback = lookback + self.stride = stride # Checks + if not (isinstance(stride, int) and stride > 0): + raise_log( + ValueError("`stride` must be a positive integer greater than 0."), + logger=logger, + ) + if not (self.max_lh >= self.min_lh >= 1): raise_log( ValueError( @@ -114,7 +127,9 @@ def __init__( ), logger=logger, ) - self.nr_samples_per_ts = (self.max_lh - self.min_lh) * self.output_chunk_length + self.nr_samples_per_ts = ceil( + ((self.max_lh - self.min_lh) * self.output_chunk_length) / self.stride + ) self.total_nr_samples = len(self.target_series) * self.nr_samples_per_ts self.use_static_covariates = use_static_covariates @@ -149,7 +164,7 @@ def __getitem__( # determine the index lh_idx of the forecasting point (the last point of the input series, before the target) # lh_idx should be in [0, self.nr_samples_per_ts) - lh_idx = idx - (target_idx * self.nr_samples_per_ts) + lh_idx = idx * self.stride - (target_idx * self.nr_samples_per_ts) # determine the index at the end of the output chunk end_of_output_idx = len(target_series) - ( From dd8ab500cccf3bb35037e6a17d1dee48c7ce07d2 Mon Sep 17 00:00:00 2001 From: madtoinou Date: Thu, 9 Jan 2025 09:37:49 +0100 Subject: [PATCH 15/18] feat: updated unit tests --- darts/tests/datasets/test_datasets.py | 94 +++++++++++++++++---------- 1 file changed, 59 insertions(+), 35 deletions(-) diff --git a/darts/tests/datasets/test_datasets.py b/darts/tests/datasets/test_datasets.py index 70f7020696..616c5d66f7 100644 --- a/darts/tests/datasets/test_datasets.py +++ b/darts/tests/datasets/test_datasets.py @@ -69,6 +69,25 @@ def _assert_eq(self, lefts: tuple, rights: tuple): else: assert right is None + def _check_strided_ds(self, regular_ds, strided_ds, stride: int): + """ + Every `stride`-th values in a dataset with stride=1 should be identical to the dataset strided with `stride + """ + # if the un-strided length is a multiple of the stride + if len(regular_ds) % stride == 0: + assert len(regular_ds) == len(strided_ds) * stride + else: + assert len(regular_ds) > len(strided_ds) * stride + + for idx, batch_str in enumerate(strided_ds): + print("****", idx) + for entry_s, entry_r in zip(batch_str, regular_ds[idx * stride]): + if entry_s is not None and entry_r is not None: + np.testing.assert_almost_equal(entry_s, entry_r) + else: + assert entry_s == entry_r + print("****", idx, "passed") + def test_past_covariates_inference_dataset(self): # one target series ds = PastCovariatesInferenceDataset( @@ -491,7 +510,7 @@ def test_split_covariates_inference_dataset(self): @pytest.mark.parametrize( "config", [ - # (dataset class, whether contains future, future batch index) + # (dataset class, future batch index) (PastCovariatesInferenceDataset, None), (FutureCovariatesInferenceDataset, 1), (DualCovariatesInferenceDataset, 2), @@ -1322,36 +1341,40 @@ def test_dual_covariates_shifted_dataset(self): @pytest.mark.parametrize("use_weight", [False, True]) def test_horizon_based_dataset(self, use_weight): + ds_kwargs = { + "output_chunk_length": 10, + "lh": (1, 3), + "lookback": 2, + } weight1 = self.target1 + 1 weight2 = self.target2 + 1 - weight = weight1 if use_weight else None weight_exp = weight1[85:95] if use_weight else None # one target series - ds = HorizonBasedDataset( - target_series=self.target1, - output_chunk_length=10, - lh=(1, 3), - lookback=2, - sample_weight=weight, - ) + ds_kwargs["target_series"] = self.target1 + ds_kwargs["sample_weight"] = weight1 if use_weight else None + ds = HorizonBasedDataset(**ds_kwargs) assert len(ds) == 20 self._assert_eq( ds[5], (self.target1[65:85], None, self.cov_st1, weight_exp, self.target1[85:95]), ) + # one target series, with stride + self._check_strided_ds( + regular_ds=ds, + strided_ds=HorizonBasedDataset( + **ds_kwargs, + stride=3, + ), + stride=3, + ) # two target series - weight = [weight1, weight2] if use_weight else None weight_exp1 = weight1[85:95] if use_weight else None weight_exp2 = weight2[135:145] if use_weight else None - ds = HorizonBasedDataset( - target_series=[self.target1, self.target2], - output_chunk_length=10, - lh=(1, 3), - lookback=2, - sample_weight=weight, - ) + ds_kwargs["target_series"] = [self.target1, self.target2] + ds_kwargs["sample_weight"] = [weight1, weight2] if use_weight else None + ds = HorizonBasedDataset(**ds_kwargs) assert len(ds) == 40 self._assert_eq( ds[5], @@ -1367,6 +1390,12 @@ def test_horizon_based_dataset(self, use_weight): self.target2[135:145], ), ) + # two target series, with stride + self._check_strided_ds( + regular_ds=ds, + strided_ds=HorizonBasedDataset(**ds_kwargs, stride=3), + stride=3, + ) # two targets and one covariate with pytest.raises(ValueError): @@ -1375,17 +1404,12 @@ def test_horizon_based_dataset(self, use_weight): ) # two targets and two covariates - weight = [weight1, weight2] if use_weight else None weight_exp1 = weight1[85:95] if use_weight else None weight_exp2 = weight2[135:145] if use_weight else None - ds = HorizonBasedDataset( - target_series=[self.target1, self.target2], - covariates=[self.cov1, self.cov2], - output_chunk_length=10, - lh=(1, 3), - lookback=2, - sample_weight=weight, - ) + ds_kwargs["target_series"] = [self.target1, self.target2] + ds_kwargs["covariates"] = [self.cov1, self.cov2] + ds_kwargs["sample_weight"] = [weight1, weight2] if use_weight else None + ds = HorizonBasedDataset(**ds_kwargs) self._assert_eq( ds[5], ( @@ -1406,11 +1430,17 @@ def test_horizon_based_dataset(self, use_weight): self.target2[135:145], ), ) + # two targets and two covariates, with stride + self._check_strided_ds( + regular_ds=ds, + strided_ds=HorizonBasedDataset(**ds_kwargs, stride=3), + stride=3, + ) @pytest.mark.parametrize( "config", [ - # (dataset class, whether contains future, future batch index) + # (dataset class, future batch index) (PastCovariatesSequentialDataset, None), (FutureCovariatesSequentialDataset, 1), (DualCovariatesSequentialDataset, 2), @@ -1709,7 +1739,7 @@ def test_sequential_training_dataset_invalid_weight(self, ds_cls): @pytest.mark.parametrize( "config", [ - # (dataset class, whether contains future, future batch index) + # (dataset class, future batch index) (PastCovariatesSequentialDataset, None), (FutureCovariatesSequentialDataset, 1), (DualCovariatesSequentialDataset, 2), @@ -1746,13 +1776,7 @@ def test_sequential_training_dataset_stride(self, config): **ds_covs, ) assert len(ds_stride) * 3 == len(ds_reg) == nb_samples - # regular dataset with stride=1, every 3rd values should be identical to the dataset with stride=3 - for idx, batch_str in enumerate(ds_stride): - for entry_s, entry_r in zip(batch_str, ds_reg[idx * 3]): - if entry_s is not None and entry_r is not None: - np.testing.assert_almost_equal(entry_s, entry_r) - else: - assert entry_s == entry_r + self._check_strided_ds(regular_ds=ds_reg, strided_ds=ds_stride, stride=3) def test_get_matching_index(self): from darts.utils.data.utils import _get_matching_index From 2fc77e0034514ee3e86be9d2588951c2094a1fbd Mon Sep 17 00:00:00 2001 From: madtoinou Date: Thu, 9 Jan 2025 09:38:34 +0100 Subject: [PATCH 16/18] feat: adding support of stride in the fit method of torch models --- .../forecasting/global_baseline_models.py | 2 + darts/models/forecasting/rnn_model.py | 2 + darts/models/forecasting/tcn_model.py | 2 + darts/models/forecasting/tft_model.py | 2 + .../forecasting/torch_forecasting_model.py | 16 ++++++- darts/utils/data/shifted_dataset.py | 42 ++++++++++++++----- 6 files changed, 54 insertions(+), 12 deletions(-) diff --git a/darts/models/forecasting/global_baseline_models.py b/darts/models/forecasting/global_baseline_models.py index 06c9f30618..d6f9bdf522 100644 --- a/darts/models/forecasting/global_baseline_models.py +++ b/darts/models/forecasting/global_baseline_models.py @@ -256,6 +256,7 @@ def _build_train_dataset( future_covariates: Optional[Sequence[TimeSeries]], sample_weight: Optional[Sequence[TimeSeries]], max_samples_per_ts: Optional[int], + stride: int, ) -> MixedCovariatesTrainingDataset: return MixedCovariatesSequentialDataset( target_series=target, @@ -264,6 +265,7 @@ def _build_train_dataset( input_chunk_length=self.input_chunk_length, output_chunk_length=0, output_chunk_shift=self.output_chunk_shift, + stride=stride, max_samples_per_ts=max_samples_per_ts, use_static_covariates=self.uses_static_covariates, sample_weight=sample_weight, diff --git a/darts/models/forecasting/rnn_model.py b/darts/models/forecasting/rnn_model.py index e7d55ac9c6..d9a948e595 100644 --- a/darts/models/forecasting/rnn_model.py +++ b/darts/models/forecasting/rnn_model.py @@ -566,12 +566,14 @@ def _build_train_dataset( future_covariates: Optional[Sequence[TimeSeries]], sample_weight: Optional[Sequence[TimeSeries]], max_samples_per_ts: Optional[int], + stride: int, ) -> DualCovariatesShiftedDataset: return DualCovariatesShiftedDataset( target_series=target, covariates=future_covariates, length=self.training_length, shift=1, + stride=stride, max_samples_per_ts=max_samples_per_ts, use_static_covariates=self.uses_static_covariates, sample_weight=sample_weight, diff --git a/darts/models/forecasting/tcn_model.py b/darts/models/forecasting/tcn_model.py index 3d66b4613d..112b953553 100644 --- a/darts/models/forecasting/tcn_model.py +++ b/darts/models/forecasting/tcn_model.py @@ -538,12 +538,14 @@ def _build_train_dataset( future_covariates: Optional[Sequence[TimeSeries]], sample_weight: Optional[Sequence[TimeSeries]], max_samples_per_ts: Optional[int], + stride: int, ) -> PastCovariatesShiftedDataset: return PastCovariatesShiftedDataset( target_series=target, covariates=past_covariates, length=self.input_chunk_length, shift=self.output_chunk_length + self.output_chunk_shift, + stride=stride, max_samples_per_ts=max_samples_per_ts, use_static_covariates=self.uses_static_covariates, sample_weight=sample_weight, diff --git a/darts/models/forecasting/tft_model.py b/darts/models/forecasting/tft_model.py index 2f53e12af5..a2579410c1 100644 --- a/darts/models/forecasting/tft_model.py +++ b/darts/models/forecasting/tft_model.py @@ -1162,6 +1162,7 @@ def _build_train_dataset( future_covariates: Optional[Sequence[TimeSeries]], sample_weight: Optional[Sequence[TimeSeries]], max_samples_per_ts: Optional[int], + stride: int, ) -> MixedCovariatesSequentialDataset: raise_if( future_covariates is None and not self.add_relative_index, @@ -1179,6 +1180,7 @@ def _build_train_dataset( input_chunk_length=self.input_chunk_length, output_chunk_length=self.output_chunk_length, output_chunk_shift=self.output_chunk_shift, + stride=stride, max_samples_per_ts=max_samples_per_ts, use_static_covariates=self.uses_static_covariates, sample_weight=sample_weight, diff --git a/darts/models/forecasting/torch_forecasting_model.py b/darts/models/forecasting/torch_forecasting_model.py index 89ca19401f..ab7f1815ac 100644 --- a/darts/models/forecasting/torch_forecasting_model.py +++ b/darts/models/forecasting/torch_forecasting_model.py @@ -567,6 +567,7 @@ def _build_train_dataset( future_covariates: Optional[Sequence[TimeSeries]], sample_weight: Optional[Union[Sequence[TimeSeries], str]], max_samples_per_ts: Optional[int], + stride: int, ) -> TrainingDataset: """ Each model must specify the default training dataset to use. @@ -659,6 +660,8 @@ def fit( val_sample_weight: Optional[ Union[TimeSeries, Sequence[TimeSeries], str] ] = None, + stride: int = 1, + eval_stride: int = 1, ) -> "TorchForecastingModel": """Fit/train the model on one or multiple series. @@ -732,7 +735,12 @@ def fit( are extracted from the end of the global weights. This gives a common time weighting across all series. val_sample_weight Same as for `sample_weight` but for the evaluation dataset. - + stride + The number of time steps between consecutive samples (windows of lagged values extracted from the target + series), applied starting from the end of the series. This should be used with caution as it might + introduce bias in the forecasts. + eval_stride + Same as for `stride` but for the evaluation dataset. Returns ------- self @@ -750,10 +758,12 @@ def fit( past_covariates=past_covariates, future_covariates=future_covariates, sample_weight=sample_weight, + stride=stride, val_series=val_series, val_past_covariates=val_past_covariates, val_future_covariates=val_future_covariates, val_sample_weight=val_sample_weight, + eval_stride=eval_stride, trainer=trainer, verbose=verbose, epochs=epochs, @@ -774,12 +784,14 @@ def _setup_for_fit_from_dataset( past_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None, future_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None, sample_weight: Optional[Union[TimeSeries, Sequence[TimeSeries], str]] = None, + stride: int = 1, val_series: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None, val_past_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None, val_future_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None, val_sample_weight: Optional[ Union[TimeSeries, Sequence[TimeSeries], str] ] = None, + eval_stride: int = 1, trainer: Optional[pl.Trainer] = None, verbose: Optional[bool] = None, epochs: int = 0, @@ -855,6 +867,7 @@ def _setup_for_fit_from_dataset( future_covariates=future_covariates, sample_weight=sample_weight, max_samples_per_ts=max_samples_per_ts, + stride=stride, ) if val_series is not None: @@ -864,6 +877,7 @@ def _setup_for_fit_from_dataset( future_covariates=val_future_covariates, sample_weight=val_sample_weight, max_samples_per_ts=max_samples_per_ts, + stride=eval_stride, ) else: val_dataset = None diff --git a/darts/utils/data/shifted_dataset.py b/darts/utils/data/shifted_dataset.py index c1f2160516..9ac233834e 100644 --- a/darts/utils/data/shifted_dataset.py +++ b/darts/utils/data/shifted_dataset.py @@ -4,6 +4,7 @@ """ from collections.abc import Sequence +from math import ceil from typing import Optional, Union import numpy as np @@ -164,7 +165,9 @@ def __init__( shift The number of time steps by which to shift the output chunks relative to the start of the input chunks. stride - The number of time steps between consecutive entries. + The number of time steps between consecutive samples (windows of lagged values extracted from the target + series), applied starting from the end of the series. This should be used with caution as it might + introduce bias in the forecasts. max_samples_per_ts This is an upper bound on the number of tuples that can be produced per time series. It can be used in order to have an upper bound on the total size of the dataset and @@ -264,7 +267,9 @@ def __init__( shift The number of time steps by which to shift the output chunks relative to the start of the input chunks. stride - The number of time steps between consecutive entries. + The number of time steps between consecutive samples (windows of lagged values extracted from the target + series), applied starting from the end of the series. This should be used with caution as it might + introduce bias in the forecasts. max_samples_per_ts This is an upper bound on the number of tuples that can be produced per time series. It can be used in order to have an upper bound on the total size of the dataset and @@ -392,7 +397,9 @@ def __init__( shift The number of time steps by which to shift the output chunks relative to the start of the input chunks. stride - The number of time steps between consecutive entries. + The number of time steps between consecutive samples (windows of lagged values extracted from the target + series), applied starting from the end of the series. This should be used with caution as it might + introduce bias in the forecasts. max_samples_per_ts This is an upper bound on the number of tuples that can be produced per time series. It can be used in order to have an upper bound on the total size of the dataset and @@ -518,7 +525,9 @@ def __init__( shift The number of time steps by which to shift the output chunks relative to the start of the input chunks. stride - The number of time steps between consecutive entries. + The number of time steps between consecutive samples (windows of lagged values extracted from the target + series), applied starting from the end of the series. This should be used with caution as it might + introduce bias in the forecasts. max_samples_per_ts This is an upper bound on the number of tuples that can be produced per time series. It can be used in order to have an upper bound on the total size of the dataset and @@ -638,7 +647,9 @@ def __init__( Whether to shift the covariates forward the same way as the target. FutureCovariatesModel's require this set to True, while PastCovariatesModel's require this set to False. stride - The number of time steps between consecutive entries. + The number of time steps between consecutive samples (windows of lagged values extracted from the target + series), applied starting from the end of the series. This should be used with caution as it might + introduce bias in the forecasts. max_samples_per_ts This is an upper bound on the number of tuples that can be produced per time series. It can be used in order to have an upper bound on the total size of the dataset and @@ -663,6 +674,12 @@ def __init__( """ super().__init__() + if not (isinstance(stride, int) and stride > 0): + raise_log( + ValueError("`stride` must be a positive integer greater than 0."), + logger=logger, + ) + # setup target and sequence self.target_series = series2seq(target_series) self.input_chunk_length = input_chunk_length @@ -719,9 +736,12 @@ def __init__( # setup samples if self.max_samples_per_ts is None: # read all time series to get the maximum size - self.max_samples_per_ts = ( - max(len(ts) for ts in self.target_series) - self.size_of_both_chunks - ) // self.stride + 1 + max_samples_per_ts = ( + max(len(ts) for ts in self.target_series) - self.size_of_both_chunks + 1 + ) + + # adjust by `stride` + self.max_samples_per_ts = ceil(max_samples_per_ts / self.stride) self.ideal_nr_samples = len(self.target_series) * self.max_samples_per_ts def __len__(self): @@ -742,9 +762,9 @@ def __getitem__( target_vals = target_series.random_component_values(copy=False) # determine the actual number of possible samples in this time series - n_samples_in_ts = ( - len(target_vals) - self.size_of_both_chunks - ) // self.stride + 1 + n_samples_in_ts = ceil( + (len(target_vals) - self.size_of_both_chunks + 1) / self.stride + ) if n_samples_in_ts < 1: raise_log( From 1a7e42d70c5e50cbe6af6e1157784fdaffb24204 Mon Sep 17 00:00:00 2001 From: madtoinou Date: Thu, 9 Jan 2025 09:40:17 +0100 Subject: [PATCH 17/18] fix: update the test so that the stride is applied from the end --- .../utils/tabularization/test_create_lagged_training_data.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/darts/tests/utils/tabularization/test_create_lagged_training_data.py b/darts/tests/utils/tabularization/test_create_lagged_training_data.py index 731b15c271..6e4d0f8506 100644 --- a/darts/tests/utils/tabularization/test_create_lagged_training_data.py +++ b/darts/tests/utils/tabularization/test_create_lagged_training_data.py @@ -104,7 +104,8 @@ def get_feature_times( times = times.intersection(future_times) # Apply stride if stride > 1: - times = times[::stride] + # apply the stride from the end and then reorder the samples + times = times[::-stride][::-1] # Take most recent `max_samples_per_ts` samples if requested: if (max_samples_per_ts is not None) and (len(times) > max_samples_per_ts): times = times[-max_samples_per_ts:] From f168f46e5c2fdd3ee4a34ea75f04b3f01ed91005 Mon Sep 17 00:00:00 2001 From: madtoinou Date: Thu, 9 Jan 2025 15:09:19 +0100 Subject: [PATCH 18/18] fix: add missing defualt value --- darts/models/forecasting/global_baseline_models.py | 2 +- darts/models/forecasting/rnn_model.py | 2 +- darts/models/forecasting/tcn_model.py | 2 +- darts/models/forecasting/tft_model.py | 2 +- darts/models/forecasting/torch_forecasting_model.py | 12 +++++++++++- darts/tests/datasets/test_datasets.py | 4 ---- 6 files changed, 15 insertions(+), 9 deletions(-) diff --git a/darts/models/forecasting/global_baseline_models.py b/darts/models/forecasting/global_baseline_models.py index d6f9bdf522..da6f1a684c 100644 --- a/darts/models/forecasting/global_baseline_models.py +++ b/darts/models/forecasting/global_baseline_models.py @@ -256,7 +256,7 @@ def _build_train_dataset( future_covariates: Optional[Sequence[TimeSeries]], sample_weight: Optional[Sequence[TimeSeries]], max_samples_per_ts: Optional[int], - stride: int, + stride: int = 1, ) -> MixedCovariatesTrainingDataset: return MixedCovariatesSequentialDataset( target_series=target, diff --git a/darts/models/forecasting/rnn_model.py b/darts/models/forecasting/rnn_model.py index d9a948e595..3193f302c6 100644 --- a/darts/models/forecasting/rnn_model.py +++ b/darts/models/forecasting/rnn_model.py @@ -566,7 +566,7 @@ def _build_train_dataset( future_covariates: Optional[Sequence[TimeSeries]], sample_weight: Optional[Sequence[TimeSeries]], max_samples_per_ts: Optional[int], - stride: int, + stride: int = 1, ) -> DualCovariatesShiftedDataset: return DualCovariatesShiftedDataset( target_series=target, diff --git a/darts/models/forecasting/tcn_model.py b/darts/models/forecasting/tcn_model.py index 112b953553..47a3bffe37 100644 --- a/darts/models/forecasting/tcn_model.py +++ b/darts/models/forecasting/tcn_model.py @@ -538,7 +538,7 @@ def _build_train_dataset( future_covariates: Optional[Sequence[TimeSeries]], sample_weight: Optional[Sequence[TimeSeries]], max_samples_per_ts: Optional[int], - stride: int, + stride: int = 1, ) -> PastCovariatesShiftedDataset: return PastCovariatesShiftedDataset( target_series=target, diff --git a/darts/models/forecasting/tft_model.py b/darts/models/forecasting/tft_model.py index a2579410c1..45943ced02 100644 --- a/darts/models/forecasting/tft_model.py +++ b/darts/models/forecasting/tft_model.py @@ -1162,7 +1162,7 @@ def _build_train_dataset( future_covariates: Optional[Sequence[TimeSeries]], sample_weight: Optional[Sequence[TimeSeries]], max_samples_per_ts: Optional[int], - stride: int, + stride: int = 1, ) -> MixedCovariatesSequentialDataset: raise_if( future_covariates is None and not self.add_relative_index, diff --git a/darts/models/forecasting/torch_forecasting_model.py b/darts/models/forecasting/torch_forecasting_model.py index ab7f1815ac..9328cee4e6 100644 --- a/darts/models/forecasting/torch_forecasting_model.py +++ b/darts/models/forecasting/torch_forecasting_model.py @@ -567,7 +567,7 @@ def _build_train_dataset( future_covariates: Optional[Sequence[TimeSeries]], sample_weight: Optional[Union[Sequence[TimeSeries], str]], max_samples_per_ts: Optional[int], - stride: int, + stride: int = 1, ) -> TrainingDataset: """ Each model must specify the default training dataset to use. @@ -2500,6 +2500,7 @@ def _build_train_dataset( future_covariates: Optional[Sequence[TimeSeries]], sample_weight: Optional[Union[Sequence[TimeSeries], str]], max_samples_per_ts: Optional[int], + stride: int = 1, ) -> PastCovariatesTrainingDataset: return PastCovariatesSequentialDataset( target_series=target, @@ -2510,6 +2511,7 @@ def _build_train_dataset( max_samples_per_ts=max_samples_per_ts, use_static_covariates=self.uses_static_covariates, sample_weight=sample_weight, + stride=stride, ) def _build_inference_dataset( @@ -2594,6 +2596,7 @@ def _build_train_dataset( future_covariates: Optional[Sequence[TimeSeries]], sample_weight: Optional[Union[Sequence[TimeSeries], str]], max_samples_per_ts: Optional[int], + stride: int = 1, ) -> FutureCovariatesTrainingDataset: return FutureCovariatesSequentialDataset( target_series=target, @@ -2604,6 +2607,7 @@ def _build_train_dataset( max_samples_per_ts=max_samples_per_ts, use_static_covariates=self.uses_static_covariates, sample_weight=sample_weight, + stride=stride, ) def _build_inference_dataset( @@ -2687,6 +2691,7 @@ def _build_train_dataset( future_covariates: Optional[Sequence[TimeSeries]], sample_weight: Optional[Union[Sequence[TimeSeries], str]], max_samples_per_ts: Optional[int], + stride: int = 1, ) -> DualCovariatesTrainingDataset: return DualCovariatesSequentialDataset( target_series=target, @@ -2697,6 +2702,7 @@ def _build_train_dataset( max_samples_per_ts=max_samples_per_ts, use_static_covariates=self.uses_static_covariates, sample_weight=sample_weight, + stride=stride, ) def _build_inference_dataset( @@ -2779,6 +2785,7 @@ def _build_train_dataset( future_covariates: Optional[Sequence[TimeSeries]], sample_weight: Optional[Union[Sequence[TimeSeries], str]], max_samples_per_ts: Optional[int], + stride: int = 1, ) -> MixedCovariatesTrainingDataset: return MixedCovariatesSequentialDataset( target_series=target, @@ -2790,6 +2797,7 @@ def _build_train_dataset( max_samples_per_ts=max_samples_per_ts, use_static_covariates=self.uses_static_covariates, sample_weight=sample_weight, + stride=stride, ) def _build_inference_dataset( @@ -2873,6 +2881,7 @@ def _build_train_dataset( future_covariates: Optional[Sequence[TimeSeries]], sample_weight: Optional[Union[Sequence[TimeSeries], str]], max_samples_per_ts: Optional[int], + stride: int = 1, ) -> SplitCovariatesTrainingDataset: return SplitCovariatesSequentialDataset( target_series=target, @@ -2884,6 +2893,7 @@ def _build_train_dataset( max_samples_per_ts=max_samples_per_ts, use_static_covariates=self.uses_static_covariates, sample_weight=sample_weight, + stride=stride, ) def _build_inference_dataset( diff --git a/darts/tests/datasets/test_datasets.py b/darts/tests/datasets/test_datasets.py index 616c5d66f7..b18c41cf4c 100644 --- a/darts/tests/datasets/test_datasets.py +++ b/darts/tests/datasets/test_datasets.py @@ -76,17 +76,13 @@ def _check_strided_ds(self, regular_ds, strided_ds, stride: int): # if the un-strided length is a multiple of the stride if len(regular_ds) % stride == 0: assert len(regular_ds) == len(strided_ds) * stride - else: - assert len(regular_ds) > len(strided_ds) * stride for idx, batch_str in enumerate(strided_ds): - print("****", idx) for entry_s, entry_r in zip(batch_str, regular_ds[idx * stride]): if entry_s is not None and entry_r is not None: np.testing.assert_almost_equal(entry_s, entry_r) else: assert entry_s == entry_r - print("****", idx, "passed") def test_past_covariates_inference_dataset(self): # one target series