diff --git a/CHANGELOG.md b/CHANGELOG.md index 5febc3e7a1..c8531b0eb5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ but cannot always guarantee backwards compatibility. Changes that may **break co - Added more resampling methods to `TimeSeries.resample()`. This allows to aggregate values when down-sampling and to fill or keep the holes when up-sampling. [#2654](https://github.com/unit8co/darts/pull/2654) by [Jonas Blanc](https://github.com/jonasblanc) - Added general function `darts.slice_intersect()` to intersect a sequence of `TimeSeries` along the time index. [#2592](https://github.com/unit8co/darts/pull/2592) by [Yoav Matzkevich](https://github.com/ymatzkevich). - Added new time aggregated metric `wmape()` (Weighted Mean Absolute Percentage Error). [#2544](https://github.com/unit8co/darts/pull/2648) by [He Weilin](https://github.com/cnhwl). +- Added a `stride` argument to the `Dataset` classes and the `fit()/predict()` methods of the `RegressionModels` and torch-based models to reduce the size of the training set or apply elaborate training approaches. [#2624](https://github.com/unit8co/darts/pull/2529) by [Antoine Madrona](https://github.com/madtoinou) **Fixed** - Fixed a bug when performing optimized historical forecasts with `stride=1` using a `RegressionModel` with `output_chunk_shift>=1` and `output_chunk_length=1`, where the forecast time index was not properly shifted. [#2634](https://github.com/unit8co/darts/pull/2634) by [Mattias De Charleroy](https://github.com/MattiasDC). diff --git a/darts/models/forecasting/global_baseline_models.py b/darts/models/forecasting/global_baseline_models.py index 06c9f30618..da6f1a684c 100644 --- a/darts/models/forecasting/global_baseline_models.py +++ b/darts/models/forecasting/global_baseline_models.py @@ -256,6 +256,7 @@ def _build_train_dataset( future_covariates: Optional[Sequence[TimeSeries]], sample_weight: Optional[Sequence[TimeSeries]], max_samples_per_ts: Optional[int], + stride: int = 1, ) -> MixedCovariatesTrainingDataset: return MixedCovariatesSequentialDataset( target_series=target, @@ -264,6 +265,7 @@ def _build_train_dataset( input_chunk_length=self.input_chunk_length, output_chunk_length=0, output_chunk_shift=self.output_chunk_shift, + stride=stride, max_samples_per_ts=max_samples_per_ts, use_static_covariates=self.uses_static_covariates, sample_weight=sample_weight, diff --git a/darts/models/forecasting/regression_model.py b/darts/models/forecasting/regression_model.py index b5c76a0f0e..221484babd 100644 --- a/darts/models/forecasting/regression_model.py +++ b/darts/models/forecasting/regression_model.py @@ -600,6 +600,7 @@ def _create_lagged_data( future_covariates: Sequence[TimeSeries], max_samples_per_ts: int, sample_weight: Optional[Union[TimeSeries, str]] = None, + stride: int = 1, last_static_covariates_shape: Optional[tuple[int, int]] = None, ): ( @@ -624,6 +625,7 @@ def _create_lagged_data( check_inputs=False, concatenate=False, sample_weight=sample_weight, + stride=stride, ) expected_nb_feat = ( @@ -675,6 +677,7 @@ def _fit_model( future_covariates: Sequence[TimeSeries], max_samples_per_ts: int, sample_weight: Optional[Union[Sequence[TimeSeries], str]], + stride: int, val_series: Optional[Sequence[TimeSeries]] = None, val_past_covariates: Optional[Sequence[TimeSeries]] = None, val_future_covariates: Optional[Sequence[TimeSeries]] = None, @@ -692,6 +695,7 @@ def _fit_model( max_samples_per_ts=max_samples_per_ts, sample_weight=sample_weight, last_static_covariates_shape=None, + stride=stride, ) if self.supports_val_set and val_series is not None: @@ -741,6 +745,7 @@ def fit( max_samples_per_ts: Optional[int] = None, n_jobs_multioutput_wrapper: Optional[int] = None, sample_weight: Optional[Union[TimeSeries, Sequence[TimeSeries], str]] = None, + stride: int = 1, **kwargs, ): """ @@ -774,6 +779,10 @@ def fit( `"linear"` or `"exponential"` decay - the further in the past, the lower the weight. The weights are computed globally based on the length of the longest series in `series`. Then for each series, the weights are extracted from the end of the global weights. This gives a common time weighting across all series. + stride + The number of time steps between consecutive samples (windows of lagged values extracted from the target + series), applied starting from the end of the series. This should be used with caution as it might + introduce bias in the forecasts. **kwargs Additional keyword arguments passed to the `fit` method of the model. """ @@ -952,6 +961,7 @@ def fit( sample_weight=sample_weight, val_sample_weight=val_sample_weight, max_samples_per_ts=max_samples_per_ts, + stride=stride, **kwargs, ) return self @@ -993,11 +1003,11 @@ def predict( If set to `True`, the model predicts the parameters of its `likelihood` instead of the target. Only supported for probabilistic models with a likelihood, `num_samples = 1` and `n<=output_chunk_length`. Default: ``False`` + show_warnings + Optionally, control whether warnings are shown. Not effective for all models. **kwargs : dict, optional Additional keyword arguments passed to the `predict` method of the model. Only works with univariate target series. - show_warnings - Optionally, control whether warnings are shown. Not effective for all models. """ if series is None: # then there must be a single TS, and that was saved in super().fit as self.training_series diff --git a/darts/models/forecasting/rnn_model.py b/darts/models/forecasting/rnn_model.py index e7d55ac9c6..3193f302c6 100644 --- a/darts/models/forecasting/rnn_model.py +++ b/darts/models/forecasting/rnn_model.py @@ -566,12 +566,14 @@ def _build_train_dataset( future_covariates: Optional[Sequence[TimeSeries]], sample_weight: Optional[Sequence[TimeSeries]], max_samples_per_ts: Optional[int], + stride: int = 1, ) -> DualCovariatesShiftedDataset: return DualCovariatesShiftedDataset( target_series=target, covariates=future_covariates, length=self.training_length, shift=1, + stride=stride, max_samples_per_ts=max_samples_per_ts, use_static_covariates=self.uses_static_covariates, sample_weight=sample_weight, diff --git a/darts/models/forecasting/tcn_model.py b/darts/models/forecasting/tcn_model.py index 3d66b4613d..47a3bffe37 100644 --- a/darts/models/forecasting/tcn_model.py +++ b/darts/models/forecasting/tcn_model.py @@ -538,12 +538,14 @@ def _build_train_dataset( future_covariates: Optional[Sequence[TimeSeries]], sample_weight: Optional[Sequence[TimeSeries]], max_samples_per_ts: Optional[int], + stride: int = 1, ) -> PastCovariatesShiftedDataset: return PastCovariatesShiftedDataset( target_series=target, covariates=past_covariates, length=self.input_chunk_length, shift=self.output_chunk_length + self.output_chunk_shift, + stride=stride, max_samples_per_ts=max_samples_per_ts, use_static_covariates=self.uses_static_covariates, sample_weight=sample_weight, diff --git a/darts/models/forecasting/tft_model.py b/darts/models/forecasting/tft_model.py index 2f53e12af5..45943ced02 100644 --- a/darts/models/forecasting/tft_model.py +++ b/darts/models/forecasting/tft_model.py @@ -1162,6 +1162,7 @@ def _build_train_dataset( future_covariates: Optional[Sequence[TimeSeries]], sample_weight: Optional[Sequence[TimeSeries]], max_samples_per_ts: Optional[int], + stride: int = 1, ) -> MixedCovariatesSequentialDataset: raise_if( future_covariates is None and not self.add_relative_index, @@ -1179,6 +1180,7 @@ def _build_train_dataset( input_chunk_length=self.input_chunk_length, output_chunk_length=self.output_chunk_length, output_chunk_shift=self.output_chunk_shift, + stride=stride, max_samples_per_ts=max_samples_per_ts, use_static_covariates=self.uses_static_covariates, sample_weight=sample_weight, diff --git a/darts/models/forecasting/torch_forecasting_model.py b/darts/models/forecasting/torch_forecasting_model.py index 89ca19401f..9328cee4e6 100644 --- a/darts/models/forecasting/torch_forecasting_model.py +++ b/darts/models/forecasting/torch_forecasting_model.py @@ -567,6 +567,7 @@ def _build_train_dataset( future_covariates: Optional[Sequence[TimeSeries]], sample_weight: Optional[Union[Sequence[TimeSeries], str]], max_samples_per_ts: Optional[int], + stride: int = 1, ) -> TrainingDataset: """ Each model must specify the default training dataset to use. @@ -659,6 +660,8 @@ def fit( val_sample_weight: Optional[ Union[TimeSeries, Sequence[TimeSeries], str] ] = None, + stride: int = 1, + eval_stride: int = 1, ) -> "TorchForecastingModel": """Fit/train the model on one or multiple series. @@ -732,7 +735,12 @@ def fit( are extracted from the end of the global weights. This gives a common time weighting across all series. val_sample_weight Same as for `sample_weight` but for the evaluation dataset. - + stride + The number of time steps between consecutive samples (windows of lagged values extracted from the target + series), applied starting from the end of the series. This should be used with caution as it might + introduce bias in the forecasts. + eval_stride + Same as for `stride` but for the evaluation dataset. Returns ------- self @@ -750,10 +758,12 @@ def fit( past_covariates=past_covariates, future_covariates=future_covariates, sample_weight=sample_weight, + stride=stride, val_series=val_series, val_past_covariates=val_past_covariates, val_future_covariates=val_future_covariates, val_sample_weight=val_sample_weight, + eval_stride=eval_stride, trainer=trainer, verbose=verbose, epochs=epochs, @@ -774,12 +784,14 @@ def _setup_for_fit_from_dataset( past_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None, future_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None, sample_weight: Optional[Union[TimeSeries, Sequence[TimeSeries], str]] = None, + stride: int = 1, val_series: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None, val_past_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None, val_future_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None, val_sample_weight: Optional[ Union[TimeSeries, Sequence[TimeSeries], str] ] = None, + eval_stride: int = 1, trainer: Optional[pl.Trainer] = None, verbose: Optional[bool] = None, epochs: int = 0, @@ -855,6 +867,7 @@ def _setup_for_fit_from_dataset( future_covariates=future_covariates, sample_weight=sample_weight, max_samples_per_ts=max_samples_per_ts, + stride=stride, ) if val_series is not None: @@ -864,6 +877,7 @@ def _setup_for_fit_from_dataset( future_covariates=val_future_covariates, sample_weight=val_sample_weight, max_samples_per_ts=max_samples_per_ts, + stride=eval_stride, ) else: val_dataset = None @@ -2486,6 +2500,7 @@ def _build_train_dataset( future_covariates: Optional[Sequence[TimeSeries]], sample_weight: Optional[Union[Sequence[TimeSeries], str]], max_samples_per_ts: Optional[int], + stride: int = 1, ) -> PastCovariatesTrainingDataset: return PastCovariatesSequentialDataset( target_series=target, @@ -2496,6 +2511,7 @@ def _build_train_dataset( max_samples_per_ts=max_samples_per_ts, use_static_covariates=self.uses_static_covariates, sample_weight=sample_weight, + stride=stride, ) def _build_inference_dataset( @@ -2580,6 +2596,7 @@ def _build_train_dataset( future_covariates: Optional[Sequence[TimeSeries]], sample_weight: Optional[Union[Sequence[TimeSeries], str]], max_samples_per_ts: Optional[int], + stride: int = 1, ) -> FutureCovariatesTrainingDataset: return FutureCovariatesSequentialDataset( target_series=target, @@ -2590,6 +2607,7 @@ def _build_train_dataset( max_samples_per_ts=max_samples_per_ts, use_static_covariates=self.uses_static_covariates, sample_weight=sample_weight, + stride=stride, ) def _build_inference_dataset( @@ -2673,6 +2691,7 @@ def _build_train_dataset( future_covariates: Optional[Sequence[TimeSeries]], sample_weight: Optional[Union[Sequence[TimeSeries], str]], max_samples_per_ts: Optional[int], + stride: int = 1, ) -> DualCovariatesTrainingDataset: return DualCovariatesSequentialDataset( target_series=target, @@ -2683,6 +2702,7 @@ def _build_train_dataset( max_samples_per_ts=max_samples_per_ts, use_static_covariates=self.uses_static_covariates, sample_weight=sample_weight, + stride=stride, ) def _build_inference_dataset( @@ -2765,6 +2785,7 @@ def _build_train_dataset( future_covariates: Optional[Sequence[TimeSeries]], sample_weight: Optional[Union[Sequence[TimeSeries], str]], max_samples_per_ts: Optional[int], + stride: int = 1, ) -> MixedCovariatesTrainingDataset: return MixedCovariatesSequentialDataset( target_series=target, @@ -2776,6 +2797,7 @@ def _build_train_dataset( max_samples_per_ts=max_samples_per_ts, use_static_covariates=self.uses_static_covariates, sample_weight=sample_weight, + stride=stride, ) def _build_inference_dataset( @@ -2859,6 +2881,7 @@ def _build_train_dataset( future_covariates: Optional[Sequence[TimeSeries]], sample_weight: Optional[Union[Sequence[TimeSeries], str]], max_samples_per_ts: Optional[int], + stride: int = 1, ) -> SplitCovariatesTrainingDataset: return SplitCovariatesSequentialDataset( target_series=target, @@ -2870,6 +2893,7 @@ def _build_train_dataset( max_samples_per_ts=max_samples_per_ts, use_static_covariates=self.uses_static_covariates, sample_weight=sample_weight, + stride=stride, ) def _build_inference_dataset( diff --git a/darts/tests/datasets/test_datasets.py b/darts/tests/datasets/test_datasets.py index 05a517a75c..b18c41cf4c 100644 --- a/darts/tests/datasets/test_datasets.py +++ b/darts/tests/datasets/test_datasets.py @@ -69,6 +69,21 @@ def _assert_eq(self, lefts: tuple, rights: tuple): else: assert right is None + def _check_strided_ds(self, regular_ds, strided_ds, stride: int): + """ + Every `stride`-th values in a dataset with stride=1 should be identical to the dataset strided with `stride + """ + # if the un-strided length is a multiple of the stride + if len(regular_ds) % stride == 0: + assert len(regular_ds) == len(strided_ds) * stride + + for idx, batch_str in enumerate(strided_ds): + for entry_s, entry_r in zip(batch_str, regular_ds[idx * stride]): + if entry_s is not None and entry_r is not None: + np.testing.assert_almost_equal(entry_s, entry_r) + else: + assert entry_s == entry_r + def test_past_covariates_inference_dataset(self): # one target series ds = PastCovariatesInferenceDataset( @@ -491,7 +506,7 @@ def test_split_covariates_inference_dataset(self): @pytest.mark.parametrize( "config", [ - # (dataset class, whether contains future, future batch index) + # (dataset class, future batch index) (PastCovariatesInferenceDataset, None), (FutureCovariatesInferenceDataset, 1), (DualCovariatesInferenceDataset, 2), @@ -1322,36 +1337,40 @@ def test_dual_covariates_shifted_dataset(self): @pytest.mark.parametrize("use_weight", [False, True]) def test_horizon_based_dataset(self, use_weight): + ds_kwargs = { + "output_chunk_length": 10, + "lh": (1, 3), + "lookback": 2, + } weight1 = self.target1 + 1 weight2 = self.target2 + 1 - weight = weight1 if use_weight else None weight_exp = weight1[85:95] if use_weight else None # one target series - ds = HorizonBasedDataset( - target_series=self.target1, - output_chunk_length=10, - lh=(1, 3), - lookback=2, - sample_weight=weight, - ) + ds_kwargs["target_series"] = self.target1 + ds_kwargs["sample_weight"] = weight1 if use_weight else None + ds = HorizonBasedDataset(**ds_kwargs) assert len(ds) == 20 self._assert_eq( ds[5], (self.target1[65:85], None, self.cov_st1, weight_exp, self.target1[85:95]), ) + # one target series, with stride + self._check_strided_ds( + regular_ds=ds, + strided_ds=HorizonBasedDataset( + **ds_kwargs, + stride=3, + ), + stride=3, + ) # two target series - weight = [weight1, weight2] if use_weight else None weight_exp1 = weight1[85:95] if use_weight else None weight_exp2 = weight2[135:145] if use_weight else None - ds = HorizonBasedDataset( - target_series=[self.target1, self.target2], - output_chunk_length=10, - lh=(1, 3), - lookback=2, - sample_weight=weight, - ) + ds_kwargs["target_series"] = [self.target1, self.target2] + ds_kwargs["sample_weight"] = [weight1, weight2] if use_weight else None + ds = HorizonBasedDataset(**ds_kwargs) assert len(ds) == 40 self._assert_eq( ds[5], @@ -1367,6 +1386,12 @@ def test_horizon_based_dataset(self, use_weight): self.target2[135:145], ), ) + # two target series, with stride + self._check_strided_ds( + regular_ds=ds, + strided_ds=HorizonBasedDataset(**ds_kwargs, stride=3), + stride=3, + ) # two targets and one covariate with pytest.raises(ValueError): @@ -1375,17 +1400,12 @@ def test_horizon_based_dataset(self, use_weight): ) # two targets and two covariates - weight = [weight1, weight2] if use_weight else None weight_exp1 = weight1[85:95] if use_weight else None weight_exp2 = weight2[135:145] if use_weight else None - ds = HorizonBasedDataset( - target_series=[self.target1, self.target2], - covariates=[self.cov1, self.cov2], - output_chunk_length=10, - lh=(1, 3), - lookback=2, - sample_weight=weight, - ) + ds_kwargs["target_series"] = [self.target1, self.target2] + ds_kwargs["covariates"] = [self.cov1, self.cov2] + ds_kwargs["sample_weight"] = [weight1, weight2] if use_weight else None + ds = HorizonBasedDataset(**ds_kwargs) self._assert_eq( ds[5], ( @@ -1406,11 +1426,17 @@ def test_horizon_based_dataset(self, use_weight): self.target2[135:145], ), ) + # two targets and two covariates, with stride + self._check_strided_ds( + regular_ds=ds, + strided_ds=HorizonBasedDataset(**ds_kwargs, stride=3), + stride=3, + ) @pytest.mark.parametrize( "config", [ - # (dataset class, whether contains future, future batch index) + # (dataset class, future batch index) (PastCovariatesSequentialDataset, None), (FutureCovariatesSequentialDataset, 1), (DualCovariatesSequentialDataset, 2), @@ -1706,6 +1732,48 @@ def test_sequential_training_dataset_invalid_weight(self, ds_cls): "2000-01-02 00:00:00 - 2000-01-04 00:00:00." ) + @pytest.mark.parametrize( + "config", + [ + # (dataset class, future batch index) + (PastCovariatesSequentialDataset, None), + (FutureCovariatesSequentialDataset, 1), + (DualCovariatesSequentialDataset, 2), + (MixedCovariatesSequentialDataset, 3), + (SplitCovariatesSequentialDataset, 2), + ], + ) + def test_sequential_training_dataset_stride(self, config): + ds_cls, future_idx = config + icl = 4 + ocl = 2 + nb_samples = 12 + target = self.target1[: icl + ocl + nb_samples - 1] + + ds_covs = {} + ds_init_params = set(inspect.signature(ds_cls.__init__).parameters) + for cov_type in ["covariates", "past_covariates", "future_covariates"]: + if cov_type in ds_init_params: + ds_covs[cov_type] = self.cov1 + + ds_reg = ds_cls( + target_series=target, + input_chunk_length=icl, + output_chunk_length=ocl, + stride=1, + **ds_covs, + ) + + ds_stride = ds_cls( + target_series=target, + input_chunk_length=icl, + output_chunk_length=ocl, + stride=3, + **ds_covs, + ) + assert len(ds_stride) * 3 == len(ds_reg) == nb_samples + self._check_strided_ds(regular_ds=ds_reg, strided_ds=ds_stride, stride=3) + def test_get_matching_index(self): from darts.utils.data.utils import _get_matching_index diff --git a/darts/tests/utils/tabularization/test_create_lagged_training_data.py b/darts/tests/utils/tabularization/test_create_lagged_training_data.py index cd4f32f1e9..6e4d0f8506 100644 --- a/darts/tests/utils/tabularization/test_create_lagged_training_data.py +++ b/darts/tests/utils/tabularization/test_create_lagged_training_data.py @@ -72,7 +72,8 @@ def get_feature_times( output_chunk_length: Optional[int], max_samples_per_ts: Optional[int], output_chunk_shift: int, - ): + stride: int, + ) -> pd.Index: """ Helper function that returns the times shared by all specified series that can be used to create features and labels. This is performed by using the helper functions @@ -101,6 +102,10 @@ def get_feature_times( future, lags_future ) times = times.intersection(future_times) + # Apply stride + if stride > 1: + # apply the stride from the end and then reorder the samples + times = times[::-stride][::-1] # Take most recent `max_samples_per_ts` samples if requested: if (max_samples_per_ts is not None) and (len(times) > max_samples_per_ts): times = times[-max_samples_per_ts:] @@ -433,6 +438,7 @@ def helper_create_expected_lagged_data( output_chunk_shift: int, multi_models: bool, max_samples_per_ts: Optional[int], + stride: int, ) -> tuple[np.ndarray, np.ndarray, Any]: """Helper function to create the X and y arrays by building them block by block (one per covariates).""" feats_times = self.get_feature_times( @@ -445,6 +451,7 @@ def helper_create_expected_lagged_data( output_chunk_length, max_samples_per_ts, output_chunk_shift, + stride, ) # Construct `X` by constructing each block, then concatenate these # blocks together along component axis: @@ -487,6 +494,7 @@ def helper_check_lagged_data( max_samples_per_ts: Optional[int], use_moving_windows: bool, concatenate: bool, + stride: int, **kwargs, ): """Helper function to call the `create_lagged_training_data()` method with lags argument either in the list @@ -537,6 +545,7 @@ def helper_check_lagged_data( use_moving_windows=use_moving_windows, output_chunk_shift=output_chunk_shift, concatenate=concatenate, + stride=stride, ) # should have the exact same number of indexes assert len(times) == len(expected_times_x) == len(expected_times_y) @@ -642,10 +651,13 @@ def helper_check_lagged_data( min_n_ts = 8 + max(output_chunk_shift_combos) @pytest.mark.parametrize( - "series_type", - ["datetime", "integer"], + "params", + product( + ["datetime", "integer"], # series_type + [1, 3], # stride + ), ) - def test_lagged_training_data_equal_freq(self, series_type: str): + def test_lagged_training_data_equal_freq(self, params): """ Tests that `create_lagged_training_data` produces `X`, `y`, and `times` outputs that are consistent with those generated by using the helper @@ -659,6 +671,7 @@ def test_lagged_training_data_equal_freq(self, series_type: str): are of the same frequency, the implementation of the 'moving window' method is being tested here. """ + series_type, stride = params # Define datetime index timeseries - each has different number of components, # different start times, different lengths, and different values, but # they're all of the same frequency: @@ -749,6 +762,7 @@ def test_lagged_training_data_equal_freq(self, series_type: str): output_chunk_shift, multi_models, max_samples_per_ts, + stride, ) ) @@ -770,6 +784,7 @@ def test_lagged_training_data_equal_freq(self, series_type: str): "max_samples_per_ts": max_samples_per_ts, "use_moving_windows": True, "concatenate": True, + "stride": stride, } self.helper_check_lagged_data(convert_lags_to_dict=False, **kwargs) @@ -777,10 +792,13 @@ def test_lagged_training_data_equal_freq(self, series_type: str): self.helper_check_lagged_data(convert_lags_to_dict=True, **kwargs) @pytest.mark.parametrize( - "series_type", - ["datetime", "integer"], + "params", + product( + ["datetime", "integer"], # series_type + [1, 3], # stride + ), ) - def test_lagged_training_data_unequal_freq(self, series_type): + def test_lagged_training_data_unequal_freq(self, params): """ Tests that `create_lagged_training_data` produces `X`, `y`, and `times` outputs that are consistent with those generated by using the helper @@ -794,6 +812,7 @@ def test_lagged_training_data_unequal_freq(self, series_type): are *not* of the same frequency, the implementation of the 'time intersection' method is being tested here. """ + series_type, stride = params # Define range index timeseries - each has different number of components, # different start times, different lengths, different values, and different # frequencies: @@ -869,6 +888,7 @@ def test_lagged_training_data_unequal_freq(self, series_type): output_chunk_shift, multi_models, max_samples_per_ts, + stride, ) ) @@ -890,6 +910,7 @@ def test_lagged_training_data_unequal_freq(self, series_type): "max_samples_per_ts": max_samples_per_ts, "use_moving_windows": False, "concatenate": True, + "stride": stride, } self.helper_check_lagged_data(convert_lags_to_dict=False, **kwargs) @@ -901,10 +922,13 @@ def test_lagged_training_data_unequal_freq(self, series_type): ) @pytest.mark.parametrize( - "series_type", - ["datetime", "integer"], + "params", + product( + ["datetime", "integer"], # series_type + [1, 3], # stride + ), ) - def test_lagged_training_data_method_consistency(self, series_type): + def test_lagged_training_data_method_consistency(self, params): """ Tests that `create_lagged_training_data` produces the same result when `use_moving_windows = False` and when `use_moving_windows = True` @@ -918,6 +942,7 @@ def test_lagged_training_data_method_consistency(self, series_type): # Define datetime index timeseries - each has different number of components, # different start times, different lengths, different values, and of # different frequencies: + series_type, stride = params if series_type == "integer": target = helper_create_multivariate_linear_timeseries( n_components=2, start_value=0, end_value=10, start=2, length=20, freq=1 @@ -991,6 +1016,7 @@ def test_lagged_training_data_method_consistency(self, series_type): multi_models=multi_models, use_moving_windows=True, output_chunk_shift=output_chunk_shift, + stride=stride, ) # Using time intersection method: X_ti, y_ti, times_ti, _, _ = create_lagged_training_data( @@ -1006,6 +1032,7 @@ def test_lagged_training_data_method_consistency(self, series_type): multi_models=multi_models, use_moving_windows=False, output_chunk_shift=output_chunk_shift, + stride=stride, ) assert np.allclose(X_mw, X_ti) assert np.allclose(y_mw, y_ti) @@ -1021,6 +1048,7 @@ def test_lagged_training_data_method_consistency(self, series_type): [0, 1, 3], [False, True], ["datetime", "integer"], + [1, 3], # stride ), ) def test_lagged_training_data_single_lag_single_component_same_series(self, config): @@ -1032,7 +1060,7 @@ def test_lagged_training_data_single_lag_single_component_same_series(self, conf same time series, and the expected `y` can be formed by taking a single slice from the `target`. """ - output_chunk_shift, use_moving_windows, series_type = config + output_chunk_shift, use_moving_windows, series_type, stride = config if series_type == "integer": series = linear_timeseries(start=0, length=15) else: @@ -1069,6 +1097,12 @@ def test_lagged_training_data_single_lag_single_component_same_series(self, conf ) expected_X = np.expand_dims(expected_X, axis=-1) + if stride > 1: + expected_X = expected_X[::stride] + expected_y = expected_y[::stride] + expected_times_x = expected_times_x[::stride] + expected_times_y = expected_times_y[::stride] + kwargs = { "expected_X": expected_X, "expected_y": expected_y, @@ -1087,6 +1121,7 @@ def test_lagged_training_data_single_lag_single_component_same_series(self, conf "max_samples_per_ts": None, "use_moving_windows": use_moving_windows, "concatenate": True, + "stride": stride, } self.helper_check_lagged_data(convert_lags_to_dict=False, **kwargs) @@ -1196,6 +1231,7 @@ def test_lagged_training_data_extend_past_and_future_covariates(self, config): "max_samples_per_ts": max_samples_per_ts, "use_moving_windows": use_moving_windows, "concatenate": True, + "stride": 1, } self.helper_check_lagged_data(convert_lags_to_dict=False, **kwargs) @@ -1211,8 +1247,8 @@ def test_lagged_training_data_extend_past_and_future_covariates(self, config): @pytest.mark.parametrize( "config", - itertools.product( - [0, 1, 3], [False, True], ["datetime", "integer"], [False, True] + product( + [0, 1, 3], [False, True], ["datetime", "integer"], [False, True], [1, 3] ), ) def test_lagged_training_data_single_point(self, config): @@ -1220,7 +1256,9 @@ def test_lagged_training_data_single_point(self, config): Tests that `create_lagged_training_data` correctly handles case where only one possible training point can be generated. """ - output_chunk_shift, use_moving_windows, series_type, multi_models = config + output_chunk_shift, use_moving_windows, series_type, multi_models, stride = ( + config + ) # Can only create feature using first value of series (i.e. `0`) # and can only create label using last value of series (i.e. `1`) if series_type == "integer": @@ -1244,6 +1282,11 @@ def test_lagged_training_data_single_point(self, config): length=1, freq=target.freq, ) + if stride > 1: + expected_X = expected_X[::stride] + expected_y = expected_y[::stride] + expected_times = expected_times[::stride] + # Test correctness for 'moving window' and for 'time intersection' methods, as well # as for different `multi_models` values: kwargs = { @@ -1264,6 +1307,7 @@ def test_lagged_training_data_single_point(self, config): "max_samples_per_ts": None, "use_moving_windows": use_moving_windows, "concatenate": True, + "stride": stride, } self.helper_check_lagged_data(convert_lags_to_dict=False, **kwargs) @@ -1280,7 +1324,7 @@ def test_lagged_training_data_single_point(self, config): @pytest.mark.parametrize( "config", itertools.product( - [0, 1, 3], [False, True], ["datetime", "integer"], [False, True] + [0, 1, 3], [False, True], ["datetime", "integer"], [False, True], [1, 3] ), ) def test_lagged_training_data_zero_lags(self, config): @@ -1295,7 +1339,9 @@ def test_lagged_training_data_zero_lags(self, config): # only possible feature that can be created using these series utilises # the value of `future` at the same time as the label (i.e. a lag # of `0` away from the only feature time): - output_chunk_shift, use_moving_windows, series_type, multi_models = config + output_chunk_shift, use_moving_windows, series_type, multi_models, stride = ( + config + ) if series_type == "integer": target = linear_timeseries( @@ -1329,6 +1375,11 @@ def test_lagged_training_data_zero_lags(self, config): length=1, freq=target.freq, ) + if stride > 1: + expected_X = expected_X[::stride] + expected_y = expected_y[::stride] + expected_times = expected_times[::stride] + # Check correctness for 'moving windows' and 'time intersection' methods, as # well as for different `multi_models` values: kwargs = { @@ -1349,6 +1400,7 @@ def test_lagged_training_data_zero_lags(self, config): "max_samples_per_ts": None, "use_moving_windows": use_moving_windows, "concatenate": True, + "stride": stride, } self.helper_check_lagged_data(convert_lags_to_dict=False, **kwargs) @@ -1364,13 +1416,14 @@ def test_lagged_training_data_zero_lags(self, config): @pytest.mark.parametrize( "config", - itertools.product( + product( [0, 1, 3], [False, True], ["datetime", "integer"], [False, True], [-1, 0, 1], [-2, 0, 2], + [1, 3], ), ) def test_lagged_training_data_no_target_lags_future_covariates(self, config): @@ -1390,6 +1443,7 @@ def test_lagged_training_data_no_target_lags_future_covariates(self, config): multi_models, cov_start_shift, cov_lag, + stride, ) = config # adapt covariate start, length, and target length so that only 1 sample can be extracted @@ -1429,6 +1483,11 @@ def test_lagged_training_data_no_target_lags_future_covariates(self, config): length=1, freq=target.freq, ) + if stride > 1: + expected_X[::stride] + expected_y[::stride] + expected_times[::stride] + # Check correctness for 'moving windows' and 'time intersection' methods, as # well as for different `multi_models` values: kwargs = { @@ -1449,6 +1508,7 @@ def test_lagged_training_data_no_target_lags_future_covariates(self, config): "max_samples_per_ts": None, "use_moving_windows": use_moving_windows, "concatenate": True, + "stride": stride, } self.helper_check_lagged_data(convert_lags_to_dict=False, **kwargs) @@ -1471,6 +1531,7 @@ def test_lagged_training_data_no_target_lags_future_covariates(self, config): [False, True], [-1, 0], [-2, -1], + [1, 3], ), ) def test_lagged_training_data_no_target_lags_past_covariates(self, config): @@ -1489,6 +1550,7 @@ def test_lagged_training_data_no_target_lags_past_covariates(self, config): multi_models, cov_start_shift, cov_lag, + stride, ) = config # adapt covariate start, length, and target length so that only 1 sample can be extracted @@ -1528,6 +1590,11 @@ def test_lagged_training_data_no_target_lags_past_covariates(self, config): length=1, freq=target.freq, ) + if stride > 1: + expected_X[::stride] + expected_y[::stride] + expected_times[::stride] + # Check correctness for 'moving windows' and 'time intersection' methods, as # well as for different `multi_models` values: kwargs = { @@ -1548,6 +1615,7 @@ def test_lagged_training_data_no_target_lags_past_covariates(self, config): "max_samples_per_ts": None, "use_moving_windows": use_moving_windows, "concatenate": True, + "stride": stride, } self.helper_check_lagged_data(convert_lags_to_dict=False, **kwargs) @@ -1564,7 +1632,7 @@ def test_lagged_training_data_no_target_lags_past_covariates(self, config): @pytest.mark.parametrize( "config", itertools.product( - [0, 1, 3], [False, True], ["datetime", "integer"], [False, True] + [0, 1, 3], [False, True], ["datetime", "integer"], [False, True], [1, 3] ), ) def test_lagged_training_data_positive_lags(self, config): @@ -1580,7 +1648,9 @@ def test_lagged_training_data_positive_lags(self, config): # only possible feature that can be created using these series utilises # the value of `future` one timestep after the time of the label (i.e. a lag # of `1` away from the only feature time): - output_chunk_shift, use_moving_windows, series_type, multi_models = config + output_chunk_shift, use_moving_windows, series_type, multi_models, stride = ( + config + ) if series_type == "integer": target = linear_timeseries( @@ -1613,6 +1683,11 @@ def test_lagged_training_data_positive_lags(self, config): length=1, freq=target.freq, ) + if stride > 1: + expected_X[::stride] + expected_y[::stride] + expected_times[::stride] + # Check correctness for 'moving windows' and 'time intersection' methods, as # well as for different `multi_models` values: kwargs = { @@ -1633,6 +1708,7 @@ def test_lagged_training_data_positive_lags(self, config): "max_samples_per_ts": None, "use_moving_windows": use_moving_windows, "concatenate": True, + "stride": stride, } self.helper_check_lagged_data(convert_lags_to_dict=False, **kwargs) @@ -1653,6 +1729,7 @@ def test_lagged_training_data_positive_lags(self, config): [1, 2], [True, False], ["datetime", "integer"], + [1, 3], ), ) def test_lagged_training_data_comp_wise_lags(self, config): @@ -1662,7 +1739,9 @@ def test_lagged_training_data_comp_wise_lags(self, config): Note that this is supported only when use_moving_window=True. """ - output_chunk_shift, output_chunk_length, multi_models, series_type = config + output_chunk_shift, output_chunk_length, multi_models, series_type, stride = ( + config + ) lags_tg = {"target_0": [-4, -1], "target_1": [-4, -1]} lags_pc = [-3] @@ -1716,6 +1795,7 @@ def test_lagged_training_data_comp_wise_lags(self, config): output_chunk_length, None, output_chunk_shift, + stride=stride, ) # reorder the features to obtain target_0_lag-4, target_1_lag-4, target_0_lag-1, target_1_lag-1 @@ -1762,6 +1842,10 @@ def test_lagged_training_data_comp_wise_lags(self, config): multi_models, output_chunk_shift, )[:, :, np.newaxis] + if stride > 1: + expected_X[::stride] + expected_y[::stride] + feats_times[::stride] # lags are already in dict format self.helper_check_lagged_data( @@ -1783,9 +1867,14 @@ def test_lagged_training_data_comp_wise_lags(self, config): max_samples_per_ts=None, use_moving_windows=True, concatenate=True, + stride=stride, ) - def test_lagged_training_data_sequence_inputs(self): + @pytest.mark.parametrize( + "stride", + [1, 3], + ) + def test_lagged_training_data_sequence_inputs(self, stride): """ Tests that `create_lagged_training_data` correctly handles being passed a sequence of `TimeSeries` inputs, as opposed to individual @@ -1806,12 +1895,14 @@ def test_lagged_training_data_sequence_inputs(self): expected_X_2 = np.concatenate( 3 * [target_2.all_values(copy=False)[:-1, :, :]], axis=1 ) - expected_X = np.concatenate([expected_X_1, expected_X_2], axis=0) - expected_y_1 = target_1.all_values(copy=False)[1:, :, :] - expected_y_2 = target_2.all_values(copy=False)[1:, :, :] + expected_X = np.concatenate( + [expected_X_1[::stride], expected_X_2[::stride]], axis=0 + ) + expected_y_1 = target_1.all_values(copy=False)[1::stride, :, :] + expected_y_2 = target_2.all_values(copy=False)[1::stride, :, :] expected_y = np.concatenate([expected_y_1, expected_y_2], axis=0) - expected_times_1 = target_1.time_index[1:] - expected_times_2 = target_2.time_index[1:] + expected_times_1 = target_1.time_index[1::stride] + expected_times_2 = target_2.time_index[1::stride] kwargs = { "expected_X": expected_X, @@ -1830,6 +1921,7 @@ def test_lagged_training_data_sequence_inputs(self): "multi_models": True, "max_samples_per_ts": None, "use_moving_windows": True, + "stride": stride, } # concatenate=True @@ -1848,7 +1940,11 @@ def test_lagged_training_data_sequence_inputs(self): convert_lags_to_dict=True, concatenate=False, **kwargs ) - def test_lagged_training_data_stochastic_series(self): + @pytest.mark.parametrize( + "stride", + [1, 3], + ) + def test_lagged_training_data_stochastic_series(self, stride): """ Tests that `create_lagged_training_data` is correctly vectorised over the sample axes of the input `TimeSeries`. @@ -1863,10 +1959,10 @@ def test_lagged_training_data_stochastic_series(self): output_chunk_length = 1 # Expected solution: expected_X = np.concatenate( - 3 * [target.all_values(copy=False)[:-1, :, :]], axis=1 + 3 * [target.all_values(copy=False)[:-1:stride, :, :]], axis=1 ) - expected_y = target.all_values(copy=False)[1:, :, :] - expected_times = target.time_index[1:] + expected_y = target.all_values(copy=False)[1::stride, :, :] + expected_times = target.time_index[1::stride] kwargs = { "expected_X": expected_X, @@ -1885,6 +1981,7 @@ def test_lagged_training_data_stochastic_series(self): "multi_models": True, "max_samples_per_ts": None, "use_moving_windows": True, + "stride": stride, } self.helper_check_lagged_data( @@ -1996,6 +2093,44 @@ def test_lagged_training_data_invalid_output_chunk_length_error(self): ) assert "`output_chunk_length` must be a positive `int`." == str(err.value) + def test_lagged_training_data_invalid_stride_error(self): + """ + Tests that `create_lagged_training_data` throws correct error + when `stride` is set to a non-`int` value (e.g. a + `float`) or a non-positive value (e.g. `0`). + """ + target = linear_timeseries(start=1, length=20, freq=1) + lags = [-1] + ocl = 2 + # Check error thrown by 'moving windows' method and by 'time intersection' method: + for use_moving_windows in (False, True): + with pytest.raises(ValueError) as err: + create_lagged_training_data( + target_series=target, + output_chunk_length=ocl, + lags=lags, + uses_static_covariates=False, + use_moving_windows=use_moving_windows, + output_chunk_shift=0, + stride=-1, + ) + assert "`stride` must be a positive integer greater than 0." == str( + err.value + ) + with pytest.raises(ValueError) as err: + create_lagged_training_data( + target_series=target, + output_chunk_length=ocl, + lags=lags, + uses_static_covariates=False, + use_moving_windows=use_moving_windows, + output_chunk_shift=0, + stride=1.1, + ) + assert "`stride` must be a positive integer greater than 0." == str( + err.value + ) + def test_lagged_training_data_no_lags_specified_error(self): """ Tests that `create_lagged_training_data` throws correct error @@ -2729,6 +2864,7 @@ def test_correct_generated_weights_exponential(self, config): ["D", "2D", 2], [True, False], [True, False], + [1, 3], ), ) def test_correct_user_weights(self, config): @@ -2751,14 +2887,18 @@ def test_correct_user_weights(self, config): freq, single_series, univar_series, + stride, ) = config + lags = [-4, -1] if not isinstance(freq, int): freq = pd.tseries.frequencies.to_offset(freq) start = pd.Timestamp("2000-01-01") else: start = 1 - train_y = linear_timeseries(start=start, length=training_size, freq=freq) + train_y = linear_timeseries( + start=start, end_value=training_size - 1, length=training_size, freq=freq + ) if not univar_series: train_y.stack(train_y) @@ -2776,13 +2916,14 @@ def test_correct_user_weights(self, config): ts_weights.stack(ts_weights + 1.0) _, y, _, _, weights = create_lagged_training_data( - lags=[-4, -1], + lags=lags, target_series=train_y if single_series else [train_y] * 2, output_chunk_length=ocl, uses_static_covariates=False, sample_weight=ts_weights if single_series else [ts_weights] * 2, output_chunk_shift=ocs, use_moving_windows=use_moving_windows, + stride=stride, ) # weights shape must match label shape, since we have one @@ -2796,11 +2937,15 @@ def test_correct_user_weights(self, config): # the weights correspond to the same sample and time index as the `y` labels expected_weights = [] - len_y_single = len(y) if single_series else int(len(y) / 2) + len_y_single = len(y) if single_series else len(y) // 2 for i in range(ocl): - mask = slice(-(i + len_y_single), -i if i else None) + # shifted by the steps required to create the first set of features + first_label_idx = -min(lags) + ocs + i + # make enough room for all the strided labels + last_label_idx = first_label_idx + len_y_single * stride + mask = slice(first_label_idx, last_label_idx, stride) expected_weights.append(weights_exact[mask]) - expected_weights = np.concatenate(expected_weights, axis=1)[:, ::-1] + expected_weights = np.concatenate(expected_weights, axis=1) if not single_series: expected_weights = np.concatenate([expected_weights] * 2, axis=0) np.testing.assert_array_almost_equal(weights[:, :, 0], expected_weights) diff --git a/darts/utils/data/horizon_based_dataset.py b/darts/utils/data/horizon_based_dataset.py index 962e910df0..5325da23f1 100644 --- a/darts/utils/data/horizon_based_dataset.py +++ b/darts/utils/data/horizon_based_dataset.py @@ -4,6 +4,7 @@ """ from collections.abc import Sequence +from math import ceil from typing import Optional, Union import numpy as np @@ -25,6 +26,7 @@ def __init__( output_chunk_length: int = 12, lh: tuple[int, int] = (1, 3), lookback: int = 3, + stride: int = 1, use_static_covariates: bool = True, sample_weight: Optional[Union[TimeSeries, Sequence[TimeSeries], str]] = None, ) -> None: @@ -39,7 +41,7 @@ def __init__( Given the horizon `output_chunk_length` of a model, this dataset will compute some "past/future" splits as follows: - First a "forecast point" is selected in the the range of the last + First a "forecast point" is selected in the range of the last `(min_lh * output_chunk_length, max_lh * output_chunk_length)` points before the end of the time series. The "future" then consists in the following `output_chunk_length` points, and the "past" will be the preceding `lookback * output_chunk_length` points. @@ -72,6 +74,10 @@ def __init__( A integer interval for the length of the input in the emitted input and output splits, expressed as a multiple of `output_chunk_length`. For instance, `lookback=3` will emit "inputs" of lengths `3 * output_chunk_length`. + stride + The number of time steps between consecutive samples (windows of lagged values extracted from the target + series), applied starting from the end of the series. This should be used with caution as it might + introduce bias in the forecasts. use_static_covariates Whether to use/include static covariate data from input series. sample_weight @@ -104,8 +110,15 @@ def __init__( self.output_chunk_length = output_chunk_length self.min_lh, self.max_lh = lh self.lookback = lookback + self.stride = stride # Checks + if not (isinstance(stride, int) and stride > 0): + raise_log( + ValueError("`stride` must be a positive integer greater than 0."), + logger=logger, + ) + if not (self.max_lh >= self.min_lh >= 1): raise_log( ValueError( @@ -114,7 +127,9 @@ def __init__( ), logger=logger, ) - self.nr_samples_per_ts = (self.max_lh - self.min_lh) * self.output_chunk_length + self.nr_samples_per_ts = ceil( + ((self.max_lh - self.min_lh) * self.output_chunk_length) / self.stride + ) self.total_nr_samples = len(self.target_series) * self.nr_samples_per_ts self.use_static_covariates = use_static_covariates @@ -149,7 +164,7 @@ def __getitem__( # determine the index lh_idx of the forecasting point (the last point of the input series, before the target) # lh_idx should be in [0, self.nr_samples_per_ts) - lh_idx = idx - (target_idx * self.nr_samples_per_ts) + lh_idx = idx * self.stride - (target_idx * self.nr_samples_per_ts) # determine the index at the end of the output chunk end_of_output_idx = len(target_series) - ( diff --git a/darts/utils/data/sequential_dataset.py b/darts/utils/data/sequential_dataset.py index 0ebd68a1cb..216cf0994c 100644 --- a/darts/utils/data/sequential_dataset.py +++ b/darts/utils/data/sequential_dataset.py @@ -28,6 +28,7 @@ def __init__( input_chunk_length: int = 12, output_chunk_length: int = 1, output_chunk_shift: int = 0, + stride: int = 1, max_samples_per_ts: Optional[int] = None, use_static_covariates: bool = True, sample_weight: Optional[Union[TimeSeries, Sequence[TimeSeries], str]] = None, @@ -64,6 +65,8 @@ def __init__( The length of the emitted future series. output_chunk_shift Optionally, the number of steps to shift the start of the output chunk into the future. + stride + The number of time steps between consecutive entries. max_samples_per_ts This is an upper bound on the number of tuples that can be produced per time series. It can be used in order to have an upper bound on the total size of the dataset and @@ -94,6 +97,7 @@ def __init__( output_chunk_length=output_chunk_length, shift=shift, shift_covariates=False, + stride=stride, max_samples_per_ts=max_samples_per_ts, covariate_type=CovariateType.PAST, use_static_covariates=use_static_covariates, @@ -123,6 +127,7 @@ def __init__( input_chunk_length: int = 12, output_chunk_length: int = 1, output_chunk_shift: int = 0, + stride: int = 1, max_samples_per_ts: Optional[int] = None, use_static_covariates: bool = True, sample_weight: Optional[Union[TimeSeries, Sequence[TimeSeries], str]] = None, @@ -159,6 +164,8 @@ def __init__( The length of the emitted future series. output_chunk_shift Optionally, the number of steps to shift the start of the output chunk into the future. + stride + The number of time steps between consecutive entries. max_samples_per_ts This is an upper bound on the number of tuples that can be produced per time series. It can be used in order to have an upper bound on the total size of the dataset and @@ -189,6 +196,7 @@ def __init__( output_chunk_length=output_chunk_length, shift=shift, shift_covariates=True, + stride=stride, max_samples_per_ts=max_samples_per_ts, covariate_type=CovariateType.FUTURE, use_static_covariates=use_static_covariates, @@ -218,6 +226,7 @@ def __init__( input_chunk_length: int = 12, output_chunk_length: int = 1, output_chunk_shift: int = 0, + stride: int = 1, max_samples_per_ts: Optional[int] = None, use_static_covariates: bool = True, sample_weight: Optional[Union[TimeSeries, Sequence[TimeSeries], str]] = None, @@ -255,6 +264,8 @@ def __init__( The length of the emitted future series. output_chunk_shift Optionally, the number of steps to shift the start of the output chunk into the future. + stride + The number of time steps between consecutive entries. max_samples_per_ts This is an upper bound on the number of tuples that can be produced per time series. It can be used in order to have an upper bound on the total size of the dataset and @@ -286,6 +297,7 @@ def __init__( output_chunk_length=output_chunk_length, shift=shift, shift_covariates=False, + stride=stride, max_samples_per_ts=max_samples_per_ts, covariate_type=CovariateType.HISTORIC_FUTURE, use_static_covariates=use_static_covariates, @@ -300,6 +312,7 @@ def __init__( output_chunk_length=output_chunk_length, shift=shift, shift_covariates=True, + stride=stride, max_samples_per_ts=max_samples_per_ts, covariate_type=CovariateType.FUTURE, use_static_covariates=use_static_covariates, @@ -341,6 +354,7 @@ def __init__( input_chunk_length: int = 12, output_chunk_length: int = 1, output_chunk_shift: int = 0, + stride: int = 1, max_samples_per_ts: Optional[int] = None, use_static_covariates: bool = True, sample_weight: Optional[Union[TimeSeries, Sequence[TimeSeries], str]] = None, @@ -381,6 +395,8 @@ def __init__( The length of the emitted future series. output_chunk_shift Optionally, the number of steps to shift the start of the output chunk into the future. + stride + The number of time steps between consecutive entries. max_samples_per_ts This is an upper bound on the number of tuples that can be produced per time series. It can be used in order to have an upper bound on the total size of the dataset and @@ -412,6 +428,7 @@ def __init__( output_chunk_length=output_chunk_length, shift=shift, shift_covariates=False, + stride=stride, max_samples_per_ts=max_samples_per_ts, covariate_type=CovariateType.PAST, use_static_covariates=use_static_covariates, @@ -425,6 +442,7 @@ def __init__( input_chunk_length=input_chunk_length, output_chunk_length=output_chunk_length, output_chunk_shift=output_chunk_shift, + stride=stride, max_samples_per_ts=max_samples_per_ts, use_static_covariates=use_static_covariates, ) @@ -467,6 +485,7 @@ def __init__( input_chunk_length: int = 12, output_chunk_length: int = 1, output_chunk_shift: int = 0, + stride: int = 1, max_samples_per_ts: Optional[int] = None, use_static_covariates: bool = True, sample_weight: Optional[Union[TimeSeries, Sequence[TimeSeries], str]] = None, @@ -506,6 +525,8 @@ def __init__( The length of the emitted future series. output_chunk_shift Optionally, the number of steps to shift the start of the output chunk into the future. + stride + The number of time steps between consecutive entries. max_samples_per_ts This is an upper bound on the number of tuples that can be produced per time series. It can be used in order to have an upper bound on the total size of the dataset and @@ -536,6 +557,7 @@ def __init__( output_chunk_length=output_chunk_length, shift=shift, shift_covariates=False, + stride=stride, max_samples_per_ts=max_samples_per_ts, covariate_type=CovariateType.PAST, use_static_covariates=use_static_covariates, @@ -550,6 +572,7 @@ def __init__( output_chunk_length=output_chunk_length, shift=shift, shift_covariates=True, + stride=stride, max_samples_per_ts=max_samples_per_ts, covariate_type=CovariateType.FUTURE, use_static_covariates=use_static_covariates, diff --git a/darts/utils/data/shifted_dataset.py b/darts/utils/data/shifted_dataset.py index d7e537dc4b..9ac233834e 100644 --- a/darts/utils/data/shifted_dataset.py +++ b/darts/utils/data/shifted_dataset.py @@ -4,6 +4,7 @@ """ from collections.abc import Sequence +from math import ceil from typing import Optional, Union import numpy as np @@ -31,6 +32,7 @@ def __init__( covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None, length: int = 12, shift: int = 1, + stride: int = 1, max_samples_per_ts: Optional[int] = None, use_static_covariates: bool = True, sample_weight: Optional[Union[TimeSeries, Sequence[TimeSeries], str]] = None, @@ -65,6 +67,8 @@ def __init__( The length of the emitted past and future series. shift The number of time steps by which to shift the output chunks relative to the start of the input chunks. + stride + The number of time steps between consecutive entries. max_samples_per_ts This is an upper bound on the number of tuples that can be produced per time series. It can be used in order to have an upper bound on the total size of the dataset and @@ -93,6 +97,7 @@ def __init__( input_chunk_length=length, output_chunk_length=length, shift=shift, + stride=stride, shift_covariates=False, max_samples_per_ts=max_samples_per_ts, covariate_type=CovariateType.PAST, @@ -122,6 +127,7 @@ def __init__( covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None, length: int = 12, shift: int = 1, + stride: int = 1, max_samples_per_ts: Optional[int] = None, use_static_covariates: bool = True, sample_weight: Optional[Union[TimeSeries, Sequence[TimeSeries], str]] = None, @@ -158,6 +164,10 @@ def __init__( The length of the emitted past and future series. shift The number of time steps by which to shift the output chunks relative to the start of the input chunks. + stride + The number of time steps between consecutive samples (windows of lagged values extracted from the target + series), applied starting from the end of the series. This should be used with caution as it might + introduce bias in the forecasts. max_samples_per_ts This is an upper bound on the number of tuples that can be produced per time series. It can be used in order to have an upper bound on the total size of the dataset and @@ -187,6 +197,7 @@ def __init__( input_chunk_length=length, output_chunk_length=length, shift=shift, + stride=stride, shift_covariates=True, max_samples_per_ts=max_samples_per_ts, covariate_type=CovariateType.FUTURE, @@ -216,6 +227,7 @@ def __init__( covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None, length: int = 12, shift: int = 1, + stride: int = 1, max_samples_per_ts: Optional[int] = None, use_static_covariates: bool = True, sample_weight: Optional[Union[TimeSeries, Sequence[TimeSeries], str]] = None, @@ -254,6 +266,10 @@ def __init__( The length of the emitted past and future series. shift The number of time steps by which to shift the output chunks relative to the start of the input chunks. + stride + The number of time steps between consecutive samples (windows of lagged values extracted from the target + series), applied starting from the end of the series. This should be used with caution as it might + introduce bias in the forecasts. max_samples_per_ts This is an upper bound on the number of tuples that can be produced per time series. It can be used in order to have an upper bound on the total size of the dataset and @@ -284,6 +300,7 @@ def __init__( input_chunk_length=length, output_chunk_length=length, shift=shift, + stride=stride, shift_covariates=False, max_samples_per_ts=max_samples_per_ts, covariate_type=CovariateType.HISTORIC_FUTURE, @@ -298,6 +315,7 @@ def __init__( input_chunk_length=length, output_chunk_length=length, shift=shift, + stride=stride, shift_covariates=True, max_samples_per_ts=max_samples_per_ts, covariate_type=CovariateType.FUTURE, @@ -339,6 +357,7 @@ def __init__( future_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None, length: int = 12, shift: int = 1, + stride: int = 1, max_samples_per_ts: Optional[int] = None, use_static_covariates: bool = True, sample_weight: Optional[Union[TimeSeries, Sequence[TimeSeries], str]] = None, @@ -377,6 +396,10 @@ def __init__( The length of the emitted past and future series. shift The number of time steps by which to shift the output chunks relative to the start of the input chunks. + stride + The number of time steps between consecutive samples (windows of lagged values extracted from the target + series), applied starting from the end of the series. This should be used with caution as it might + introduce bias in the forecasts. max_samples_per_ts This is an upper bound on the number of tuples that can be produced per time series. It can be used in order to have an upper bound on the total size of the dataset and @@ -406,6 +429,7 @@ def __init__( input_chunk_length=length, output_chunk_length=length, shift=shift, + stride=stride, shift_covariates=False, max_samples_per_ts=max_samples_per_ts, covariate_type=CovariateType.PAST, @@ -419,6 +443,7 @@ def __init__( covariates=future_covariates, length=length, shift=shift, + stride=stride, max_samples_per_ts=max_samples_per_ts, use_static_covariates=use_static_covariates, ) @@ -460,6 +485,7 @@ def __init__( future_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None, length: int = 12, shift: int = 1, + stride: int = 1, max_samples_per_ts: Optional[int] = None, use_static_covariates: bool = True, sample_weight: Optional[Union[TimeSeries, Sequence[TimeSeries], str]] = None, @@ -498,6 +524,10 @@ def __init__( The length of the emitted past and future series. shift The number of time steps by which to shift the output chunks relative to the start of the input chunks. + stride + The number of time steps between consecutive samples (windows of lagged values extracted from the target + series), applied starting from the end of the series. This should be used with caution as it might + introduce bias in the forecasts. max_samples_per_ts This is an upper bound on the number of tuples that can be produced per time series. It can be used in order to have an upper bound on the total size of the dataset and @@ -528,6 +558,7 @@ def __init__( input_chunk_length=length, output_chunk_length=length, shift=shift, + stride=stride, shift_covariates=False, max_samples_per_ts=max_samples_per_ts, covariate_type=CovariateType.PAST, @@ -542,6 +573,7 @@ def __init__( input_chunk_length=length, output_chunk_length=length, shift=shift, + stride=stride, shift_covariates=True, max_samples_per_ts=max_samples_per_ts, covariate_type=CovariateType.FUTURE, @@ -584,6 +616,7 @@ def __init__( output_chunk_length: int = 1, shift: int = 1, shift_covariates: bool = False, + stride: int = 1, max_samples_per_ts: Optional[int] = None, covariate_type: CovariateType = CovariateType.NONE, use_static_covariates: bool = True, @@ -613,9 +646,13 @@ def __init__( shift_covariates Whether to shift the covariates forward the same way as the target. FutureCovariatesModel's require this set to True, while PastCovariatesModel's require this set to False. + stride + The number of time steps between consecutive samples (windows of lagged values extracted from the target + series), applied starting from the end of the series. This should be used with caution as it might + introduce bias in the forecasts. max_samples_per_ts - This is an upper bound on the number of (input, output, input_covariates) tuples that can be produced - per time series. It can be used in order to have an upper bound on the total size of the dataset and + This is an upper bound on the number of tuples that can be produced per time series. + It can be used in order to have an upper bound on the total size of the dataset and ensure proper sampling. If `None`, it will read all of the individual time series in advance (at dataset creation) to know their sizes, which might be expensive on big datasets. If some series turn out to have a length that would allow more than `max_samples_per_ts`, only the @@ -637,11 +674,18 @@ def __init__( """ super().__init__() + if not (isinstance(stride, int) and stride > 0): + raise_log( + ValueError("`stride` must be a positive integer greater than 0."), + logger=logger, + ) + # setup target and sequence self.target_series = series2seq(target_series) self.input_chunk_length = input_chunk_length self.output_chunk_length = output_chunk_length self.shift = shift + self.stride = stride self.max_samples_per_ts = max_samples_per_ts self.size_of_both_chunks = max( self.input_chunk_length, self.shift + self.output_chunk_length @@ -692,9 +736,12 @@ def __init__( # setup samples if self.max_samples_per_ts is None: # read all time series to get the maximum size - self.max_samples_per_ts = ( + max_samples_per_ts = ( max(len(ts) for ts in self.target_series) - self.size_of_both_chunks + 1 ) + + # adjust by `stride` + self.max_samples_per_ts = ceil(max_samples_per_ts / self.stride) self.ideal_nr_samples = len(self.target_series) * self.max_samples_per_ts def __len__(self): @@ -715,7 +762,9 @@ def __getitem__( target_vals = target_series.random_component_values(copy=False) # determine the actual number of possible samples in this time series - n_samples_in_ts = len(target_vals) - self.size_of_both_chunks + 1 + n_samples_in_ts = ceil( + (len(target_vals) - self.size_of_both_chunks + 1) / self.stride + ) if n_samples_in_ts < 1: raise_log( @@ -731,7 +780,9 @@ def __getitem__( # it is originally in [0, self.max_samples_per_ts), so we use a modulo to have it in [0, n_samples_in_ts) end_of_output_idx = ( len(target_series) - - (idx - (target_idx * self.max_samples_per_ts)) % n_samples_in_ts + - (idx - (target_idx * self.max_samples_per_ts)) + % n_samples_in_ts + * self.stride ) # optionally, load covariates diff --git a/darts/utils/data/tabularization.py b/darts/utils/data/tabularization.py index 1742f7ccd1..4f4a208a4d 100644 --- a/darts/utils/data/tabularization.py +++ b/darts/utils/data/tabularization.py @@ -38,6 +38,7 @@ def create_lagged_data( is_training: bool = True, concatenate: bool = True, sample_weight: Optional[Union[str, TimeSeries, Sequence[TimeSeries]]] = None, + stride: int = 1, show_warnings: bool = True, ) -> tuple[ ArrayOrArraySequence, @@ -220,6 +221,10 @@ def create_lagged_data( `"linear"` or `"exponential"` decay - the further in the past, the lower the weight. The weights are computed globally based on the length of the longest series in `series`. Then for each series, the weights are extracted from the end of the global weights. This gives a common time weighting across all series. + stride + The number of time steps between consecutive samples (windows of lagged values extracted from the target + series), applied starting from the end of the series. This should be used with caution as it might + introduce bias in the forecasts. show_warnings Whether to show warnings. @@ -357,6 +362,7 @@ def create_lagged_data( multi_models=multi_models, check_inputs=check_inputs, is_training=is_training, + stride=stride, show_warnings=show_warnings, ) else: @@ -374,6 +380,7 @@ def create_lagged_data( multi_models=multi_models, check_inputs=check_inputs, is_training=is_training, + stride=stride, show_warnings=show_warnings, ) X_i, last_static_covariates_shape = add_static_covariates_to_lagged_data( @@ -418,6 +425,7 @@ def create_lagged_training_data( check_inputs: bool = True, use_moving_windows: bool = True, concatenate: bool = True, + stride: int = 1, sample_weight: Optional[Union[TimeSeries, str]] = None, ) -> tuple[ ArrayOrArraySequence, @@ -497,6 +505,10 @@ def create_lagged_training_data( when `Sequence[TimeSeries]` are provided, then `X` and `y` will be arrays created by concatenating all feature/label arrays formed by each `TimeSeries` along the `0`th axis. Note that `times` is still returned as `Sequence[pd.Index]`, even when `concatenate = True`. + stride + The number of time steps between consecutive samples (windows of lagged values extracted from the target + series), applied starting from the end of the series. This should be used with caution as it might + introduce bias in the forecasts. sample_weight Optionally, some sample weights to apply to the target `series` labels. They are applied per observation, per label (each step in `output_chunk_length`), and per component. @@ -559,6 +571,7 @@ def create_lagged_training_data( use_moving_windows=use_moving_windows, is_training=True, concatenate=concatenate, + stride=stride, sample_weight=sample_weight, ) @@ -576,6 +589,7 @@ def create_lagged_prediction_data( check_inputs: bool = True, use_moving_windows: bool = True, concatenate: bool = True, + stride: int = 1, show_warnings: bool = True, ) -> tuple[ArrayOrArraySequence, Sequence[pd.Index]]: """ @@ -641,6 +655,10 @@ def create_lagged_prediction_data( `Sequence[TimeSeries]` are provided, then `X` will be an array created by concatenating all feature arrays formed by each `TimeSeries` along the `0`th axis. Note that `times` is still returned as `Sequence[pd.Index]`, even when `concatenate = True`. + stride + The number of time steps between consecutive samples (windows of lagged values extracted from the target + series), applied starting from the end of the series. This should be used with caution as it will cause + gaps in the forecasts. show_warnings Whether to show warnings. @@ -683,6 +701,7 @@ def create_lagged_prediction_data( use_moving_windows=use_moving_windows, is_training=False, concatenate=concatenate, + stride=stride, show_warnings=show_warnings, ) return X, times @@ -971,6 +990,7 @@ def _create_lagged_data_by_moving_window( multi_models: bool, check_inputs: bool, is_training: bool, + stride: int, show_warnings: bool = True, ) -> tuple[np.ndarray, Optional[np.ndarray], pd.Index, Optional[np.ndarray]]: """ @@ -1008,6 +1028,11 @@ def _create_lagged_data_by_moving_window( raise_log( ValueError("Must specify at least one series-lags pair."), logger=logger ) + if not (isinstance(stride, int) and stride > 0): + raise_log( + ValueError("`stride` must be a positive integer greater than 0."), + logger=logger, + ) sample_weight_vals = _extract_sample_weight(sample_weight, target_series) time_bounds = get_shared_times_bounds(*feature_times) @@ -1026,6 +1051,11 @@ def _create_lagged_data_by_moving_window( ) else: times = pd.date_range(start=time_bounds[0], end=time_bounds[1], freq=freq) + + if stride > 1: + # calculate the starting index so that the last element is included after applying the stride + # equivalent to times[::-stride][::-1] + times = times[(len(times) - 1) % stride :: stride] num_samples = len(times) if num_samples > max_samples_per_ts: times = times[-max_samples_per_ts:] @@ -1084,14 +1114,17 @@ def _create_lagged_data_by_moving_window( first_window_start_idx = start_time_idx - max_lag_i first_window_end_idx = first_window_start_idx + window_len # Other windows are formed by sequentially shifting first window forward - # by 1 index position each time; to create `(num_samples - 1)` more windows - # in addition to the first window, need to take `(num_samples - 1)` values - # after `first_window_end_idx`: + # by `stride` position each time; to create `(num_samples - 1)` more windows + # in addition to the first window, need to take `(num_samples - 1) * stride` + # values after `first_window_end_idx`: vals = series_i.all_values(copy=False)[ - first_window_start_idx : first_window_end_idx + num_samples - 1, :, : + first_window_start_idx : first_window_end_idx + + (num_samples - 1) * stride, + :, + :, ] windows = strided_moving_window( - x=vals, window_len=window_len, stride=1, axis=0, check_inputs=False + x=vals, window_len=window_len, stride=stride, axis=0, check_inputs=False ) # Within each window, the `-1` indexed value (i.e. the value at the very end of @@ -1126,16 +1159,18 @@ def _create_lagged_data_by_moving_window( continue # To create `(num_samples - 1)` other windows in addition to first window, - # must take `(num_samples - 1)` values ahead of `first_window_end_idx` + # must take `(num_samples - 1) * stride` values ahead of `first_window_end_idx` + # to also take the stride into consideration vals = vals[ - first_window_start_idx : first_window_end_idx + num_samples - 1, + first_window_start_idx : first_window_end_idx + + (num_samples - 1) * stride, :, :, ] windows = strided_moving_window( x=vals, window_len=output_chunk_length, - stride=1, + stride=stride, axis=0, check_inputs=False, ) @@ -1207,6 +1242,7 @@ def _create_lagged_data_by_intersecting_times( multi_models: bool, check_inputs: bool, is_training: bool, + stride: int, show_warnings: bool = True, ) -> tuple[ np.ndarray, @@ -1243,6 +1279,11 @@ def _create_lagged_data_by_intersecting_times( raise_log( ValueError("Must specify at least one series-lags pair."), logger=logger ) + if not (isinstance(stride, int) and stride > 0): + raise_log( + ValueError("`stride` must be a positive integer greater than 0."), + logger=logger, + ) sample_weight_vals = _extract_sample_weight(sample_weight, target_series) shared_times = get_shared_times(*feature_times, sort=True) if shared_times is None: @@ -1252,6 +1293,10 @@ def _create_lagged_data_by_intersecting_times( ), logger=logger, ) + if stride > 1: + # calculate the starting index so that the last element is included after applying the stride + # equivalent to shared_times[::-stride][::-1] + shared_times = shared_times[(len(shared_times) - 1) % stride :: stride] if len(shared_times) > max_samples_per_ts: shared_times = shared_times[-max_samples_per_ts:] X = []