# Source code for pytorch_forecasting.models.nhits._nhits

"""
N-HiTS model for timeseries forecasting with covariates.
"""

from copy import copy
from typing import Optional, Union

import numpy as np
import torch
from torch import nn

from pytorch_forecasting.data import TimeSeriesDataSet
from pytorch_forecasting.data.encoders import NaNLabelEncoder
from pytorch_forecasting.metrics import (
    MAE,
    MAPE,
    MASE,
    RMSE,
    SMAPE,
    MultiHorizonMetric,
    MultiLoss,
)
from pytorch_forecasting.models.base import BaseModelWithCovariates
from pytorch_forecasting.models.nhits.sub_modules import NHiTS as NHiTSModule
from pytorch_forecasting.models.nn.embeddings import MultiEmbedding
from pytorch_forecasting.utils import create_mask, to_list
from pytorch_forecasting.utils._dependencies import _check_matplotlib


class NHiTS(BaseModelWithCovariates):
    """N-HiTS network for time series forecasting with covariates."""

    def __init__(
        self,
        output_size: Union[int, list[int]] = 1,
        static_categoricals: Optional[list[str]] = None,
        static_reals: Optional[list[str]] = None,
        time_varying_categoricals_encoder: Optional[list[str]] = None,
        time_varying_categoricals_decoder: Optional[list[str]] = None,
        categorical_groups: Optional[dict[str, list[str]]] = None,
        time_varying_reals_encoder: Optional[list[str]] = None,
        time_varying_reals_decoder: Optional[list[str]] = None,
        embedding_sizes: Optional[dict[str, tuple[int, int]]] = None,
        embedding_paddings: Optional[list[str]] = None,
        embedding_labels: Optional[dict[str, list[str]]] = None,
        x_reals: Optional[list[str]] = None,
        x_categoricals: Optional[list[str]] = None,
        context_length: int = 1,
        prediction_length: int = 1,
        static_hidden_size: Optional[int] = None,
        naive_level: bool = True,
        shared_weights: bool = True,
        activation: str = "ReLU",
        initialization: str = "lecun_normal",
        n_blocks: Optional[list[int]] = None,
        n_layers: Union[int, list[int]] = 2,
        hidden_size: int = 512,
        pooling_sizes: Optional[list[int]] = None,
        downsample_frequencies: Optional[list[int]] = None,
        pooling_mode: str = "max",
        interpolation_mode: str = "linear",
        batch_normalization: bool = False,
        dropout: float = 0.0,
        learning_rate: float = 1e-2,
        log_interval: int = -1,
        log_gradient_flow: bool = False,
        log_val_interval: Optional[int] = None,
        weight_decay: float = 1e-3,
        loss: Optional[MultiHorizonMetric] = None,
        reduce_on_plateau_patience: int = 1000,
        backcast_loss_ratio: float = 0.0,
        logging_metrics: Optional[nn.ModuleList] = None,
        **kwargs,
    ):
        """
        Initialize N-HiTS Model - use its :py:meth:`~from_dataset` method if possible.

        Based on the article
        `N-HiTS: Neural Hierarchical Interpolation for Time Series Forecasting
        <http://arxiv.org/abs/2201.12886>`_.
        The network has shown to increase accuracy by ~25% against
        :py:class:`~pytorch_forecasting.models.nbeats.NBeats` and also supports
        covariates.

        Args:
            hidden_size (int): size of hidden layers and can range from 8 to 1024 -
                use 32-128 if no covariates are employed. Defaults to 512.
            static_hidden_size (Optional[int], optional): size of hidden layers for
                static variables. Defaults to hidden_size.
            loss: loss to optimize. Defaults to MASE(). QuantileLoss is also
                supported
            shared_weights (bool, optional): if True, weights of blocks are shared
                in each stack. Defaults to True.
            naive_level (bool, optional): if True, naive forecast of last
                observation is added at the beginning. Defaults to True.
            initialization (str, optional): Initialization method. One of
                ['orthogonal', 'he_uniform', 'glorot_uniform', 'glorot_normal',
                'lecun_normal']. Defaults to "lecun_normal". Forced to
                "lecun_normal" when ``activation`` is "SELU".
            n_blocks (List[int], optional): list of blocks used in each stack
                (i.e. length of stacks). Defaults to [1, 1, 1].
            n_layers (Union[int, List[int]], optional): Number of layers per block
                or list of number of layers used by blocks in each stack
                (i.e. length of stacks). Defaults to 2.
            pooling_sizes (Optional[List[int]], optional): List of pooling sizes
                for input for each stack, i.e. higher means more smoothing of
                input. Using an ordering of higher to lower in the list improves
                results. Defaults to a heuristic.
            pooling_mode (str, optional): Pooling mode for summarizing input.
                One of ['max','average']. Defaults to "max".
            downsample_frequencies (Optional[List[int]], optional): Downsample
                multiplier of output for each stack, i.e. higher means more
                interpolation at forecast time is required. Should be equal or
                higher than pooling_sizes but smaller equal prediction_length.
                Defaults to a heuristic to match pooling_sizes.
            interpolation_mode (str, optional): Interpolation mode for
                forecasting. One of ['linear', 'nearest', 'cubic-x'] where 'x' is
                replaced by a batch size for the interpolation.
                Defaults to "linear".
            batch_normalization (bool, optional): Whether carry out batch
                normalization. Defaults to False.
            dropout (float, optional): dropout rate for hidden layers.
                Defaults to 0.0.
            activation (str, optional): activation function. One of ['ReLU',
                'Softplus', 'Tanh', 'SELU', 'LeakyReLU', 'PReLU', 'Sigmoid'].
                Defaults to "ReLU".
            output_size: number of outputs (typically number of quantiles for
                QuantileLoss and one target or list of output sizes but currently
                only point-forecasts allowed). Set automatically.
            static_categoricals: names of static categorical variables
            static_reals: names of static continuous variables
            time_varying_categoricals_encoder: names of categorical variables for
                encoder
            time_varying_categoricals_decoder: names of categorical variables for
                decoder
            time_varying_reals_encoder: names of continuous variables for encoder
            time_varying_reals_decoder: names of continuous variables for decoder
            categorical_groups: dictionary where values are list of categorical
                variables that are forming together a new categorical variable
                which is the key in the dictionary
            x_reals: order of continuous variables in tensor passed to forward
                function
            x_categoricals: order of categorical variables in tensor passed to
                forward function
            embedding_sizes: dictionary mapping (string) indices to tuple of
                number of categorical classes and embedding size
            embedding_paddings: list of indices for embeddings which transform the
                zero's embedding to a zero vector
            embedding_labels: dictionary mapping (string) indices to list of
                categorical labels
            learning_rate: learning rate
            weight_decay: weight decay stored as hyperparameter (presumably used
                by the optimizer configured in the base model - confirm there).
                Defaults to 1e-3.
            log_interval: log predictions every x batches, do not log if 0 or
                less, log interpretation if > 0. If < 1.0 , will log multiple
                entries per batch. Defaults to -1.
            log_val_interval: frequency with which to log validation set metrics,
                defaults to log_interval
            log_gradient_flow: if to log gradient flow, this takes time and should
                be only done to diagnose training failures
            prediction_length: Length of the prediction. Also known as 'horizon'.
            context_length: Number of time units that condition the predictions.
                Also known as 'lookback period'. Should be between 1-10 times the
                prediction length.
            backcast_loss_ratio: weight of backcast in comparison to forecast when
                calculating the loss. A weight of 1.0 means that forecast and
                backcast loss is weighted the same (regardless of backcast and
                forecast lengths). Defaults to 0.0, i.e. no weight.
            reduce_on_plateau_patience (int): patience after which learning rate
                is reduced by a factor of 10
            logging_metrics (nn.ModuleList[MultiHorizonMetric]): list of metrics
                that are logged during training. Defaults to
                nn.ModuleList([SMAPE(), MAE(), RMSE(), MAPE(), MASE()])
            **kwargs: additional arguments to :py:class:`~BaseModel`.
        """  # noqa: E501
        # Replace ``None`` defaults with fresh containers. This happens on the
        # local variables because ``save_hyperparameters()`` below records the
        # constructor arguments with whatever value they hold at that point.
        if static_categoricals is None:
            static_categoricals = []
        if static_reals is None:
            static_reals = []
        if time_varying_categoricals_encoder is None:
            time_varying_categoricals_encoder = []
        if time_varying_categoricals_decoder is None:
            time_varying_categoricals_decoder = []
        if categorical_groups is None:
            categorical_groups = {}
        if time_varying_reals_encoder is None:
            time_varying_reals_encoder = []
        if time_varying_reals_decoder is None:
            time_varying_reals_decoder = []
        if embedding_sizes is None:
            embedding_sizes = {}
        if embedding_paddings is None:
            embedding_paddings = []
        if embedding_labels is None:
            embedding_labels = {}
        if x_reals is None:
            x_reals = []
        if x_categoricals is None:
            x_categoricals = []
        if n_blocks is None:
            n_blocks = [1, 1, 1]
        if logging_metrics is None:
            logging_metrics = nn.ModuleList([SMAPE(), MAE(), RMSE(), MAPE(), MASE()])
        if loss is None:
            loss = MASE()
        if activation == "SELU":
            # Override the local argument (not ``self.hparams``) so that the
            # value survives ``save_hyperparameters()`` and no attribute of the
            # module is touched before ``super().__init__()`` has run.
            initialization = "lecun_normal"

        # provide default downsampling sizes
        n_stacks = len(n_blocks)
        if pooling_sizes is None:
            pooling_sizes = np.exp2(
                np.round(np.linspace(0.49, np.log2(prediction_length / 2), n_stacks))
            )
            # Highest pooling first; ``int`` truncation can produce 0 for very
            # short horizons, so clamp every entry to at least 1.
            pooling_sizes = [max(int(x), 1) for x in pooling_sizes[::-1]]
        if downsample_frequencies is None:
            # Clamp element-wise so that no stack ends up with a frequency of 0.
            downsample_frequencies = [
                max(min(prediction_length, int(np.power(x, 1.5))), 1)
                for x in pooling_sizes
            ]

        # set static hidden size
        if static_hidden_size is None:
            static_hidden_size = hidden_size
        # set layers
        if isinstance(n_layers, int):
            n_layers = [n_layers] * n_stacks

        self.save_hyperparameters()
        super().__init__(loss=loss, logging_metrics=logging_metrics, **kwargs)

        self.embeddings = MultiEmbedding(
            embedding_sizes=self.hparams.embedding_sizes,
            categorical_groups=self.hparams.categorical_groups,
            embedding_paddings=self.hparams.embedding_paddings,
            x_categoricals=self.hparams.x_categoricals,
        )

        self.model = NHiTSModule(
            context_length=self.hparams.context_length,
            prediction_length=self.hparams.prediction_length,
            output_size=to_list(output_size),
            static_size=self.static_size,
            encoder_covariate_size=self.encoder_covariate_size,
            decoder_covariate_size=self.decoder_covariate_size,
            static_hidden_size=self.hparams.static_hidden_size,
            n_blocks=self.hparams.n_blocks,
            n_layers=self.hparams.n_layers,
            # NOTE(review): two hidden layers per stack are hard-coded here
            # independently of ``n_layers`` - confirm NHiTSModule copes with
            # n_layers != 2.
            hidden_size=self.n_stacks * [2 * [self.hparams.hidden_size]],
            pooling_sizes=self.hparams.pooling_sizes,
            downsample_frequencies=self.hparams.downsample_frequencies,
            pooling_mode=self.hparams.pooling_mode,
            interpolation_mode=self.hparams.interpolation_mode,
            dropout=self.hparams.dropout,
            activation=self.hparams.activation,
            initialization=self.hparams.initialization,
            batch_normalization=self.hparams.batch_normalization,
            shared_weights=self.hparams.shared_weights,
            naive_level=self.hparams.naive_level,
        )

    @property
    def decoder_covariate_size(self) -> int:
        """Decoder covariates size.

        Returns:
            int: size of time-dependent covariates used by the decoder
        """
        return len(
            set(self.hparams.time_varying_reals_decoder) - set(self.target_names)
        ) + sum(
            self.embeddings.output_size[name]
            for name in self.hparams.time_varying_categoricals_decoder
        )

    @property
    def encoder_covariate_size(self) -> int:
        """Encoder covariate size.

        Returns:
            int: size of time-dependent covariates used by the encoder
        """
        return len(
            set(self.hparams.time_varying_reals_encoder) - set(self.target_names)
        ) + sum(
            self.embeddings.output_size[name]
            for name in self.hparams.time_varying_categoricals_encoder
        )

    @property
    def static_size(self) -> int:
        """Static covariate size.

        Returns:
            int: size of static covariates
        """
        return len(self.hparams.static_reals) + sum(
            self.embeddings.output_size[name]
            for name in self.hparams.static_categoricals
        )

    @property
    def n_stacks(self) -> int:
        """Number of stacks.

        Returns:
            int: number of stacks.
        """
        return len(self.hparams.n_blocks)

    def forward(self, x: dict[str, torch.Tensor]) -> dict[str, torch.Tensor]:
        """
        Pass forward of network.

        Args:
            x (Dict[str, torch.Tensor]): input from dataloader generated from
                :py:class:`~pytorch_forecasting.data.timeseries.TimeSeriesDataSet`.

        Returns:
            Dict[str, torch.Tensor]: output of model
        """
        # covariates
        # Encoder features feed both the time-varying encoder covariates and
        # the static covariates, so extract them whenever either is present.
        if self.encoder_covariate_size > 0 or self.static_size > 0:
            encoder_features = self.extract_features(
                x, self.embeddings, period="encoder"
            )

        if self.encoder_covariate_size > 0:
            encoder_x_t = torch.concat(
                [
                    encoder_features[name]
                    for name in self.encoder_variables
                    if name not in self.target_names
                ],
                dim=2,
            )
        else:
            encoder_x_t = None

        if self.decoder_covariate_size > 0:
            decoder_features = self.extract_features(
                x, self.embeddings, period="decoder"
            )
            decoder_x_t = torch.concat(
                [decoder_features[name] for name in self.decoder_variables], dim=2
            )
        else:
            decoder_x_t = None

        # statics: take the first time step of each static feature
        if self.static_size > 0:
            x_s = torch.concat(
                [encoder_features[name][:, 0] for name in self.static_variables],
                dim=1,
            )
        else:
            x_s = None

        # target
        encoder_y = x["encoder_cont"][..., self.target_positions]
        encoder_mask = create_mask(
            x["encoder_lengths"].max(), x["encoder_lengths"], inverse=True
        )

        # run model
        forecast, backcast, block_forecasts, block_backcasts = self.model(
            encoder_y, encoder_mask, encoder_x_t, decoder_x_t, x_s
        )
        backcast = encoder_y - backcast

        # create block output: detach and split by block
        block_backcasts = block_backcasts.detach()
        block_forecasts = block_forecasts.detach()

        if isinstance(self.hparams.output_size, (tuple, list)):
            # multiple targets: split the last-but-one axis per target
            forecast = forecast.split(self.hparams.output_size, dim=2)
            backcast = backcast.split(1, dim=2)
            block_backcasts = tuple(
                self.transform_output(
                    block.squeeze(3).split(1, dim=2), target_scale=x["target_scale"]
                )
                for block in block_backcasts.split(1, dim=3)
            )
            block_forecasts = tuple(
                self.transform_output(
                    block.squeeze(3).split(self.hparams.output_size, dim=2),
                    target_scale=x["target_scale"],
                )
                for block in block_forecasts.split(1, dim=3)
            )
        else:
            block_backcasts = tuple(
                self.transform_output(
                    block.squeeze(3),
                    target_scale=x["target_scale"],
                    loss=MultiHorizonMetric(),
                )
                for block in block_backcasts.split(1, dim=3)
            )
            block_forecasts = tuple(
                self.transform_output(block.squeeze(3), target_scale=x["target_scale"])
                for block in block_forecasts.split(1, dim=3)
            )

        return self.to_network_output(
            prediction=self.transform_output(
                forecast, target_scale=x["target_scale"]
            ),  # (n_outputs x) n_samples x n_timesteps x output_size
            backcast=self.transform_output(
                backcast, target_scale=x["target_scale"], loss=MultiHorizonMetric()
            ),  # (n_outputs x) n_samples x n_timesteps x 1
            block_backcasts=block_backcasts,  # n_blocks x (n_outputs x) n_samples x n_timesteps x 1  # noqa: E501
            block_forecasts=block_forecasts,  # n_blocks x (n_outputs x) n_samples x n_timesteps x output_size  # noqa: E501
        )

    @classmethod
    def from_dataset(cls, dataset: TimeSeriesDataSet, **kwargs):
        """
        Convenience function to create network from
        :py:class:`~pytorch_forecasting.data.timeseries.TimeSeriesDataSet`.

        Args:
            dataset (TimeSeriesDataSet): dataset where sole predictor is the
                target.
            **kwargs: additional arguments to be passed to ``__init__`` method.

        Returns:
            NHiTS

        Raises:
            AssertionError: if the dataset has a categorical target, variable
                encoder or prediction lengths, randomized lengths or a relative
                time index, or if ``backcast_loss_ratio > 0`` is combined with
                an output size other than 1.
        """
        # validate arguments
        assert not isinstance(
            dataset.target_normalizer, NaNLabelEncoder
        ), "only regression tasks are supported - target must not be categorical"
        assert dataset.min_encoder_length == dataset.max_encoder_length, (
            "only fixed encoder length is allowed,"
            " but min_encoder_length != max_encoder_length"
        )
        assert dataset.max_prediction_length == dataset.min_prediction_length, (
            "only fixed prediction length is allowed,"
            " but max_prediction_length != min_prediction_length"
        )
        assert (
            dataset.randomize_length is None
        ), "length has to be fixed, but randomize_length is not None"
        assert (
            not dataset.add_relative_time_idx
        ), "add_relative_time_idx has to be False"

        new_kwargs = copy(kwargs)
        new_kwargs.update(
            {
                "prediction_length": dataset.max_prediction_length,
                "context_length": dataset.max_encoder_length,
            }
        )
        new_kwargs.update(cls.deduce_default_output_parameters(dataset, kwargs, MASE()))

        # Normalise to a list first so that an integer output size other than 1
        # fails the assertion below instead of raising a TypeError on iteration.
        output_size = new_kwargs["output_size"]
        if isinstance(output_size, (list, tuple)):
            output_sizes = output_size
        else:
            output_sizes = [output_size]
        assert new_kwargs.get("backcast_loss_ratio", 0) == 0 or all(
            o == 1 for o in output_sizes
        ), (
            "output sizes can only be of size 1, i.e."
            " point forecasts if backcast_loss_ratio > 0"
        )

        # initialize class
        return super().from_dataset(dataset, **new_kwargs)

    def step(self, x, y, batch_idx) -> tuple:
        """
        Take training / validation step.

        Runs the parent ``step`` and, if ``backcast_loss_ratio > 0`` and the model
        is not predicting, blends a weighted backcast loss into ``log["loss"]``.

        Returns:
            tuple: ``(log, out)`` as returned by the parent ``step``, with
            ``log["loss"]`` possibly replaced by the blended loss.
        """
        log, out = super().step(x, y, batch_idx=batch_idx)

        if (
            self.hparams.backcast_loss_ratio > 0 and not self.predicting
        ):  # add loss from backcast
            backcast = out["backcast"]
            backcast_weight = (
                self.hparams.backcast_loss_ratio
                * self.hparams.prediction_length
                / self.hparams.context_length
            )
            backcast_weight = backcast_weight / (backcast_weight + 1)  # normalize
            forecast_weight = 1 - backcast_weight
            if isinstance(self.loss, (MultiLoss, MASE)):
                # For the backcast the roles are swapped: the encoder window is
                # the target and the decoder window is passed as "encoder" data.
                backcast_loss = (
                    self.loss(
                        backcast,
                        (x["encoder_target"], None),
                        encoder_target=x["decoder_target"],
                        encoder_lengths=x["decoder_lengths"],
                    )
                    * backcast_weight
                )
            else:
                backcast_loss = (
                    self.loss(backcast, x["encoder_target"]) * backcast_weight
                )
            label = ["val", "train"][self.training]
            self.log(
                f"{label}_backcast_loss",
                backcast_loss,
                on_epoch=True,
                on_step=self.training,
                batch_size=len(x["decoder_target"]),
            )
            self.log(
                f"{label}_forecast_loss",
                log["loss"],
                on_epoch=True,
                on_step=self.training,
                batch_size=len(x["decoder_target"]),
            )
            log["loss"] = log["loss"] * forecast_weight + backcast_loss

        # log interpretation
        self.log_interpretation(x, out, batch_idx=batch_idx)
        return log, out

    def plot_interpretation(
        self,
        x: dict[str, torch.Tensor],
        output: dict[str, torch.Tensor],
        idx: int,
        ax=None,
    ):
        """
        Plot interpretation.

        Plot two panels: prediction and backcast vs actuals and
        decomposition of prediction into different block predictions which
        capture different frequencies.

        Args:
            x (Dict[str, torch.Tensor]): network input
            output (Dict[str, torch.Tensor]): network output
            idx (int): index of sample for which to plot the interpretation.
            ax (List[matplotlib axes], optional): list of two matplotlib axes onto
                which to plot the interpretation. Defaults to None.

        Returns:
            plt.Figure: matplotlib figure (a list of figures, one per target, in
            the multi-target case)
        """
        _check_matplotlib("plot_interpretation")

        from matplotlib import pyplot as plt

        if not isinstance(self.loss, MultiLoss):  # not multi-target
            prediction = self.to_prediction(
                dict(prediction=output["prediction"][[idx]].detach())
            )[0].cpu()
            block_forecasts = [
                self.to_prediction(dict(prediction=block[[idx]].detach()))[0].cpu()
                for block in output["block_forecasts"]
            ]
        elif isinstance(output["prediction"], (tuple, list)):  # multi-target
            figs = []
            # predictions and block forecasts need to be converted
            prediction = [
                p[[idx]].detach() for p in output["prediction"]
            ]  # select index
            prediction = self.to_prediction(
                dict(prediction=prediction)
            )  # transform to prediction
            prediction = [p[0].cpu() for p in prediction]  # select first and only index

            block_forecasts = [
                self.to_prediction(dict(prediction=[b[[idx]].detach() for b in block]))
                for block in output["block_forecasts"]
            ]
            block_forecasts = [[b[0].cpu() for b in block] for block in block_forecasts]

            # recurse once per target with the already transformed predictions
            for i in range(len(self.target_names)):
                ax_i = ax[i] if ax is not None else None
                figs.append(
                    self.plot_interpretation(
                        dict(
                            encoder_target=x["encoder_target"][i],
                            decoder_target=x["decoder_target"][i],
                        ),
                        dict(
                            backcast=output["backcast"][i],
                            prediction=prediction[i],
                            block_backcasts=[
                                block[i] for block in output["block_backcasts"]
                            ],
                            block_forecasts=[block[i] for block in block_forecasts],
                        ),
                        idx=idx,
                        ax=ax_i,
                    )
                )
            return figs
        else:
            prediction = output[
                "prediction"
            ]  # multi target that has already been transformed
            block_forecasts = output["block_forecasts"]

        if ax is None:
            fig, ax = plt.subplots(2, 1, figsize=(6, 8), sharex=True, sharey=True)
        else:
            fig = ax[0].get_figure()

        # plot target vs prediction
        # target
        prop_cycle = iter(plt.rcParams["axes.prop_cycle"])
        color = next(prop_cycle)["color"]
        ax[0].plot(
            torch.arange(-self.hparams.context_length, 0),
            x["encoder_target"][idx].detach().cpu(),
            c=color,
        )
        ax[0].plot(
            torch.arange(self.hparams.prediction_length),
            x["decoder_target"][idx].detach().cpu(),
            label="Target",
            c=color,
        )
        # prediction
        color = next(prop_cycle)["color"]
        ax[0].plot(
            torch.arange(-self.hparams.context_length, 0),
            output["backcast"][idx][..., 0].detach().cpu(),
            label="Backcast",
            c=color,
        )
        ax[0].plot(
            torch.arange(self.hparams.prediction_length),
            prediction,
            label="Forecast",
            c=color,
        )

        # plot blocks
        # NOTE(review): block_backcasts skips its first entry while
        # block_forecasts does not - confirm both line up with pooling_sizes.
        for pooling_size, block_backcast, block_forecast in zip(
            self.hparams.pooling_sizes, output["block_backcasts"][1:], block_forecasts
        ):
            color = next(prop_cycle)["color"]
            ax[1].plot(
                torch.arange(-self.hparams.context_length, 0),
                block_backcast[idx][..., 0].detach().cpu(),
                c=color,
            )
            ax[1].plot(
                torch.arange(self.hparams.prediction_length),
                block_forecast,
                c=color,
                label=f"Pooling size: {pooling_size}",
            )
        ax[1].set_xlabel("Time")

        fig.legend()
        return fig

    def log_interpretation(self, x, out, batch_idx):
        """
        Log interpretation of network predictions in tensorboard.

        Does nothing if matplotlib is unavailable, the logger does not support
        ``add_figure``, or the current batch is not due according to
        ``log_interval``.
        """
        mpl_available = _check_matplotlib("log_interpretation", raise_error=False)

        # Don't log figures if matplotlib or add_figure is not available
        if not mpl_available or not self._logger_supports("add_figure"):
            return None

        label = ["val", "train"][self.training]
        if self.log_interval > 0 and batch_idx % self.log_interval == 0:
            fig = self.plot_interpretation(x, out, idx=0)
            name = f"{label.capitalize()} interpretation of item 0 in "
            if self.training:
                name += f"step {self.global_step}"
            else:
                name += f"batch {batch_idx}"
            # Each figure is logged exactly once: one entry per target in the
            # multi-target case, a single entry otherwise.
            if isinstance(fig, (list, tuple)):
                for idx, f in enumerate(fig):
                    self.logger.experiment.add_figure(
                        f"{self.target_names[idx]} {name}",
                        f,
                        global_step=self.global_step,
                    )
            else:
                self.logger.experiment.add_figure(
                    name,
                    fig,
                    global_step=self.global_step,
                )