Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Question] Is it possible to modify the training dataset to use a future covariate that has a history of forecasts? #2421

Closed
gconnell-hiringa opened this issue Jun 21, 2024 · 4 comments
Labels
question Further information is requested

Comments

@gconnell-hiringa
Copy link

gconnell-hiringa commented Jun 21, 2024

I have a dataset where a future covariate is a weather forecast for the next 10 time periods and it is updated at every new time step.

Is it possible to use or modify the training dataset methods to generate training samples with the weather forecast for the relevant window?

I suspect this is getting into subclassing the TrainingDataset and would be great if there are any examples that can be shared.

Appreciate any help that can be given.

@gconnell-hiringa gconnell-hiringa added the triage Issue waiting for triaging label Jun 21, 2024
@madtoinou madtoinou added question Further information is requested and removed triage Issue waiting for triaging labels Jun 21, 2024
@dennisbader
Copy link
Collaborator

dennisbader commented Jun 24, 2024

Hi @gconnell-hiringa, it is possible but requires quite some manual work.

  • You'll have to subclass from GenericShiftedDataset to adapt the general logic for extracting your future covariates.
  • Then subclass from *CovariatesSequentialDataset for the covariates support. The * depends on the model you pick. E.g. for TiDEModel it is the MixedCovariatesSequentialDataset. You can find a list of models with the corresponding Datasets here.
  • Then use model.fit_from_dataset() to train on this custom dataset.

I'll give you an example below how you could achieve it for TiDEModel. To keep the changes "simple", I'll make the requirement that all future covariates must have the same time index as the target series (otherwise it'll fail with the logic from below).

Subclass from GenericShiftedDataset to add the logic. Only the part in between # =========> START NEW CODE <========= and
# =========> END NEW CODE <========= is actually new.

class CustomGSD(GenericShiftedDataset):
    def __getitem__(
            self, idx
    ) -> Tuple[
        np.ndarray,
        Optional[np.ndarray],
        Optional[np.ndarray],
        Optional[np.ndarray],
        np.ndarray,
    ]:
        # determine the index of the time series.
        target_idx = idx // self.max_samples_per_ts
        target_series = self.target_series[target_idx]
        target_vals = target_series.random_component_values(copy=False)

        # determine the actual number of possible samples in this time series
        n_samples_in_ts = len(target_vals) - self.size_of_both_chunks + 1

        if n_samples_in_ts < 1:
            raise_log(
                ValueError(
                    "The dataset contains some time series that are too short to contain "
                    "`max(self.input_chunk_length, self.shift + self.output_chunk_length)` "
                    f"({target_idx}-th series)"
                ),
                logger=logger,
            )

        # determine the index at the end of the output chunk
        # it is originally in [0, self.max_samples_per_ts), so we use a modulo to have it in [0, n_samples_in_ts)
        end_of_output_idx = (
                len(target_series)
                - (idx - (target_idx * self.max_samples_per_ts)) % n_samples_in_ts
        )

        # optionally, load covariates
        covariate_series = (
            self.covariates[target_idx] if self.covariates is not None else None
        )
        # =========> START NEW CODE <=========
        if covariate_series is not None and self.covariate_type != CovariateType.PAST:
            fc_idx = idx % self.max_samples_per_ts
            covariate_series = covariate_series[fc_idx]
            if not covariate_series.has_same_time_as(target_series):
                raise_log(
                    ValueError(
                        "For this new Dataset to work, all future covariates must have "
                        "identical time index as the corresponding target series."
                    )
                )
        # =========> END NEW CODE <=========

        # optionally, load sample weight
        if self.sample_weight is not None:
            sample_weight_series = self.sample_weight[target_idx]
            weight_n_comp = sample_weight_series.n_components
            if weight_n_comp > 1 and weight_n_comp != target_series.n_components:
                raise_log(
                    ValueError(
                        "The number of components in `sample_weight` must either be `1` or match "
                        f"the number of target series components `{target_series.n_components}`. "
                        f"({target_idx}-th series)"
                    ),
                    logger=logger,
                )
        else:
            sample_weight_series = None

        # get all indices for the current sample
        (
            past_start,
            past_end,
            future_start,
            future_end,
            covariate_start,
            covariate_end,
            sample_weight_start,
            sample_weight_end,
        ) = self._memory_indexer(
            target_idx=target_idx,
            target_series=target_series,
            shift=self.shift,
            input_chunk_length=self.input_chunk_length,
            output_chunk_length=self.output_chunk_length,
            end_of_output_idx=end_of_output_idx,
            covariate_series=covariate_series,
            covariate_type=self.main_covariate_type,
            sample_weight_series=sample_weight_series,
        )

        # extract sample target
        future_target = target_vals[future_start:future_end]
        past_target = target_vals[past_start:past_end]

        # extract sample covariates
        covariate = None
        if self.covariates is not None:
            if covariate_end > len(covariate_series):
                raise_log(
                    ValueError(
                        f"The dataset contains {self.main_covariate_type.value} covariates "
                        f"that don't extend far enough into the future. ({idx}-th sample)"
                    ),
                    logger=logger,
                )

            covariate = covariate_series.random_component_values(copy=False)[
                        covariate_start:covariate_end
                        ]

            if len(covariate) != (
                    self.output_chunk_length
                    if self.shift_covariates
                    else self.input_chunk_length
            ):
                raise_log(
                    ValueError(
                        f"The dataset contains {self.main_covariate_type.value} covariates "
                        f"whose time axis doesn't allow to obtain the input (or output) chunk relative to the "
                        f"target series."
                    ),
                    logger=logger,
                )

        # extract sample weights
        sample_weight = None
        if self.sample_weight is not None:
            if sample_weight_end > len(sample_weight_series):
                raise_log(
                    ValueError(
                        f"The dataset contains sample weights "
                        f"that don't extend far enough into the future. ({idx}-th sample)"
                    ),
                    logger=logger,
                )

            sample_weight = sample_weight_series.random_component_values(copy=False)[
                            sample_weight_start:sample_weight_end
                            ]

            if len(sample_weight) != self.output_chunk_length:
                raise_log(
                    ValueError(
                        "The dataset contains sample weights whose time axis doesn't allow to obtain "
                        "the input (or output) chunk relative to the target series."
                    ),
                    logger=logger,
                )

        # extract sample static covariates
        if self.use_static_covariates:
            static_covariate = target_series.static_covariates_values(copy=False)
        else:
            static_covariate = None
        return past_target, covariate, static_covariate, sample_weight, future_target

Then subclass from MixedCovariatesSequentialDataset to use the CustomGSD:

class CustomMCSD(MixedCovariatesSequentialDataset):
    def __init__(
        self,
        target_series: Union[TimeSeries, Sequence[TimeSeries]],
        past_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None,
        future_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None,
        input_chunk_length: int = 12,
        output_chunk_length: int = 1,
        output_chunk_shift: int = 0,
        max_samples_per_ts: Optional[int] = None,
        use_static_covariates: bool = True,
        sample_weight: Optional[Union[TimeSeries, Sequence[TimeSeries], str]] = None,
    ):
        shift = input_chunk_length + output_chunk_shift
        # This dataset is in charge of serving past covariates
        self.ds_past = CustomGSD(
            target_series=target_series,
            covariates=past_covariates,
            input_chunk_length=input_chunk_length,
            output_chunk_length=output_chunk_length,
            shift=shift,
            shift_covariates=False,
            max_samples_per_ts=max_samples_per_ts,
            covariate_type=CovariateType.PAST,
            use_static_covariates=use_static_covariates,
            sample_weight=sample_weight,
        )

        # This dataset is in charge of historical future covariates
        self.ds_historic_future = CustomGSD(
            target_series=target_series,
            covariates=future_covariates,
            input_chunk_length=input_chunk_length,
            output_chunk_length=output_chunk_length,
            shift=shift,
            shift_covariates=False,
            max_samples_per_ts=max_samples_per_ts,
            covariate_type=CovariateType.HISTORIC_FUTURE,
            use_static_covariates=use_static_covariates,
            sample_weight=sample_weight,
        )

        # This dataset is in charge of serving future covariates
        self.ds_future = CustomGSD(
            target_series=target_series,
            covariates=future_covariates,
            input_chunk_length=input_chunk_length,
            output_chunk_length=output_chunk_length,
            shift=shift,
            shift_covariates=True,
            max_samples_per_ts=max_samples_per_ts,
            covariate_type=CovariateType.FUTURE,
            use_static_covariates=use_static_covariates,
        )

    def __getitem__(
        self, idx
    ) -> Tuple[
        np.ndarray,
        Optional[np.ndarray],
        Optional[np.ndarray],
        Optional[np.ndarray],
        Optional[np.ndarray],
        Optional[np.ndarray],
        np.ndarray,
    ]:
        # get past target and past covariates
        past_target, past_covariate, static_covariate, sample_weight, future_target = (
            self.ds_past[idx]
        )
        # get historic values of future covariates
        _, historic_future_covariate, _, _, _ = (
            self.ds_historic_future[idx]
        )
        # get future values of future covariates
        _, future_covariate, _, _, _ = self.ds_future[idx]
        return (
            past_target,
            past_covariate,
            historic_future_covariate,
            future_covariate,
            static_covariate,
            sample_weight,
            future_target,
        )

Now create your target series and future covariates. We'll give the DataSet a list of target series, and list of lists of future covariates (fc). Each inner list corresponds to all the future covariates for one target series.

The example will generate two samples -> I'll pass an inner list of two fc series as well. Every sample will extract the future covariates from the corresponding fc from the inner list.

# input and output chunk lengths
icl, ocl = 5, 2

# the length of the target series will result in 2 samples
series = tg.linear_timeseries(length=icl + ocl + 1).astype(np.float32)

# to keep it 'simple', we make a requirement that all future covariate series must have the same time index as
# the target series
# since we have two samples, we create 2 future covariates (fc). We add `1` to the first and `2` to the second, to
# distinguish them later on
fc1 = series + 1
fc2 = series + 2

ds = CustomMCSD(
    target_series=[series],
    past_covariates=None,
    future_covariates=[[fc1, fc2]],
    input_chunk_length=icl,
    output_chunk_length=ocl,
    output_chunk_shift=0,
    max_samples_per_ts=None,
    use_static_covariates=False,
    sample_weight=None,
)

# two samples
assert len(ds) == 2

# first sample gets first future covariates which have values of the target series `+ 1`
# historic fc == past target + 1
assert (ds[0][2] == ds[0][0] + 1).all()
# future fc == future target (labels) + 1
assert (ds[0][3] == ds[0][-1] + 1).all()

# second sample gets second future covariates which have values of the target series `+ 2`
# historic fc == past target + 1
assert (ds[1][2] == ds[1][0] + 2).all()
# future fc == future target (labels) + 1
assert (ds[1][3] == ds[1][-1] + 2).all()

For the tests, you see that the extraction worked.

Now as a last check, we try to train a TiDEModel on this custom dataset:

# and we can also train any MixedCovariatesModel now
# https://unit8co.github.io/darts/userguide/torch_forecasting_models.html#torch-forecasting-model-covariates-support
from darts.models import TiDEModel

model = TiDEModel(icl, ocl)
model.fit_from_dataset(ds)

@gconnell-hiringa
Copy link
Author

@dennisbader THANK YOU!!!! This response far exceeds what I was hoping for, so thank you for going to the extra effort of providing an example.

I've tried out the example and can follow what is going on. I'm 99% sure this is going to work well for the model I am working on at the moment, which is a TFT Model so the mixed covariate example is perfect.

I'll post back to this thread with further feedback when I've figured out the implementation.

@ETTAN93
Copy link

ETTAN93 commented Jun 28, 2024

@dennisbader, would this be also relevant to what we discussed the other day on how to do a historical backtest with changing historical forecasts? we spoke about having to use fit and predict rather than the 'historical forecast' method out of the box? thanks

@dennisbader
Copy link
Collaborator

Hi @ETTAN93, this might potentially also work with pre-trained TorchForecastingModels. But it would require adapting the corresponding *InferenceDataset as well.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
question Further information is requested
Projects
None yet
Development

No branches or pull requests

4 participants