Building a new model in PyTorch Forecasting is relatively easy. Many things are taken care of automatically:
Training, validation and inference are handled automatically for most models - defining the architecture and hyperparameters is sufficient
Dataloading, normalization, re-scaling etc. are provided by the TimeSeriesDataSet
Logging of training progress with multiple metrics, including plotting of examples, is taken care of automatically
Masking of entries if different time series have different lengths is automatic
However, there are a couple of things to keep in mind if you want to make full use of the package. This tutorial first demonstrates how to implement a simple model and then turns to more complicated implementation scenarios.
We will answer questions such as:
How to transfer an existing PyTorch implementation into PyTorch Forecasting
How to handle data loading and enable different length time series
How to define and use a custom metric
How to handle recurrent networks
How to deal with covariates
How to test new models
For demonstration purposes we will choose a simple fully connected model. It takes a time series of size input_size as input and outputs a new time series of size output_size. You can think of this as input_size encoding steps and output_size decoding/prediction steps.
[1]:
import os
import warnings

warnings.filterwarnings("ignore")

os.chdir("../../..")
[2]:
import torch
from torch import nn


class FullyConnectedModule(nn.Module):
    def __init__(self, input_size: int, output_size: int, hidden_size: int, n_hidden_layers: int):
        super().__init__()

        # input layer
        module_list = [nn.Linear(input_size, hidden_size), nn.ReLU()]
        # hidden layers
        for _ in range(n_hidden_layers):
            module_list.extend([nn.Linear(hidden_size, hidden_size), nn.ReLU()])
        # output layer
        module_list.append(nn.Linear(hidden_size, output_size))

        self.sequential = nn.Sequential(*module_list)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        # x of shape: batch_size x n_timesteps_in
        # output of shape batch_size x n_timesteps_out
        return self.sequential(x)


# test that network works as intended
network = FullyConnectedModule(input_size=5, output_size=2, hidden_size=10, n_hidden_layers=2)
x = torch.rand(20, 5)
network(x).shape
torch.Size([20, 2])
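The size of this network can also be worked out by hand: a linear layer with n_in inputs and n_out outputs holds n_in * n_out weights plus n_out biases. The helper below is a hypothetical, plain-Python sketch (count_mlp_parameters is not part of PyTorch or PyTorch Forecasting) that adds up the layers of the architecture defined above.

```python
def count_mlp_parameters(input_size: int, output_size: int, hidden_size: int, n_hidden_layers: int) -> int:
    """Count weights and biases of the fully connected architecture (hypothetical helper)."""

    def linear(n_in: int, n_out: int) -> int:
        # weight matrix plus bias vector
        return n_in * n_out + n_out

    total = linear(input_size, hidden_size)  # input layer
    total += n_hidden_layers * linear(hidden_size, hidden_size)  # hidden layers
    total += linear(hidden_size, output_size)  # output layer
    return total


n_params = count_mlp_parameters(input_size=5, output_size=2, hidden_size=10, n_hidden_layers=2)
print(n_params)  # 60 + 2 * 110 + 22 = 302
```

The same count should appear in any model summary printed for a network with these settings.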
The above model is not yet a PyTorch Forecasting model but it is easy to get there. As this is a simple model, we will use the BaseModel. This base class is a modified LightningModule with pre-defined hooks for training and validating time series models. The BaseModelWithCovariates will be discussed later in this tutorial.
Either way, the main requirement is for the model to have a forward method.
BaseModel.forward(x)
Network forward pass.
Parameters:
x (Dict[str, Union[torch.Tensor, List[torch.Tensor]]]) – network input (x as returned by the dataloader). See the to_dataloader() method, which returns a tuple of x and y. This function expects x.
Returns: dictionary of tensors. The minimal required entries in the dictionary are (and shapes in brackets):
prediction (batch_size x n_decoder_time_steps x n_outputs or list thereof with each entry for a different target): unscaled predictions that can be fed to metric. List of tensors if multiple targets are predicted at the same time.
target_scale (batch_size x scale_size or list thereof with each entry for a different target): target scales that allow rescaling the predictions into the real space. The scale can mostly be directly taken from x, i.e. target_scale=x["target_scale"]
Return type: Dict[str, Union[torch.Tensor, List[torch.Tensor]]]
[3]:
from typing import Dict

from pytorch_forecasting.models import BaseModel


class FullyConnectedModel(BaseModel):
    def __init__(self, input_size: int, output_size: int, hidden_size: int, n_hidden_layers: int, **kwargs):
        # saves arguments in signature to `.hparams` attribute, mandatory call - do not skip this
        self.save_hyperparameters()
        # pass additional arguments to BaseModel.__init__, mandatory call - do not skip this
        super().__init__(**kwargs)
        self.network = FullyConnectedModule(
            input_size=self.hparams.input_size,
            output_size=self.hparams.output_size,
            hidden_size=self.hparams.hidden_size,
            n_hidden_layers=self.hparams.n_hidden_layers,
        )

    def forward(self, x: Dict[str, torch.Tensor]) -> Dict[str, torch.Tensor]:
        # x is a batch generated based on the TimeSeriesDataset
        network_input = x["encoder_cont"].squeeze(-1)
        prediction = self.network(network_input)

        # We need to return a dictionary that at least contains the prediction and the target_scale.
        # The parameter can be directly forwarded from the input.
        return dict(prediction=prediction, target_scale=x["target_scale"])


model = FullyConnectedModel(input_size=5, output_size=2, hidden_size=10, n_hidden_layers=2)
This is a very basic implementation that could be readily used for training. But before we add additional features, let’s first have a look at how we pass data to this model.
Instead of having to write our own dataloader (which can be rather complicated), we can leverage PyTorch Forecasting’s TimeSeriesDataSet to feed data to our model. In fact, PyTorch Forecasting expects us to use a TimeSeriesDataSet.
The data has to be in a specific format to be used by the TimeSeriesDataSet. It should be in a pandas DataFrame and have a categorical column to identify each series and an integer column to specify the time of the record.
Below, we create such a dataset with 30 observations: 10 for each of 3 time series.
[4]:
import numpy as np
import pandas as pd

test_data = pd.DataFrame(
    dict(
        value=np.random.rand(30) - 0.5,
        group=np.repeat(np.arange(3), 10),
        time_idx=np.tile(np.arange(10), 3),
    )
)
test_data
Converting it to a TimeSeriesDataSet is easy:
[5]:
from pytorch_forecasting import TimeSeriesDataSet

# create the dataset from the pandas dataframe
dataset = TimeSeriesDataSet(
    test_data,
    group_ids=["group"],
    target="value",
    time_idx="time_idx",
    min_encoder_length=5,
    max_encoder_length=5,
    min_prediction_length=2,
    max_prediction_length=2,
    time_varying_unknown_reals=["value"],
)
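To build intuition for what the fixed encoder and prediction lengths imply, the following sketch counts how many samples such a configuration would yield, assuming each sample is one contiguous window of encoder length plus prediction length taken from a single series. The helper count_windows is hypothetical and uses only pandas; it does not call into PyTorch Forecasting.

```python
import numpy as np
import pandas as pd


def count_windows(df: pd.DataFrame, group_col: str, encoder_length: int, prediction_length: int) -> int:
    """Count contiguous windows of encoder_length + prediction_length per series (hypothetical helper)."""
    window = encoder_length + prediction_length
    series_lengths = df.groupby(group_col).size()
    # a series of length n yields n - window + 1 windows, or none if it is too short
    return int((series_lengths - window + 1).clip(lower=0).sum())


demo_data = pd.DataFrame(
    dict(
        value=np.random.rand(30) - 0.5,
        group=np.repeat(np.arange(3), 10),
        time_idx=np.tile(np.arange(10), 3),
    )
)
n_samples = count_windows(demo_data, "group", encoder_length=5, prediction_length=2)
print(n_samples)  # 3 series x (10 - 7 + 1) windows = 12
```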
We can take a look at all the defaults and settings that were set by PyTorch Forecasting. These are all available as arguments to TimeSeriesDataSet - see its documentation for all the details.
[6]:
dataset.get_parameters()
{'time_idx': 'time_idx', 'target': 'value', 'group_ids': ['group'], 'weight': None, 'max_encoder_length': 5, 'min_encoder_length': 5, 'min_prediction_idx': 0, 'min_prediction_length': 2, 'max_prediction_length': 2, 'static_categoricals': [], 'static_reals': [], 'time_varying_known_categoricals': [], 'time_varying_known_reals': [], 'time_varying_unknown_categoricals': [], 'time_varying_unknown_reals': ['value'], 'variable_groups': {}, 'dropout_categoricals': [], 'constant_fill_strategy': {}, 'allow_missings': False, 'lags': {}, 'add_relative_time_idx': False, 'add_target_scales': False, 'add_encoder_length': False, 'target_normalizer': GroupNormalizer(), 'categorical_encoders': {'__group_id__group': NaNLabelEncoder(), 'group': NaNLabelEncoder()}, 'scalers': {}, 'randomize_length': None, 'predict_mode': False}
Now, we take a look at the output of the dataloader. Its x will be fed to the model’s forward method, which is why it is so important to understand it.
[7]:
# convert the dataset to a dataloader
dataloader = dataset.to_dataloader(batch_size=4)

# and load the first batch
x, y = next(iter(dataloader))
print("x =", x)
print("\ny =", y)
print("\nsizes of x =")
for key, value in x.items():
    print(f"\t{key} = {value.size()}")
x = {'encoder_cat': tensor([], size=(4, 5, 0), dtype=torch.int64),
 'encoder_cont': tensor([[[ 0.1604], [-1.5196], [ 1.5585], [ 0.2659], [ 1.5991]],
        [[-0.7664], [-0.9030], [ 1.3094], [ 1.4712], [ 1.0170]],
        [[-0.9030], [ 1.3094], [ 1.4712], [ 1.0170], [ 1.3866]],
        [[-1.1089], [-0.1437], [ 0.1604], [-1.5196], [ 1.5585]]]),
 'encoder_target': tensor([[ 0.0543, -0.4534, 0.4769, 0.0863, 0.4892],
        [-0.2258, -0.2671, 0.4016, 0.4505, 0.3133],
        [-0.2671, 0.4016, 0.4505, 0.3133, 0.4250],
        [-0.3293, -0.0376, 0.0543, -0.4534, 0.4769]]),
 'encoder_lengths': tensor([5, 5, 5, 5]),
 'decoder_cat': tensor([], size=(4, 2, 0), dtype=torch.int64),
 'decoder_cont': tensor([[[-0.6470], [-1.1744]],
        [[ 1.3866], [-0.5300]],
        [[-0.5300], [ 0.5347]],
        [[ 0.2659], [ 1.5991]]]),
 'decoder_target': tensor([[-0.1897, -0.3491],
        [ 0.4250, -0.1543],
        [-0.1543, 0.1675],
        [ 0.0863, 0.4892]]),
 'decoder_lengths': tensor([2, 2, 2, 2]),
 'decoder_time_idx': tensor([[7, 8], [5, 6], [6, 7], [5, 6]]),
 'groups': tensor([[1], [2], [2], [1]]),
 'target_scale': tensor([[0.0059, 0.3022],
        [0.0059, 0.3022],
        [0.0059, 0.3022],
        [0.0059, 0.3022]])}

y = (tensor([[-0.1897, -0.3491],
        [ 0.4250, -0.1543],
        [-0.1543, 0.1675],
        [ 0.0863, 0.4892]]), None)

sizes of x =
        encoder_cat = torch.Size([4, 5, 0])
        encoder_cont = torch.Size([4, 5, 1])
        encoder_target = torch.Size([4, 5])
        encoder_lengths = torch.Size([4])
        decoder_cat = torch.Size([4, 2, 0])
        decoder_cont = torch.Size([4, 2, 1])
        decoder_target = torch.Size([4, 2])
        decoder_lengths = torch.Size([4])
        decoder_time_idx = torch.Size([4, 2])
        groups = torch.Size([4, 1])
        target_scale = torch.Size([4, 2])
To understand it better, we look at the documentation of the to_dataloader() method:
TimeSeriesDataSet.to_dataloader(train=True, batch_size=64, batch_sampler=None, **kwargs)
Get dataloader from dataset.
Parameters:
train (bool, optional) – whether the dataloader is used for training or prediction. Will shuffle and drop the last batch if True. Defaults to True.
batch_size (int) – batch size for training model. Defaults to 64.
batch_sampler (Union[Sampler, str]) – batch sampler or string. One of
"synchronized": ensure that samples in decoder are aligned in time. Does not support missing values in dataset. This only makes sense if the underlying algorithm makes use of values aligned in time.
PyTorch Sampler instance: any PyTorch sampler, e.g. the WeightedRandomSampler()
None: samples are taken randomly from time series.
**kwargs – additional arguments to DataLoader()
Returns: dataloader whose batches are tuples of x and y.
First entry is x, a dictionary of tensors with the entries (and shapes in brackets)
encoder_cat (batch_size x n_encoder_time_steps x n_features): long tensor of encoded categoricals for encoder
encoder_cont (batch_size x n_encoder_time_steps x n_features): float tensor of scaled continuous variables for encoder
encoder_target (batch_size x n_encoder_time_steps or list thereof with each entry for a different target): float tensor with unscaled continuous target or encoded categorical target, list of tensors for multiple targets
encoder_lengths (batch_size): long tensor with lengths of the encoder time series. No entry will be greater than n_encoder_time_steps
decoder_cat (batch_size x n_decoder_time_steps x n_features): long tensor of encoded categoricals for decoder
decoder_cont (batch_size x n_decoder_time_steps x n_features): float tensor of scaled continuous variables for decoder
decoder_target (batch_size x n_decoder_time_steps or list thereof with each entry for a different target): float tensor with unscaled continuous target or encoded categorical target for decoder - this corresponds to first entry of y, list of tensors for multiple targets
decoder_lengths (batch_size): long tensor with lengths of the decoder time series. No entry will be greater than n_decoder_time_steps
group_ids (batch_size x number_of_ids): encoded group ids that identify a time series in the dataset
target_scale (batch_size x scale_size or list thereof with each entry for a different target): parameters used to normalize the target. Typically these are mean and standard deviation. Is list of tensors for multiple targets.
Second entry is y, a tuple of the form (target, weight)
target (batch_size x n_decoder_time_steps or list thereof with each entry for a different target): unscaled (continuous) or encoded (categories) targets, list of tensors for multiple targets
weight (None or batch_size x n_decoder_time_steps): weight
Return type: DataLoader
Example
Weight by samples for training:
from torch.utils.data import WeightedRandomSampler

# length of probabilities for sampler has to be equal to the length of the index
probabilities = np.sqrt(1 + data.loc[dataset.index, "target"])
sampler = WeightedRandomSampler(probabilities, len(probabilities))
dataset.to_dataloader(train=True, sampler=sampler, shuffle=False)
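The docstring example refers to a data frame that is not defined in this tutorial. As a self-contained illustration of the same weighting idea, the sketch below uses hypothetical target values and only PyTorch's WeightedRandomSampler; how the sampler is handed to to_dataloader() is left as shown in the docstring.

```python
import torch
from torch.utils.data import WeightedRandomSampler

# hypothetical target values, one per sample in the dataset index
targets = torch.tensor([0.0, 0.5, 3.0, 8.0])
weights = torch.sqrt(1 + targets)  # samples with larger targets are drawn more often

# draw as many indices as there are samples, with replacement
sampler = WeightedRandomSampler(weights, num_samples=len(weights), replacement=True)
indices = list(sampler)  # positions into the dataset, drawn proportionally to the weights
print(indices)
```

The weights do not need to sum to one; the sampler normalizes them internally.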
Because x is a dictionary of tensors, we first had to extract the correct input in our simple FullyConnectedModel above before passing it to our FullyConnectedModule. As a reminder:
[8]:
def forward(self, x: Dict[str, torch.Tensor]) -> Dict[str, torch.Tensor]:
    # x is a batch generated based on the TimeSeriesDataset
    network_input = x["encoder_cont"].squeeze(-1)
    prediction = self.network(network_input)

    # We need to return a dictionary that at least contains the prediction and the target_scale.
    # The parameter can be directly forwarded from the input.
    return dict(prediction=prediction, target_scale=x["target_scale"])
For such a simple architecture, we can ignore most of the inputs in x. You do not have to worry about moving tensors to specific GPUs; PyTorch Lightning will take care of this for you.
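The shape handling in this forward method can be traced in isolation. The sketch below uses a random tensor with the same layout as x["encoder_cont"] and a single linear layer as a stand-in for the FullyConnectedModule; both are placeholders chosen for illustration.

```python
import torch
from torch import nn

batch_size, n_encoder_steps, n_decoder_steps = 4, 5, 2

# same layout as x["encoder_cont"]: batch_size x n_encoder_time_steps x n_features
encoder_cont = torch.rand(batch_size, n_encoder_steps, 1)

# drop the trailing feature dimension of size 1 so each series becomes a flat vector
network_input = encoder_cont.squeeze(-1)

# placeholder for the fully connected network: maps encoder steps to decoder steps
stand_in_network = nn.Linear(n_encoder_steps, n_decoder_steps)
prediction = stand_in_network(network_input)

print(network_input.shape, prediction.shape)
```

Note that squeeze(-1) only removes the last dimension when it has size 1, which is why this simple approach is limited to a single continuous input variable.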
Now, let’s check if our model works:
[9]:
x, y = next(iter(dataloader))
model(x)
{'prediction': tensor([[ 0.1789, -0.3471], [ 0.1626, -0.2557], [ 0.1543, -0.2441], [ 0.1247, -0.2500]], grad_fn=<AddmmBackward>), 'target_scale': tensor([[0.0059, 0.3022], [0.0059, 0.3022], [0.0059, 0.3022], [0.0059, 0.3022]])}
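The target_scale entry is what links the scaled and unscaled values. The sketch below copies the numbers of the first sample from the batch printed earlier and checks that the unscaled encoder_target is recovered from encoder_cont, assuming target_scale holds (center, scale) in that order and the normalization is a plain affine transform. The printed values are consistent with that assumption, but other normalizers may behave differently.

```python
import torch

# values of the first sample in the batch printed earlier
encoder_cont = torch.tensor([0.1604, -1.5196, 1.5585, 0.2659, 1.5991])
encoder_target = torch.tensor([0.0543, -0.4534, 0.4769, 0.0863, 0.4892])
center, scale = torch.tensor([0.0059, 0.3022])  # assumed order: (center, scale)

# undo the normalization: multiply by the scale, then add the center
rescaled = encoder_cont * scale + center
matches = torch.allclose(rescaled, encoder_target, atol=1e-3)
print(rescaled, matches)
```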
If you want to know which group and time index (at the first prediction) the samples in the batch belong to, you can find out by using x_to_index():
[10]:
dataset.x_to_index(x)
You might have noticed that the encoder and decoder/prediction lengths (5 and 2) are already specified in the TimeSeriesDataSet and that we specified them a second time when initializing the model. This might be acceptable for such a simple model but will make it hard for users to understand how to map from the dataset to the model parameters in more complicated settings. This is why we should implement another method in the model: from_dataset(). Typically, a user would always initialize a model from a dataset. The method is also an opportunity to validate that the dataset defined by the user is compatible with your model architecture.
While the TimeSeriesDataSet and all PyTorch Forecasting metrics support different length time series, not every network architecture does.
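To illustrate what supporting different lengths involves, a padded batch is usually accompanied by a mask derived from the per-sample lengths. The sketch below builds such a mask from hypothetical encoder lengths. It shows the general technique and does not claim to be how PyTorch Forecasting implements masking internally.

```python
import torch

encoder_lengths = torch.tensor([5, 3, 4])  # hypothetical lengths, laid out like x["encoder_lengths"]
max_length = 5

# True where a time step holds real data, False where it is padding
mask = torch.arange(max_length).unsqueeze(0) < encoder_lengths.unsqueeze(1)

# example use: average only over the valid time steps of each series
values = torch.ones(3, max_length)
masked_mean = (values * mask).sum(dim=1) / encoder_lengths
print(mask)
print(masked_mean)
```

A fully connected network with a fixed input size has no natural place for such a mask, which is why the model below restricts itself to fixed-length series.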
[11]:
class FullyConnectedModel(BaseModel):
    def __init__(self, input_size: int, output_size: int, hidden_size: int, n_hidden_layers: int, **kwargs):
        # saves arguments in signature to `.hparams` attribute, mandatory call - do not skip this
        self.save_hyperparameters()
        # pass additional arguments to BaseModel.__init__, mandatory call - do not skip this
        super().__init__(**kwargs)
        self.network = FullyConnectedModule(
            input_size=self.hparams.input_size,
            output_size=self.hparams.output_size,
            hidden_size=self.hparams.hidden_size,
            n_hidden_layers=self.hparams.n_hidden_layers,
        )

    def forward(self, x: Dict[str, torch.Tensor]) -> Dict[str, torch.Tensor]:
        # x is a batch generated based on the TimeSeriesDataset
        network_input = x["encoder_cont"].squeeze(-1)
        prediction = self.network(network_input).unsqueeze(-1)

        # We need to return a dictionary that at least contains the prediction and the target_scale.
        # The parameter can be directly forwarded from the input.
        return dict(prediction=prediction, target_scale=x["target_scale"])

    @classmethod
    def from_dataset(cls, dataset: TimeSeriesDataSet, **kwargs):
        new_kwargs = {
            "output_size": dataset.max_prediction_length,
            "input_size": dataset.max_encoder_length,
        }
        new_kwargs.update(kwargs)  # use to pass real hyperparameters and override defaults set by dataset
        # example for dataset validation
        assert dataset.max_prediction_length == dataset.min_prediction_length, "Decoder only supports a fixed length"
        assert dataset.min_encoder_length == dataset.max_encoder_length, "Encoder only supports a fixed length"
        assert (
            len(dataset.time_varying_known_categoricals) == 0
            and len(dataset.time_varying_known_reals) == 0
            and len(dataset.time_varying_unknown_categoricals) == 0
            and len(dataset.static_categoricals) == 0
            and len(dataset.static_reals) == 0
            and len(dataset.time_varying_unknown_reals) == 1
            and dataset.time_varying_unknown_reals[0] == dataset.target
        ), "Only covariate should be the target in 'time_varying_unknown_reals'"

        return super().from_dataset(dataset, **new_kwargs)
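The order of operations in from_dataset() matters: defaults are derived from the dataset first and user-supplied keyword arguments are applied afterwards, so the user always has the last word. The following sketch isolates that pattern with a hypothetical build_kwargs function and a SimpleNamespace standing in for the dataset; neither is part of PyTorch Forecasting.

```python
from types import SimpleNamespace


def build_kwargs(dataset, **kwargs):
    """Merge dataset-derived defaults with user-supplied hyperparameters (hypothetical helper)."""
    # defaults derived from the dataset
    new_kwargs = {
        "output_size": dataset.max_prediction_length,
        "input_size": dataset.max_encoder_length,
    }
    # user-supplied values win over dataset-derived defaults
    new_kwargs.update(kwargs)
    return new_kwargs


# stand-in object with the two attributes used above, not a real TimeSeriesDataSet
fake_dataset = SimpleNamespace(max_prediction_length=2, max_encoder_length=5)

merged = build_kwargs(fake_dataset, hidden_size=10)
overridden = build_kwargs(fake_dataset, output_size=3)
print(merged)
print(overridden)
```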
Now, let’s initialize from our dataset:
[12]:
model = FullyConnectedModel.from_dataset(dataset, hidden_size=10, n_hidden_layers=2)
model.summarize("full")  # print model summary
model.hparams
   | Name                 | Type                 | Params
---------------------------------------------------------------
0  | loss                 | SMAPE                | 0
1  | logging_metrics      | ModuleList           | 0
2  | network              | FullyConnectedModule | 302
3  | network.sequential   | Sequential           | 302
4  | network.sequential.0 | Linear               | 60
5  | network.sequential.1 | ReLU                 | 0
6  | network.sequential.2 | Linear               | 110
7  | network.sequential.3 | ReLU                 | 0
8  | network.sequential.4 | Linear               | 110
9  | network.sequential.5 | ReLU                 | 0
10 | network.sequential.6 | Linear               | 22
---------------------------------------------------------------
302       Trainable params
0         Non-trainable params
302       Total params

"hidden_size":                10
"input_size":                 5
"learning_rate":              0.001
"log_gradient_flow":          False
"log_interval":               -1
"log_val_interval":           -1
"logging_metrics":            ModuleList()
"loss":                       SMAPE()
"monotone_constaints":        {}
"n_hidden_layers":            2
"optimizer":                  ranger
"output_size":                2
"output_transformer":         GroupNormalizer()
"reduce_on_plateau_min_lr":   1e-05
"reduce_on_plateau_patience": 1000
"weight_decay":               0.0
So far, we have kept a wildcard **kwargs argument in the model initialization signature. We then pass these **kwargs to the BaseModel using a super().__init__(**kwargs) call. We can see which additional hyperparameters are available as they are all saved in the hparams attribute of the model:
[13]:
model.hparams
While not required, to give the user transparency over these additional hyperparameters, it is worth passing them explicitly instead of implicitly in **kwargs.
They are described in detail in the documentation of BaseModel.__init__():
BaseModel for timeseries forecasting from which to inherit from
Parameters:
log_interval (Union[int, float], optional) – Batches after which predictions are logged. If < 1.0, will log multiple entries per batch. Defaults to -1.
log_val_interval (Union[int, float], optional) – batches after which predictions for validation are logged. Defaults to None/log_interval.
learning_rate (float, optional) – Learning rate. Defaults to 1e-3.
log_gradient_flow (bool) – If to log gradient flow, this takes time and should be only done to diagnose training failures. Defaults to False.
loss (Metric, optional) – metric to optimize. Defaults to SMAPE().
logging_metrics (nn.ModuleList[MultiHorizonMetric]) – list of metrics that are logged during training. Defaults to [].
reduce_on_plateau_patience (int) – patience after which learning rate is reduced by a factor of 10. Defaults to 1000
reduce_on_plateau_min_lr (float) – minimum learning rate for reduce on plateau learning rate scheduler. Defaults to 1e-5
weight_decay (float) – weight decay. Defaults to 0.0.
optimizer_params (Dict[str, Any]) – additional parameters for the optimizer. Defaults to {}.
monotone_constaints (Dict[str, int]) – dictionary of monotonicity constraints for continuous decoder variables mapping position (e.g. "0" for first position) to constraint (-1 for negative and +1 for positive, larger numbers add more weight to the constraint vs. the loss but are usually not necessary). This constraint significantly slows down training. Defaults to {}.
output_transformer (Callable) – transformer that takes network output and transforms it to prediction space. Defaults to None which is equivalent to lambda out: out["prediction"].
optimizer (str) – Optimizer, “ranger”, “sgd”, “adam”, “adamw” or class name of optimizer in torch.optim. Defaults to “ranger”.
You can simply copy this docstring into your model implementation:
[14]:
print(BaseModel.__init__.__doc__)
BaseModel for timeseries forecasting from which to inherit from

Args:
    log_interval (Union[int, float], optional): Batches after which predictions are logged. If < 1.0, will log
        multiple entries per batch. Defaults to -1.
    log_val_interval (Union[int, float], optional): batches after which predictions for validation are
        logged. Defaults to None/log_interval.
    learning_rate (float, optional): Learning rate. Defaults to 1e-3.
    log_gradient_flow (bool): If to log gradient flow, this takes time and should be only done to diagnose
        training failures. Defaults to False.
    loss (Metric, optional): metric to optimize. Defaults to SMAPE().
    logging_metrics (nn.ModuleList[MultiHorizonMetric]): list of metrics that are logged during training.
        Defaults to [].
    reduce_on_plateau_patience (int): patience after which learning rate is reduced by a factor of 10.
        Defaults to 1000
    reduce_on_plateau_min_lr (float): minimum learning rate for reduce on plateua learning rate scheduler.
        Defaults to 1e-5
    weight_decay (float): weight decay. Defaults to 0.0.
    monotone_constaints (Dict[str, int]): dictionary of monotonicity constraints for continuous decoder
        variables mapping position (e.g. ``"0"`` for first position) to constraint (``-1`` for negative and
        ``+1`` for positive, larger numbers add more weight to the constraint vs. the loss but are usually
        not necessary). This constraint significantly slows down training. Defaults to {}.
    output_transformer (Callable): transformer that takes network output and transforms it to prediction
        space. Defaults to None which is equivalent to ``lambda out: out["prediction"]``.
    optimizer (str): Optimizer, "ranger", "adam" or "adamw". Defaults to "ranger".
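The two reduce_on_plateau_* hyperparameters describe a learning rate schedule that cuts the rate by a factor of 10 when the monitored loss stops improving. The sketch below shows that behaviour with PyTorch's own ReduceLROnPlateau scheduler and a short patience. It assumes BaseModel configures something along these lines and is not taken from the BaseModel source.

```python
import torch

# a single dummy parameter so that an optimizer can be created
param = torch.nn.Parameter(torch.zeros(1))
optimizer = torch.optim.SGD([param], lr=1e-3)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer,
    mode="min",
    factor=0.1,  # reduce the learning rate by a factor of 10
    patience=2,  # short patience for illustration, the documented default is 1000
    min_lr=1e-5,  # the learning rate never drops below this value
)

learning_rates = []
for _ in range(12):
    scheduler.step(1.0)  # a validation loss that never improves
    learning_rates.append(optimizer.param_groups[0]["lr"])

print(learning_rates)
```

With a patience of 1000, as in the defaults listed above, such reductions would only happen after a very long stretch without improvement.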
Classification is a common task and can be easily implemented. In fact, we only have to change the target in our TimeSeriesDataSet and adjust the number of prediction outputs to reflect the number of classes we want to predict. The changes for the TimeSeriesDataSet are marked below.
[15]:
classification_test_data = pd.DataFrame(
    dict(
        target=np.random.choice(["A", "B", "C"], size=30),  # CHANGING values to predict to a categorical
        value=np.random.rand(30),  # INPUT values - see next section on covariates how to use categorical inputs
        group=np.repeat(np.arange(3), 10),
        time_idx=np.tile(np.arange(10), 3),
    )
)
classification_test_data
[16]:
from pytorch_forecasting.data.encoders import NaNLabelEncoder

# create the dataset from the pandas dataframe
classification_dataset = TimeSeriesDataSet(
    classification_test_data,
    group_ids=["group"],
    target="target",  # SWITCHING to categorical target
    time_idx="time_idx",
    min_encoder_length=5,
    max_encoder_length=5,
    min_prediction_length=2,
    max_prediction_length=2,
    time_varying_unknown_reals=["value"],
    target_normalizer=NaNLabelEncoder(),  # Use the NaNLabelEncoder to encode categorical target
)

x, y = next(iter(classification_dataset.to_dataloader(batch_size=4)))
y[0]  # target values are encoded categories
tensor([[0, 1], [2, 2], [1, 0], [2, 1]])
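The integers in this tensor are category codes. As a rough illustration of what encoding a categorical target means, the sketch below maps string labels to integer codes and back with NumPy. It is a stand-in for the idea only; the NaNLabelEncoder may assign codes differently and additionally handles unknown values.

```python
import numpy as np

labels = np.array(["B", "A", "C", "A"])  # hypothetical categorical target values

# classes are the sorted unique labels, encoded holds the position of each label in classes
classes, encoded = np.unique(labels, return_inverse=True)

# decoding is a simple lookup of the codes in the class array
decoded = classes[encoded]
print(classes, encoded, decoded)
```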
Now, we need to modify our implementation of the FullyConnectedModel. In particular, we have to add one hyperparameter to the model: n_classes, which determines how many classes there are to predict. Our model will produce a number for each class at each timestep; these numbers can be converted into probabilities by applying a softmax (over the last dimension). This means we need a total of n_decoder_timesteps x n_classes predictions. Further, we need to specify the default loss function, which we choose to be CrossEntropy.
[17]:
from pytorch_forecasting.metrics import CrossEntropy


class FullyConnectedClassificationModel(BaseModel):
    def __init__(
        self,
        input_size: int,
        output_size: int,
        hidden_size: int,
        n_hidden_layers: int,
        n_classes: int,
        loss=CrossEntropy(),
        **kwargs,
    ):
        # saves arguments in signature to `.hparams` attribute, mandatory call - do not skip this
        self.save_hyperparameters()
        # pass additional arguments to BaseModel.__init__, mandatory call - do not skip this
        super().__init__(**kwargs)
        self.network = FullyConnectedModule(
            input_size=self.hparams.input_size,
            output_size=self.hparams.output_size * self.hparams.n_classes,
            hidden_size=self.hparams.hidden_size,
            n_hidden_layers=self.hparams.n_hidden_layers,
        )

    def forward(self, x: Dict[str, torch.Tensor]) -> Dict[str, torch.Tensor]:
        # x is a batch generated based on the TimeSeriesDataset
        batch_size = x["encoder_cont"].size(0)
        network_input = x["encoder_cont"].squeeze(-1)
        prediction = self.network(network_input)
        # RESHAPE output to batch_size x n_decoder_timesteps x n_classes
        prediction = prediction.unsqueeze(-1).view(batch_size, -1, self.hparams.n_classes)

        # We need to return a dictionary that at least contains the prediction and the target_scale.
        # The parameter can be directly forwarded from the input.
        return dict(prediction=prediction, target_scale=x["target_scale"])

    @classmethod
    def from_dataset(cls, dataset: TimeSeriesDataSet, **kwargs):
        assert isinstance(dataset.target_normalizer, NaNLabelEncoder), "target normalizer has to encode categories"
        new_kwargs = {
            "n_classes": len(
                dataset.target_normalizer.classes_
            ),  # ADD number of classes as encoded by the target normalizer
            "output_size": dataset.max_prediction_length,
            "input_size": dataset.max_encoder_length,
        }
        new_kwargs.update(kwargs)  # use to pass real hyperparameters and override defaults set by dataset
        # example for dataset validation
        assert dataset.max_prediction_length == dataset.min_prediction_length, "Decoder only supports a fixed length"
        assert dataset.min_encoder_length == dataset.max_encoder_length, "Encoder only supports a fixed length"
        assert (
            len(dataset.time_varying_known_categoricals) == 0
            and len(dataset.time_varying_known_reals) == 0
            and len(dataset.time_varying_unknown_categoricals) == 0
            and len(dataset.static_categoricals) == 0
            and len(dataset.static_reals) == 0
            and len(dataset.time_varying_unknown_reals) == 1
        ), "Only covariate should be in 'time_varying_unknown_reals'"

        return super().from_dataset(dataset, **new_kwargs)


model = FullyConnectedClassificationModel.from_dataset(classification_dataset, hidden_size=10, n_hidden_layers=2)
model.summarize("full")
model.hparams
   | Name                 | Type                 | Params
---------------------------------------------------------------
0  | loss                 | SMAPE                | 0
1  | logging_metrics      | ModuleList           | 0
2  | network              | FullyConnectedModule | 346
3  | network.sequential   | Sequential           | 346
4  | network.sequential.0 | Linear               | 60
5  | network.sequential.1 | ReLU                 | 0
6  | network.sequential.2 | Linear               | 110
7  | network.sequential.3 | ReLU                 | 0
8  | network.sequential.4 | Linear               | 110
9  | network.sequential.5 | ReLU                 | 0
10 | network.sequential.6 | Linear               | 66
---------------------------------------------------------------
346       Trainable params
0         Non-trainable params
346       Total params

"hidden_size":                10
"input_size":                 5
"learning_rate":              0.001
"log_gradient_flow":          False
"log_interval":               -1
"log_val_interval":           -1
"logging_metrics":            ModuleList()
"loss":                       CrossEntropy()
"monotone_constaints":        {}
"n_classes":                  3
"n_hidden_layers":            2
"optimizer":                  ranger
"output_size":                2
"output_transformer":         NaNLabelEncoder()
"reduce_on_plateau_min_lr":   1e-05
"reduce_on_plateau_patience": 1000
"weight_decay":               0.0
[18]:
# passing x through model
model(x)["prediction"].shape
torch.Size([4, 2, 3])
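A tensor of this shape holds one raw score per class for every sample and prediction step. The sketch below shows how such scores can be turned into probabilities and predicted classes with a softmax over the last dimension. It uses a random tensor as a stand-in for the network output and does not rely on PyTorch Forecasting.

```python
import torch

batch_size, n_decoder_timesteps, n_classes = 4, 2, 3

# stand-in for the flat output of the fully connected network
flat_output = torch.randn(batch_size, n_decoder_timesteps * n_classes)

# reshape to batch_size x n_decoder_timesteps x n_classes
logits = flat_output.view(batch_size, n_decoder_timesteps, n_classes)

# normalize over the class dimension so that each time step sums to one
probabilities = torch.softmax(logits, dim=-1)

# most likely class for each sample and time step
predicted_class = probabilities.argmax(dim=-1)
print(probabilities.shape, predicted_class.shape)
```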
Training a model to predict multiple targets simultaneously is not difficult to implement. We can even employ mixed targets, i.e. a mix of categorical and continuous targets. The first step is to define a dataframe with multiple targets:
[19]:
multi_target_test_data = pd.DataFrame(
    dict(
        target1=np.random.rand(30),
        target2=np.random.rand(30),
        group=np.repeat(np.arange(3), 10),
        time_idx=np.tile(np.arange(10), 3),
    )
)
multi_target_test_data
We can then simply pass a list to the target keyword of the TimeSeriesDataSet. The class will choose reasonable defaults for normalizing the targets but we can also specify the normalizer explicitly by assigning an instance of MultiNormalizer to the target_normalizer keyword - for fun, let's use different ways of normalization.
[20]:
from pytorch_forecasting.data.encoders import EncoderNormalizer, MultiNormalizer, TorchNormalizer

# create the dataset from the pandas dataframe
multi_target_dataset = TimeSeriesDataSet(
    multi_target_test_data,
    group_ids=["group"],
    target=["target1", "target2"],  # USING two targets
    time_idx="time_idx",
    min_encoder_length=5,
    max_encoder_length=5,
    min_prediction_length=2,
    max_prediction_length=2,
    time_varying_unknown_reals=["target1", "target2"],
    target_normalizer=MultiNormalizer(
        [EncoderNormalizer(), TorchNormalizer()]
    ),  # Use the MultiNormalizer to apply a different normalizer to each target
)

x, y = next(iter(multi_target_dataset.to_dataloader(batch_size=4)))
y[0]  # target values are a list of targets
[tensor([[0.2117, 0.0968], [0.0968, 0.1795], [0.4804, 0.2674], [0.6455, 0.9711]]), tensor([[0.0974, 0.7009], [0.7009, 0.6088], [0.5579, 0.0058], [0.0175, 0.2604]])]
Using multiple targets leads to a slightly different x and y of the TimeSeriesDataSet’s dataloader. y is still a tuple of target and weight but the target is now a list of tensors. So are the target_scale, the encoder_target and the decoder_target in x.
For this reason not every model is automatically suited to deal with multiple targets. However, it is (very often) fairly simple to extend a model to output a list of tensors (for each target) as opposed to just one tensor (for one target). We will now modify our FullyConnectedModel to work with one or more targets.
As we use multiple targets, we need to define a loss function that can handle them. The MultiLoss is built exactly for that purpose. It also allows weighting the losses differently. Solely for demonstration purposes, we decide to optimize the mean absolute error for the first and the symmetric mean absolute percentage error for the second target. We weight the error on the first target twice as high as the error on the second target.
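To make the weighting concrete, the sketch below combines a mean absolute error and a symmetric mean absolute percentage error in plain PyTorch. The functions mae, smape and weighted_multi_loss are hypothetical stand-ins, and the sketch assumes the per-target losses are combined as a weighted sum; the actual MultiLoss implementation may differ in its details.

```python
import torch


def mae(y_pred: torch.Tensor, target: torch.Tensor) -> torch.Tensor:
    # mean absolute error
    return (y_pred - target).abs().mean()


def smape(y_pred: torch.Tensor, target: torch.Tensor) -> torch.Tensor:
    # symmetric mean absolute percentage error, the small constant avoids division by zero
    return (2 * (y_pred - target).abs() / (y_pred.abs() + target.abs() + 1e-8)).mean()


def weighted_multi_loss(predictions, targets, metrics, weights) -> torch.Tensor:
    # assumption: one metric per target, combined as a weighted sum
    return sum(w * m(p, t) for p, t, m, w in zip(predictions, targets, metrics, weights))


# hypothetical predictions and targets for two targets
predictions = [torch.tensor([1.0, 2.0]), torch.tensor([1.0, 3.0])]
targets = [torch.tensor([2.0, 2.0]), torch.tensor([1.0, 1.0])]

loss = weighted_multi_loss(predictions, targets, metrics=[mae, smape], weights=[2.0, 1.0])
print(loss)  # 2.0 * 0.5 (MAE) + 1.0 * 0.5 (SMAPE) = 1.5
```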
[21]:
from typing import List, Union

from pytorch_forecasting.metrics import MAE, SMAPE, MultiLoss
from pytorch_forecasting.utils import to_list


class FullyConnectedMultiTargetModel(BaseModel):
    def __init__(
        self,
        input_size: int,
        output_size: int,
        hidden_size: int,
        n_hidden_layers: int,
        target_sizes: Union[int, List[int]] = [],
        **kwargs,
    ):
        # saves arguments in signature to `.hparams` attribute, mandatory call - do not skip this
        self.save_hyperparameters()
        # pass additional arguments to BaseModel.__init__, mandatory call - do not skip this
        super().__init__(**kwargs)
        self.network = FullyConnectedModule(
            input_size=self.hparams.input_size * len(to_list(self.hparams.target_sizes)),
            output_size=self.hparams.output_size * sum(to_list(self.hparams.target_sizes)),
            hidden_size=self.hparams.hidden_size,
            n_hidden_layers=self.hparams.n_hidden_layers,
        )

    def forward(self, x: Dict[str, torch.Tensor]) -> Dict[str, torch.Tensor]:
        # x is a batch generated based on the TimeSeriesDataset
        batch_size = x["encoder_cont"].size(0)
        network_input = x["encoder_cont"].view(batch_size, -1)
        prediction = self.network(network_input)
        # treat a single integer target size like a list with one entry
        target_sizes = to_list(self.hparams.target_sizes)
        # RESHAPE output to batch_size x n_decoder_timesteps x sum_of_target_sizes
        prediction = prediction.unsqueeze(-1).view(batch_size, self.hparams.output_size, sum(target_sizes))
        # RESHAPE into list of batch_size x n_decoder_timesteps x target_sizes[i] where i=1..len(target_sizes)
        stops = np.cumsum(target_sizes)
        starts = stops - target_sizes
        prediction = [prediction[..., start:stop] for start, stop in zip(starts, stops)]
        if isinstance(self.hparams.target_sizes, int):  # only one target
            prediction = prediction[0]

        # We need to return a dictionary that at least contains the prediction and the target_scale.
        # The parameter can be directly forwarded from the input.
        return dict(prediction=prediction, target_scale=x["target_scale"])

    @classmethod
    def from_dataset(cls, dataset: TimeSeriesDataSet, **kwargs):
        # By default only handle targets of size one here, categorical targets would be of larger size
        new_kwargs = {
            "target_sizes": [1] * len(to_list(dataset.target)),
            "output_size": dataset.max_prediction_length,
            "input_size": dataset.max_encoder_length,
        }
        new_kwargs.update(kwargs)  # use to pass real hyperparameters and override defaults set by dataset
        # example for dataset validation
        assert dataset.max_prediction_length == dataset.min_prediction_length, "Decoder only supports a fixed length"
        assert dataset.min_encoder_length == dataset.max_encoder_length, "Encoder only supports a fixed length"
        assert (
            len(dataset.time_varying_known_categoricals) == 0
            and len(dataset.time_varying_known_reals) == 0
            and len(dataset.time_varying_unknown_categoricals) == 0
            and len(dataset.static_categoricals) == 0
            and len(dataset.static_reals) == 0
            and len(dataset.time_varying_unknown_reals)
            == len(dataset.target_names)  # Expect as many unknown reals as targets
        ), "Only covariate should be in 'time_varying_unknown_reals'"

        return super().from_dataset(dataset, **new_kwargs)


model = FullyConnectedMultiTargetModel.from_dataset(
    multi_target_dataset,
    hidden_size=10,
    n_hidden_layers=2,
    loss=MultiLoss(metrics=[MAE(), SMAPE()], weights=[2.0, 1.0]),
)
model.summarize("full")
model.hparams
   | Name                 | Type                 | Params
---------------------------------------------------------------
0  | loss                 | MultiLoss            | 0
1  | logging_metrics      | ModuleList           | 0
2  | network              | FullyConnectedModule | 374
3  | network.sequential   | Sequential           | 374
4  | network.sequential.0 | Linear               | 110
5  | network.sequential.1 | ReLU                 | 0
6  | network.sequential.2 | Linear               | 110
7  | network.sequential.3 | ReLU                 | 0
8  | network.sequential.4 | Linear               | 110
9  | network.sequential.5 | ReLU                 | 0
10 | network.sequential.6 | Linear               | 44
---------------------------------------------------------------
374       Trainable params
0         Non-trainable params
374       Total params

"hidden_size": 10
"input_size": 5
"learning_rate": 0.001
"log_gradient_flow": False
"log_interval": -1
"log_val_interval": -1
"logging_metrics": ModuleList()
"loss": MultiLoss(2 * MAE(), SMAPE())
"monotone_constaints": {}
"n_hidden_layers": 2
"optimizer": ranger
"output_size": 2
"output_transformer": MultiNormalizer(normalizers=[EncoderNormalizer(), TorchNormalizer()])
"reduce_on_plateau_min_lr": 1e-05
"reduce_on_plateau_patience": 1000
"target_sizes": [1, 1]
"weight_decay": 0.0
Now, let’s pass some data through our model and calculate the loss.
[22]:
out = model(x)
out
{'prediction': [tensor([[[0.1955], [0.0778]],
                        [[0.2524], [0.1098]],
                        [[0.3482], [0.1651]],
                        [[0.2962], [0.1373]]], grad_fn=<SliceBackward>),
                tensor([[[ 0.1001], [-0.2182]],
                        [[ 0.1025], [-0.2187]],
                        [[ 0.1026], [-0.2329]],
                        [[ 0.1262], [-0.1787]]], grad_fn=<SliceBackward>)],
 'target_scale': [tensor([[0.6647, 0.2287],
                          [0.5654, 0.3014],
                          [0.6255, 0.2756],
                          [0.5058, 0.2519]]),
                  tensor([[0.4946, 0.3242],
                          [0.4946, 0.3242],
                          [0.4946, 0.3242],
                          [0.4946, 0.3242]], dtype=torch.float64)]}
[23]:
# the model's transform_output method re-scales/de-normalizes the predictions into the real target space
y_hat = model.transform_output(out)
model.loss(y_hat, y)
tensor(1.9705, grad_fn=<SumBackward1>)
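To make the weighting concrete, the following sketch computes the same kind of weighted sum in plain PyTorch. It is an illustration under assumptions, not the MultiLoss implementation: the helper names `mae`, `smape` and `weighted_multi_loss` are hypothetical, and the sMAPE formula (including the small epsilon) is one common variant.

```python
import torch


def mae(y_pred: torch.Tensor, target: torch.Tensor) -> torch.Tensor:
    # mean absolute error over all entries
    return (y_pred - target).abs().mean()


def smape(y_pred: torch.Tensor, target: torch.Tensor, eps: float = 1e-8) -> torch.Tensor:
    # symmetric mean absolute percentage error (one common variant, assumed here)
    return (2 * (y_pred - target).abs() / (y_pred.abs() + target.abs() + eps)).mean()


def weighted_multi_loss(y_preds, targets, losses, weights):
    # one prediction, target, loss function and weight per target variable
    return sum(w * loss(p, t) for p, t, loss, w in zip(y_preds, targets, losses, weights))


# two targets: the first is scored with MAE (weight 2), the second with sMAPE (weight 1)
y_preds = [torch.tensor([1.0, 2.0]), torch.tensor([2.0, 2.0])]
targets = [torch.tensor([1.0, 4.0]), torch.tensor([2.0, 2.0])]
total = weighted_multi_loss(y_preds, targets, [mae, smape], [2.0, 1.0])
```

Here the first target contributes 2 x 1.0 (its MAE) and the second contributes nothing because its prediction is exact, so the error on the first target dominates the total.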
Now that we have established the basics, we can move on to more advanced use cases, e.g. how to make use of covariates, static and continuous alike. We can leverage the BaseModelWithCovariates for this. It differs from the BaseModel in its from_dataset() method, which pre-defines hyperparameters for architectures with covariates.
pytorch_forecasting.models.base_model.
Model with additional methods using covariates.
Assumes the following hyperparameters:
static_categoricals (List[str]) – names of static categorical variables
static_reals (List[str]) – names of static continuous variables
time_varying_categoricals_encoder (List[str]) – names of categorical variables for encoder
time_varying_categoricals_decoder (List[str]) – names of categorical variables for decoder
time_varying_reals_encoder (List[str]) – names of continuous variables for encoder
time_varying_reals_decoder (List[str]) – names of continuous variables for decoder
x_reals (List[str]) – order of continuous variables in tensor passed to forward function
x_categoricals (List[str]) – order of categorical variables in tensor passed to forward function
embedding_sizes (Dict[str, Tuple[int, int]]) – dictionary mapping categorical variables to tuple of integers where the first integer denotes the number of categorical classes and the second the embedding size
embedding_labels (Dict[str, List[str]]) – dictionary mapping (string) indices to list of categorical labels
embedding_paddings (List[str]) – names of categorical variables for which label 0 is always mapped to an embedding vector filled with zeros
categorical_groups (Dict[str, List[str]]) – dictionary of categorical variables that are grouped together and can also take multiple values simultaneously (e.g. holiday during octoberfest). They should be implemented as bag of embeddings
from_dataset
Create model from dataset and set parameters related to covariates.
dataset – timeseries dataset
allowed_encoder_known_variable_names – List of known variables that are allowed in encoder, defaults to all
**kwargs – additional arguments such as hyperparameters for model (see __init__())
__init__()
LightningModule
Here is the BaseModelWithCovariates docstring to copy from:
[24]:
from pytorch_forecasting.models.base_model import BaseModelWithCovariates

print(BaseModelWithCovariates.__doc__)
Model with additional methods using covariates.

Assumes the following hyperparameters:

Args:
    static_categoricals (List[str]): names of static categorical variables
    static_reals (List[str]): names of static continuous variables
    time_varying_categoricals_encoder (List[str]): names of categorical variables for encoder
    time_varying_categoricals_decoder (List[str]): names of categorical variables for decoder
    time_varying_reals_encoder (List[str]): names of continuous variables for encoder
    time_varying_reals_decoder (List[str]): names of continuous variables for decoder
    x_reals (List[str]): order of continuous variables in tensor passed to forward function
    x_categoricals (List[str]): order of categorical variables in tensor passed to forward function
    embedding_sizes (Dict[str, Tuple[int, int]]): dictionary mapping categorical variables to tuple of integers
        where the first integer denotes the number of categorical classes and the second the embedding size
    embedding_labels (Dict[str, List[str]]): dictionary mapping (string) indices to list of categorical labels
    embedding_paddings (List[str]): names of categorical variables for which label 0 is always mapped to an
        embedding vector filled with zeros
    categorical_groups (Dict[str, List[str]]): dictionary of categorical variables that are grouped together and
        can also take multiple values simultaneously (e.g. holiday during octoberfest). They should be
        implemented as bag of embeddings
We will now implement the model. A helpful module is the MultiEmbedding, which can be used to embed categorical features. It is compliant with the TimeSeriesDataSet, i.e. it supports bags of embeddings, which are useful where multiple categories can occur at the same time, such as holidays. Again, we will create a fully-connected network. It is easy to recycle our FullyConnectedModule by setting input_size to the number of encoder time steps times the number of features instead of simply the number of encoder time steps.
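The idea of a bag of embeddings can be illustrated with PyTorch's own nn.EmbeddingBag. This is only a sketch of the concept, assuming a hypothetical holiday vocabulary of four categories; it does not show how MultiEmbedding works internally.

```python
import torch
from torch import nn

torch.manual_seed(0)

# hypothetical vocabulary: 0 = no holiday, 1 = Christmas, 2 = Easter, 3 = Oktoberfest
bag = nn.EmbeddingBag(num_embeddings=4, embedding_dim=3, mode="sum")

# each row is one time step with two simultaneously active categories
active_categories = torch.tensor([[1, 3], [2, 3]])
embedded = bag(active_categories)  # shape: 2 x 3, one vector per time step

# every bag is the sum of the embeddings of its members
expected_first = bag.weight[1] + bag.weight[3]
```

Summing is one possible pooling choice; it keeps the embedding size fixed no matter how many categories are active at a time step.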
[25]:
from typing import Dict, List, Tuple

from pytorch_forecasting.models.nn import MultiEmbedding


class FullyConnectedModelWithCovariates(BaseModelWithCovariates):
    def __init__(
        self,
        input_size: int,
        output_size: int,
        hidden_size: int,
        n_hidden_layers: int,
        x_reals: List[str],
        x_categoricals: List[str],
        embedding_sizes: Dict[str, Tuple[int, int]],
        embedding_labels: Dict[str, List[str]],
        static_categoricals: List[str],
        static_reals: List[str],
        time_varying_categoricals_encoder: List[str],
        time_varying_categoricals_decoder: List[str],
        time_varying_reals_encoder: List[str],
        time_varying_reals_decoder: List[str],
        embedding_paddings: List[str],
        categorical_groups: Dict[str, List[str]],
        **kwargs,
    ):
        # saves arguments in signature to `.hparams` attribute, mandatory call - do not skip this
        self.save_hyperparameters()
        # pass additional arguments to BaseModel.__init__, mandatory call - do not skip this
        super().__init__(**kwargs)

        # create embedder - can be fed with x["encoder_cat"] or x["decoder_cat"] and will return
        # dictionary of category names mapped to embeddings
        self.input_embeddings = MultiEmbedding(
            embedding_sizes=self.hparams.embedding_sizes,
            categorical_groups=self.hparams.categorical_groups,
            embedding_paddings=self.hparams.embedding_paddings,
            x_categoricals=self.hparams.x_categoricals,
            max_embedding_size=self.hparams.hidden_size,
        )

        # calculate the size of all concatenated embeddings + continuous variables
        n_features = sum(
            embedding_size for classes_size, embedding_size in self.hparams.embedding_sizes.values()
        ) + len(self.reals)

        # create network that will be fed with continuous variables and embeddings
        self.network = FullyConnectedModule(
            input_size=self.hparams.input_size * n_features,
            output_size=self.hparams.output_size,
            hidden_size=self.hparams.hidden_size,
            n_hidden_layers=self.hparams.n_hidden_layers,
        )

    def forward(self, x: Dict[str, torch.Tensor]) -> Dict[str, torch.Tensor]:
        # x is a batch generated based on the TimeSeriesDataset
        batch_size = x["encoder_lengths"].size(0)
        embeddings = self.input_embeddings(x["encoder_cat"])  # returns dictionary with embedding tensors
        network_input = torch.cat(
            [x["encoder_cont"]]
            + [
                emb
                for name, emb in embeddings.items()
                if name in self.encoder_variables or name in self.static_variables
            ],
            dim=-1,
        )
        prediction = self.network(network_input.view(batch_size, -1))

        # We need to return a dictionary that at least contains the prediction and the target_scale.
        # The parameter can be directly forwarded from the input.
        return dict(prediction=prediction, target_scale=x["target_scale"])

    @classmethod
    def from_dataset(cls, dataset: TimeSeriesDataSet, **kwargs):
        new_kwargs = {
            "output_size": dataset.max_prediction_length,
            "input_size": dataset.max_encoder_length,
        }
        new_kwargs.update(kwargs)  # use to pass real hyperparameters and override defaults set by dataset
        # example for dataset validation
        assert dataset.max_prediction_length == dataset.min_prediction_length, "Decoder only supports a fixed length"
        assert dataset.min_encoder_length == dataset.max_encoder_length, "Encoder only supports a fixed length"

        return super().from_dataset(dataset, **new_kwargs)
Here we have used additional hooks available through the BaseModelWithCovariates, such as self.static_variables or self.encoder_variables, that can be readily determined from the hyperparameters. See the documentation of the BaseModelWithCovariates class for all available additions to the BaseModel.
When the model receives its input x, you can use the hyperparameters and the additional attributes provided by the BaseModelWithCovariates to identify the different variables. This is important because x["encoder_cat"].size(2) == x["decoder_cat"].size(2) and x["encoder_cont"].size(2) == x["decoder_cont"].size(2). This means all variables are passed to both the encoder and the decoder, even if some must not be used by the decoder because they are not known in the future. The order of variables in x["encoder_cont"] / x["decoder_cont"] and x["encoder_cat"] / x["decoder_cat"] is determined by the hyperparameters x_reals and x_categoricals. Consequently, you can identify, for example, the position of all continuous decoder variables with [self.hparams.x_reals.index(name) for name in self.hparams.time_varying_reals_decoder].
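The index lookup described above can be tried out on plain lists and a random tensor. The variable names and values below are assumptions chosen for illustration (one known real covariate plus the target); in a model they would come from self.hparams and from the batch x.

```python
import torch

# assumed hyperparameters: one known real covariate and the target
x_reals = ["real_covariate", "value"]
time_varying_reals_decoder = ["real_covariate"]

# positions of the decoder variables in the last dimension of x["decoder_cont"]
decoder_positions = [x_reals.index(name) for name in time_varying_reals_decoder]

# stand-in for x["decoder_cont"]: batch_size x n_decoder_timesteps x len(x_reals)
decoder_cont = torch.rand(4, 2, len(x_reals))

# keep only the variables the decoder is allowed to see
known_decoder_reals = decoder_cont[..., decoder_positions]
```

Selecting by position in this way drops the target column, which is present in the decoder tensor but unknown in the future.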
Note that the model does not make use of the known covariates in the decoder. This is obviously suboptimal but beyond the scope of this tutorial. Anyway, let us create a new dataset with categorical variables and see how the model can be instantiated from it.
[26]:
import numpy as np
import pandas as pd

from pytorch_forecasting import TimeSeriesDataSet

test_data_with_covariates = pd.DataFrame(
    dict(
        # as before
        value=np.random.rand(30),
        group=np.repeat(np.arange(3), 10),
        time_idx=np.tile(np.arange(10), 3),
        # now adding covariates
        categorical_covariate=np.random.choice(["a", "b"], size=30),
        real_covariate=np.random.rand(30),
    )
).astype(
    dict(group=str)
)  # categorical covariates have to be of string type
test_data_with_covariates
[27]:
# create the dataset from the pandas dataframe
dataset_with_covariates = TimeSeriesDataSet(
    test_data_with_covariates,
    group_ids=["group"],
    target="value",
    time_idx="time_idx",
    min_encoder_length=5,
    max_encoder_length=5,
    min_prediction_length=2,
    max_prediction_length=2,
    time_varying_unknown_reals=["value"],
    time_varying_known_reals=["real_covariate"],
    time_varying_known_categoricals=["categorical_covariate"],
    static_categoricals=["group"],
)

model = FullyConnectedModelWithCovariates.from_dataset(dataset_with_covariates, hidden_size=10, n_hidden_layers=2)
model.summarize("full")  # print model summary
model.hparams
   | Name                                              | Type                 | Params
--------------------------------------------------------------------------------------------
0  | loss                                              | SMAPE                | 0
1  | logging_metrics                                   | ModuleList           | 0
2  | input_embeddings                                  | MultiEmbedding       | 11
3  | input_embeddings.embeddings                       | ModuleDict           | 11
4  | input_embeddings.embeddings.group                 | Embedding            | 9
5  | input_embeddings.embeddings.categorical_covariate | Embedding            | 2
6  | network                                           | FullyConnectedModule | 552
7  | network.sequential                                | Sequential           | 552
8  | network.sequential.0                              | Linear               | 310
9  | network.sequential.1                              | ReLU                 | 0
10 | network.sequential.2                              | Linear               | 110
11 | network.sequential.3                              | ReLU                 | 0
12 | network.sequential.4                              | Linear               | 110
13 | network.sequential.5                              | ReLU                 | 0
14 | network.sequential.6                              | Linear               | 22
--------------------------------------------------------------------------------------------
563       Trainable params
0         Non-trainable params
563       Total params

"categorical_groups": {}
"embedding_labels": {'group': {'0': 0, '1': 1, '2': 2}, 'categorical_covariate': {'a': 0, 'b': 1}}
"embedding_paddings": []
"embedding_sizes": {'group': [3, 3], 'categorical_covariate': [2, 1]}
"hidden_size": 10
"input_size": 5
"learning_rate": 0.001
"log_gradient_flow": False
"log_interval": -1
"log_val_interval": -1
"logging_metrics": ModuleList()
"loss": SMAPE()
"monotone_constaints": {}
"n_hidden_layers": 2
"optimizer": ranger
"output_size": 2
"output_transformer": GroupNormalizer(transformation='relu')
"reduce_on_plateau_min_lr": 1e-05
"reduce_on_plateau_patience": 1000
"static_categoricals": ['group']
"static_reals": []
"time_varying_categoricals_decoder": ['categorical_covariate']
"time_varying_categoricals_encoder": ['categorical_covariate']
"time_varying_reals_decoder": ['real_covariate']
"time_varying_reals_encoder": ['real_covariate', 'value']
"weight_decay": 0.0
"x_categoricals": ['group', 'categorical_covariate']
"x_reals": ['real_covariate', 'value']
To test that the model could be trained, pass a sample batch.
[28]:
x, y = next(iter(dataset_with_covariates.to_dataloader(batch_size=4)))  # generate batch
model(x)  # pass batch through model
{'prediction': tensor([[-0.2959, -0.2556],
                       [-0.2927, -0.2079],
                       [-0.2907, -0.2054],
                       [-0.2500, -0.1875]], grad_fn=<AddmmBackward>),
 'target_scale': tensor([[0.5754, 0.2828],
                         [0.5754, 0.2828],
                         [0.5754, 0.2828],
                         [0.5754, 0.2828]])}
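The parameter counts reported in the model summary for the covariate model can be reproduced by hand from the printed hyperparameters. The short calculation below copies the relevant values as plain Python literals; it is a sanity check, not part of the model.

```python
# values copied from the printed hyperparameters of the covariate model
embedding_sizes = {"group": (3, 3), "categorical_covariate": (2, 1)}
x_reals = ["real_covariate", "value"]
input_size = 5  # encoder time steps
hidden_size = 10

# embeddings and continuous variables are concatenated at every encoder time step
n_features = sum(size for _, size in embedding_sizes.values()) + len(x_reals)
flat_input = input_size * n_features

# a linear layer has in_features * out_features weights plus out_features biases
first_layer_params = flat_input * hidden_size + hidden_size

# an embedding stores one vector of length `size` per class
embedding_params = sum(n_classes * size for n_classes, size in embedding_sizes.values())
```

With 6 features per time step, the flattened input has 30 entries, which gives 310 parameters for the first linear layer and 11 for the embeddings, matching the summary.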
Time series models are often autoregressive, i.e. one does not make n predictions for all future steps in one function call but predicts one step ahead n times. PyTorch Forecasting comes with an AutoRegressiveBaseModel and an AutoRegressiveBaseModelWithCovariates for such models.
Model with additional methods for autoregressive models.
Adds in particular the decode_autoregressive() method for making auto-regressive predictions.
target (str) – name of target variable
target_lags (Dict[str, Dict[str, int]]) – dictionary of target names mapped each to a dictionary of corresponding lagged variables and their lags. Lags can be useful to indicate seasonality to the models. If you know the seasonalit(ies) of your data, add at least the target variables with the corresponding lags to improve performance. Defaults to no lags, i.e. an empty dictionary.
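The principle of predicting one step ahead n times can be sketched in a few lines of plain PyTorch. The one-step model and the helper `predict_autoregressively` are hypothetical stand-ins for illustration; this is not how decode_autoregressive() is implemented.

```python
import torch
from torch import nn

torch.manual_seed(0)

# hypothetical one-step model: maps the previous value to the next value
one_step_model = nn.Linear(1, 1)


def predict_autoregressively(model: nn.Module, last_observed: torch.Tensor, n_steps: int) -> torch.Tensor:
    # last_observed: batch_size x 1, the most recent known target value
    predictions = []
    current = last_observed
    with torch.no_grad():
        for _ in range(n_steps):
            current = model(current)  # predict one step ahead
            predictions.append(current)  # the prediction becomes the next input
    return torch.cat(predictions, dim=1)  # batch_size x n_steps


forecast = predict_autoregressively(one_step_model, torch.rand(4, 1), n_steps=3)
```

Because each prediction is fed back as input, errors can accumulate over the horizon, which is one reason training and inference are often handled differently for such models.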
In this section, we will implement a simple LSTM model that could be easily extended to work with covariates. Note that because we do not handle covariates, lagged targets cannot be incorporated in this network. We use the LSTM implementation from pytorch_forecasting.models.nn.rnn, which can handle zero-length sequences but otherwise mirrors the PyTorch-native implementation.
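Feeding a recurrent network its own previous target requires lagging the target by one time step, which can be done with torch.roll. The sketch below uses a tiny stand-in tensor instead of a real batch; the tensor values and `target_positions` are assumptions for illustration.

```python
import torch

# stand-in for x["encoder_cont"]: batch_size=1, 4 time steps, 1 variable (the target)
encoder_cont = torch.tensor([[[1.0], [2.0], [3.0], [4.0]]])
target_positions = torch.tensor([0])

lagged = encoder_cont.clone()
# shift the target one step along the time dimension; the last value wraps around to the front
lagged[..., target_positions] = torch.roll(lagged[..., target_positions], shifts=1, dims=1)
rolled_values = lagged[0, :, 0].tolist()

# the first time step now holds the wrapped-around value and is discarded
lagged = lagged[:, 1:]
```

After the roll, every remaining time step holds the target value of the preceding step, so the encoder effectively loses one step of history.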
[29]:
from torch.nn.utils import rnn

from pytorch_forecasting.models.base_model import AutoRegressiveBaseModel
from pytorch_forecasting.models.nn import LSTM


class LSTMModel(AutoRegressiveBaseModel):
    def __init__(
        self,
        target: str,
        target_lags: Dict[str, Dict[str, int]],
        n_layers: int,
        hidden_size: int,
        dropout: float = 0.1,
        **kwargs,
    ):
        # arguments target and target_lags are required for autoregressive models
        # even though target_lags cannot be used without covariates
        # saves arguments in signature to `.hparams` attribute, mandatory call - do not skip this
        self.save_hyperparameters()
        # pass additional arguments to BaseModel.__init__, mandatory call - do not skip this
        super().__init__(**kwargs)

        # use version of LSTM that can handle zero-length sequences
        self.lstm = LSTM(
            hidden_size=self.hparams.hidden_size,
            input_size=1,
            num_layers=self.hparams.n_layers,
            dropout=self.hparams.dropout,
            batch_first=True,
        )
        self.output_layer = nn.Linear(self.hparams.hidden_size, 1)

    def encode(self, x: Dict[str, torch.Tensor]):
        # we need at least one encoding step because the target needs to be lagged by one time step
        # because we use the custom LSTM, we do not have to require encoder lengths of > 1
        # but can handle lengths of >= 1
        assert x["encoder_lengths"].min() >= 1
        input_vector = x["encoder_cont"].clone()
        # lag target by one
        input_vector[..., self.target_positions] = torch.roll(
            input_vector[..., self.target_positions], shifts=1, dims=1
        )
        input_vector = input_vector[:, 1:]  # first time step cannot be used because of lagging

        # determine effective encoder length
        effective_encoder_lengths = x["encoder_lengths"] - 1
        # run through LSTM network
        _, hidden_state = self.lstm(
            input_vector, lengths=effective_encoder_lengths, enforce_sorted=False  # passing the lengths directly
        )  # first output (per-step LSTM outputs) is not needed, only the hidden state
        return hidden_state

    def decode(self, x: Dict[str, torch.Tensor], hidden_state):
        # again lag target by one
        input_vector = x["decoder_cont"].clone()
        input_vector[..., self.target_positions] = torch.roll(
            input_vector[..., self.target_positions], shifts=1, dims=1
        )
        # but this time fill in missing target from encoder_cont at the first time step instead of throwing it away
        last_encoder_target = x["encoder_cont"][
            torch.arange(x["encoder_cont"].size(0), device=x["encoder_cont"].device),
            x["encoder_lengths"] - 1,
            self.target_positions.unsqueeze(-1),
        ].T
        input_vector[:, 0, self.target_positions] = last_encoder_target

        if self.training:  # training mode
            lstm_output, _ = self.lstm(input_vector, hidden_state, lengths=x["decoder_lengths"], enforce_sorted=False)

            # transform into right shape
            prediction = self.output_layer(lstm_output)

            # predictions are not yet rescaled
            return dict(prediction=prediction, target_scale=x["target_scale"])

        else:  # prediction mode
            target_pos = self.target_positions

            def decode_one(idx, lagged_targets, hidden_state):
                x = input_vector[:, [idx]]
                # overwrite at target positions
                x[:, 0, target_pos] = lagged_targets[-1]  # take most recent target (i.e. lag=1)
                lstm_output, hidden_state = self.lstm(x, hidden_state)
                # transform into right shape
                prediction = self.output_layer(lstm_output)[:, 0]  # take first timestep
                return prediction, hidden_state

            # make predictions which are fed into next step
            output = self.decode_autoregressive(
                decode_one,
                first_target=input_vector[:, 0, target_pos],
                first_hidden_state=hidden_state,
                target_scale=x["target_scale"],
                n_decoder_steps=input_vector.size(1),
            )

            # predictions are already rescaled
            return dict(prediction=output, output_transformation=None, target_scale=x["target_scale"])

    def forward(self, x: Dict[str, torch.Tensor]) -> Dict[str, torch.Tensor]:
        hidden_state = self.encode(x)  # encode to hidden state
        output = self.decode(x, hidden_state)  # decode leveraging hidden state

        return output


model = LSTMModel.from_dataset(dataset, n_layers=2, hidden_size=10)
model.summarize("full")
model.hparams
  | Name            | Type       | Params
-----------------------------------------------
0 | loss            | SMAPE      | 0
1 | logging_metrics | ModuleList | 0
2 | lstm            | LSTM       | 1.4 K
3 | output_layer    | Linear     | 11
-----------------------------------------------
1.4 K     Trainable params
0         Non-trainable params
1.4 K     Total params

"dropout": 0.1
"hidden_size": 10
"learning_rate": 0.001
"log_gradient_flow": False
"log_interval": -1
"log_val_interval": -1
"logging_metrics": ModuleList()
"loss": SMAPE()
"monotone_constaints": {}
"n_layers": 2
"optimizer": ranger
"output_transformer": GroupNormalizer()
"reduce_on_plateau_min_lr": 1e-05
"reduce_on_plateau_patience": 1000
"target": value
"target_lags": {}
"weight_decay": 0.0
We used the transform_output() method to apply the inverse transformation. It is also used under the hood for re-scaling/de-normalizing predictions and leverages the output_transformer to do so. The output_transformer is the target_normalizer as used in the dataset. When initializing the model from the dataset, it is automatically copied to the model.
We can now check that both approaches deliver the same result in terms of prediction shape:
[30]:
x, y = next(iter(dataloader))

print(
    "prediction shape in training:", model(x)["prediction"].size()
)  # batch_size x decoder time steps x 1 (1 for one target dimension)
model.eval()  # set model into eval mode to use autoregressive prediction
print("prediction shape in inference:", model(x)["prediction"].size())  # should be the same as in training
prediction shape in training: torch.Size([4, 2, 1])
prediction shape in inference: torch.Size([4, 2, 1])
To use a different metric, simply pass it to the model when initializing it (preferably via the from_dataset() method). For example, to use mean absolute error with our FullyConnectedModel from the beginning of this tutorial, type
[31]:
from pytorch_forecasting.metrics import MAE

model = FullyConnectedModel.from_dataset(dataset, hidden_size=10, n_hidden_layers=2, loss=MAE())
model.hparams

"hidden_size": 10
"input_size": 5
"learning_rate": 0.001
"log_gradient_flow": False
"log_interval": -1
"log_val_interval": -1
"logging_metrics": ModuleList()
"loss": MAE()
"monotone_constaints": {}
"n_hidden_layers": 2
"optimizer": ranger
"output_size": 2
"output_transformer": GroupNormalizer()
"reduce_on_plateau_min_lr": 1e-05
"reduce_on_plateau_patience": 1000
"weight_decay": 0.0
Note that some metrics might require a certain form of model prediction, e.g. quantile prediction assumes an output of shape batch_size x n_decoder_timesteps x n_quantiles instead of batch_size x n_decoder_timesteps. For the FullyConnectedModel, this means that we need to use a modified FullyConnectedModule network. Here n_outputs corresponds to the number of quantiles.
[32]:
import torch
from torch import nn


class FullyConnectedMultiOutputModule(nn.Module):
    def __init__(self, input_size: int, output_size: int, hidden_size: int, n_hidden_layers: int, n_outputs: int):
        super().__init__()

        # input layer
        module_list = [nn.Linear(input_size, hidden_size), nn.ReLU()]
        # hidden layers
        for _ in range(n_hidden_layers):
            module_list.extend([nn.Linear(hidden_size, hidden_size), nn.ReLU()])
        # output layer
        self.n_outputs = n_outputs
        module_list.append(
            nn.Linear(hidden_size, output_size * n_outputs)
        )  # <<<<<<<< modified: replaced output_size with output_size * n_outputs

        self.sequential = nn.Sequential(*module_list)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        # x of shape: batch_size x n_timesteps_in
        # output of shape batch_size x n_timesteps_out x n_outputs
        return self.sequential(x).reshape(x.size(0), -1, self.n_outputs)  # <<<<<<<< modified: added reshape


# test that network works as intended
network = FullyConnectedMultiOutputModule(input_size=5, output_size=2, hidden_size=10, n_hidden_layers=2, n_outputs=7)
x = torch.rand(20, 5)
network(x).shape  # <<<<<<<<<< instead of shape (20, 2), returning additional dimension for quantiles
torch.Size([20, 2, 7])
Using the FullyConnectedMultiOutputModule defined above, we could create a new model and use QuantileLoss. Note that you would have to align n_outputs with the number of quantiles in the QuantileLoss class, either manually or by making use of the from_dataset() method. If you want to switch back to a loss on a single output, such as MAE, simply set n_outputs=1; all PyTorch Forecasting metrics can handle the additional third dimension as long as it is of size 1.
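Why the last dimension must match the number of quantiles becomes clear from a hand-written quantile (pinball) loss. The function below is a minimal sketch in plain PyTorch with a hypothetical name `pinball_loss`; it is not the QuantileLoss implementation, and the choice of quantiles is an assumption.

```python
import torch


def pinball_loss(y_pred: torch.Tensor, target: torch.Tensor, quantiles) -> torch.Tensor:
    # y_pred: batch_size x n_decoder_timesteps x n_quantiles
    # target: batch_size x n_decoder_timesteps
    assert y_pred.size(-1) == len(quantiles), "one output per quantile is required"
    q = torch.tensor(quantiles, dtype=y_pred.dtype)
    errors = target.unsqueeze(-1) - y_pred  # broadcast the target across quantiles
    # under-prediction is penalized with q, over-prediction with (1 - q)
    return torch.max(q * errors, (q - 1) * errors).mean()


quantiles = [0.1, 0.5, 0.9]
y_pred = torch.zeros(4, 2, len(quantiles))
target = torch.ones(4, 2)
loss = pinball_loss(y_pred, target, quantiles)
```

Each slice along the last dimension is compared against the same target but penalized asymmetrically, so a network with the wrong n_outputs cannot be paired with the loss.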
To implement a new metric, you simply need to inherit from the MultiHorizonMetric and define the loss function. The MultiHorizonMetric handles everything from weighting to masking values for you. E.g. the mean absolute error is implemented as
[33]:
from pytorch_forecasting.metrics import MultiHorizonMetric


class MAE(MultiHorizonMetric):
    def loss(self, y_pred, target):
        loss = (self.to_prediction(y_pred) - target).abs()
        return loss
You might notice the to_prediction() method. Generally speaking, it converts y_pred to a point prediction. By default, this means that it removes the third dimension from y_pred if there is one. For most metrics, this is exactly what you need.
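To give an intuition for the masking that MultiHorizonMetric takes care of, the sketch below averages an element-wise loss only over the valid time steps of each series. The function name and the way lengths are passed are assumptions for illustration; the library's internals may differ.

```python
import torch


def masked_mean_absolute_error(y_pred: torch.Tensor, target: torch.Tensor, lengths: torch.Tensor) -> torch.Tensor:
    # y_pred, target: batch_size x n_decoder_timesteps; lengths: valid steps per series
    losses = (y_pred - target).abs()
    time_index = torch.arange(target.size(1)).unsqueeze(0)  # 1 x n_decoder_timesteps
    mask = time_index < lengths.unsqueeze(-1)  # True where a time step is valid
    return (losses * mask).sum() / mask.sum()


y_pred = torch.tensor([[1.0, 2.0, 3.0], [1.0, 0.0, 0.0]])
target = torch.tensor([[2.0, 2.0, 5.0], [3.0, 9.0, 9.0]])
lengths = torch.tensor([3, 1])  # the second series has only one valid step
error = masked_mean_absolute_error(y_pred, target, lengths)
```

The padded entries of the second series carry large errors, but they do not affect the result because they are masked out before averaging.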
Sometimes a network's forward() output does not trivially map to a prediction, and to_prediction needs to be implemented separately. This is the case, for example, if you predict the parameters of a distribution, as all classes deriving from DistributionLoss do. In particular, this means that you need to handle training and prediction differently.
We will now study the case of the NormalDistributionLoss. It requires us to predict the mean and the scale of the normal distribution. We can do so by leveraging the FullyConnectedMultiOutputModule class that we used for predicting multiple quantiles.
[34]:
from copy import copy
from typing import Optional

from pytorch_forecasting.metrics import NormalDistributionLoss


class FullyConnectedForDistributionLossModel(BaseModel):  # we inherit the `from_dataset` method
    def __init__(self, input_size: int, output_size: int, hidden_size: int, n_hidden_layers: int, **kwargs):
        # saves arguments in signature to `.hparams` attribute, mandatory call - do not skip this
        self.save_hyperparameters()
        # pass additional arguments to BaseModel.__init__, mandatory call - do not skip this
        super().__init__(**kwargs)
        self.network = FullyConnectedMultiOutputModule(
            input_size=self.hparams.input_size,
            output_size=self.hparams.output_size,
            hidden_size=self.hparams.hidden_size,
            n_hidden_layers=self.hparams.n_hidden_layers,
            n_outputs=2,  # <<<<<<<< we predict two outputs for mean and scale of the normal distribution
        )
        self.loss = NormalDistributionLoss()

    @classmethod
    def from_dataset(cls, dataset: TimeSeriesDataSet, **kwargs):
        new_kwargs = {
            "output_size": dataset.max_prediction_length,
            "input_size": dataset.max_encoder_length,
        }
        new_kwargs.update(kwargs)  # use to pass real hyperparameters and override defaults set by dataset
        # example for dataset validation
        assert dataset.max_prediction_length == dataset.min_prediction_length, "Decoder only supports a fixed length"
        assert dataset.min_encoder_length == dataset.max_encoder_length, "Encoder only supports a fixed length"
        assert (
            len(dataset.time_varying_known_categoricals) == 0
            and len(dataset.time_varying_known_reals) == 0
            and len(dataset.time_varying_unknown_categoricals) == 0
            and len(dataset.static_categoricals) == 0
            and len(dataset.static_reals) == 0
            and len(dataset.time_varying_unknown_reals) == 1
            and dataset.time_varying_unknown_reals[0] == dataset.target
        ), "Only covariate should be the target in 'time_varying_unknown_reals'"

        return super().from_dataset(dataset, **new_kwargs)

    def forward(self, x: Dict[str, torch.Tensor], n_samples: Optional[int] = None) -> Dict[str, torch.Tensor]:
        # x is a batch generated based on the TimeSeriesDataset
        network_input = x["encoder_cont"].squeeze(-1)
        prediction = self.network(network_input)  # shape batch_size x n_decoder_steps x 2
        # self.training is a PyTorch attribute indicating if a module is being trained or evaluated
        if self.training or n_samples is None:
            assert n_samples is None, "We need to predict parameters when training"
            output_transformation = True
        else:
            # let's sample from our distribution - first we need to scale the parameters to real space
            scaled_parameters = self.transform_output(
                dict(
                    prediction=prediction,
                    target_scale=x["target_scale"],
                )
            )
            # and then sample from distribution
            prediction = self.loss.sample(scaled_parameters, n_samples)
            output_transformation = None  # predictions are already re-scaled
        return dict(prediction=prediction, target_scale=x["target_scale"], output_transformation=output_transformation)

    def transform_output(self, out: Dict[str, torch.Tensor]) -> torch.Tensor:
        # this is already implemented in pytorch forecasting but this code demonstrates the point
        # input is forward's output
        # depending on output, transform differently
        # default True: an output without the key (as passed in from forward) still needs rescaling
        if out.get("output_transformation", True) is None:  # samples are already rescaled
            out = out["prediction"]
        else:  # parameters need to be rescaled
            out = self.loss.rescale_parameters(
                out["prediction"], target_scale=out["target_scale"], encoder=self.output_transformer
            )
        return out

    def log_prediction(self, x: Dict[str, torch.Tensor], out: Dict[str, torch.Tensor], batch_idx: int) -> None:
        if (
            out.get("output_transformation") is not None
            and (batch_idx % self.log_interval == 0 or self.log_interval < 1.0)
            and self.log_interval > 0
        ):
            out = copy(out)  # copy to avoid side-effects but do not deep copy to re-use references
            # sample from distribution to create valid prediction
            y_hat_detached = out["prediction"].detach()
            y_hat_samples = self.loss.sample(y_hat_detached, 100)
            out["prediction"] = y_hat_samples
            out["output_transformation"] = None
        super().log_prediction(x, out, batch_idx=batch_idx)

    def log_metrics(
        self,
        x: Dict[str, torch.Tensor],
        y: torch.Tensor,
        out: Dict[str, torch.Tensor],
    ) -> None:
        # Metrics (in contrast to the training loss: distribution loss) are calculated based on point predictions.
        # Therefore, we need to convert parameter outputs to point predictions.
        if out.get("output_transformation") is not None:
            # use distribution properties to create point prediction
            out = copy(out)  # copy to avoid side-effects but do not deep copy to re-use references
            y_hat_detached = out["prediction"].detach()
            y_hat_point_detached = self.loss.map_x_to_distribution(y_hat_detached).mean.unsqueeze(-1)
            out["prediction"] = y_hat_point_detached
            out["output_transformation"] = None
        super().log_metrics(x, y, out)


model = FullyConnectedForDistributionLossModel.from_dataset(dataset, hidden_size=10, n_hidden_layers=2)
model.summarize("full")
model.hparams
   | Name                 | Type                            | Params
--------------------------------------------------------------------------
0  | loss                 | NormalDistributionLoss          | 0
1  | logging_metrics      | ModuleList                      | 0
2  | network              | FullyConnectedMultiOutputModule | 324
3  | network.sequential   | Sequential                      | 324
4  | network.sequential.0 | Linear                          | 60
5  | network.sequential.1 | ReLU                            | 0
6  | network.sequential.2 | Linear                          | 110
7  | network.sequential.3 | ReLU                            | 0
8  | network.sequential.4 | Linear                          | 110
9  | network.sequential.5 | ReLU                            | 0
10 | network.sequential.6 | Linear                          | 44
--------------------------------------------------------------------------
324       Trainable params
0         Non-trainable params
324       Total params
You notice that we override the transform_output() method. This method is responsible for rescaling the output of a network into real space. This is often trivial as the normalization of the target variable simply has to be inverted (and this is also the default). In case of distribution loss, a custom version is required.
Further, the log_metrics() method had to be overridden because the output of the network cannot be readily used for evaluating metrics such as mean absolute error - we first need to convert them into a point prediction - here not by sampling but by analytically calculating the mean of the distribution. The same is necessary for plotting the prediction. Therefore, we need to override log_prediction(). This is it - the network is fully ready for training.
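The two conversions described above (rescaling distribution parameters into real space and deriving a point prediction from them) can be illustrated without any framework. The following is only a minimal sketch for a normal distribution. It assumes the target was normalized with a simple affine transform (a center and a spread), which is an assumption about the normalizer and not the library's actual implementation. The helper names `rescale_normal_parameters` and `point_prediction` are hypothetical.

```python
from statistics import NormalDist
from typing import Tuple


def rescale_normal_parameters(
    loc: float, scale: float, target_scale: Tuple[float, float]
) -> NormalDist:
    """Map (loc, scale) predicted in normalized space back to real space.

    Assumes y_real = center + y_normalized * spread (hypothetical affine normalizer).
    """
    center, spread = target_scale
    return NormalDist(mu=center + loc * spread, sigma=scale * spread)


def point_prediction(distribution: NormalDist) -> float:
    """Use the analytic mean of the distribution as the point prediction."""
    return distribution.mean


# network output in normalized space: mean 0.5, standard deviation 0.2
real_distribution = rescale_normal_parameters(0.5, 0.2, target_scale=(100.0, 10.0))
prediction = point_prediction(real_distribution)
print(prediction, real_distribution.stdev)
```

Only the point prediction is suitable for metrics such as the mean absolute error, whereas the full distribution is what the distribution loss is trained on.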
We can now test that the network works as expected:
[35]:
x, y = next(iter(dataloader))
print("parameter prediction shape: ", model(x)["prediction"].size())
model.eval()  # set model into eval mode for sampling
print("sample prediction shape: ", model(x, n_samples=200)["prediction"].size())

parameter prediction shape:  torch.Size([4, 2, 2])
sample prediction shape:  torch.Size([4, 2, 200])
To run inference, you can still use the predict() method, as additional arguments are passed to the network’s forward() method, i.e. we can execute the following line to generate 100 traces and subsequently calculate quantiles:
[36]:
model.predict(dataloader, n_samples=100, mode="quantiles").shape
torch.Size([12, 2, 7])
The returned quantiles are determined by the quantiles defined in the loss function. They can be modified by passing a list of quantiles to the loss at initialization.
[37]:
model.loss.quantiles
[0.02, 0.1, 0.25, 0.5, 0.75, 0.9, 0.98]
[38]:
NormalDistributionLoss(quantiles=[0.2, 0.8]).quantiles
[0.2, 0.8]
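The idea of turning sampled traces into quantiles can be illustrated without the library. The sketch below is a stand-in under stated assumptions and not what predict() does internally: it draws samples from a normal distribution with the standard library and reads off empirical quantiles by linear interpolation. The function name `empirical_quantiles` and the distribution parameters are hypothetical.

```python
from statistics import NormalDist
from typing import List, Sequence


def empirical_quantiles(samples: Sequence[float], quantiles: Sequence[float]) -> List[float]:
    """Estimate quantiles from samples using linear interpolation between order statistics."""
    ordered = sorted(samples)
    last = len(ordered) - 1
    result = []
    for q in quantiles:
        position = q * last  # fractional index into the sorted samples
        lower = int(position)
        upper = min(lower + 1, last)
        weight = position - lower
        result.append(ordered[lower] * (1 - weight) + ordered[upper] * weight)
    return result


# draw traces from a hypothetical predictive distribution for one time step
distribution = NormalDist(mu=105.0, sigma=2.0)
samples = distribution.samples(10_000, seed=42)

quantiles = [0.02, 0.1, 0.25, 0.5, 0.75, 0.9, 0.98]
estimates = empirical_quantiles(samples, quantiles)
for q, value in zip(quantiles, estimates):
    # compare the sample-based estimate with the analytic quantile
    print(f"q={q:.2f} estimated={value:.2f} analytic={distribution.inv_cdf(q):.2f}")
```

More samples give quantile estimates closer to the analytic values, which is the trade-off controlled by n_samples.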
PyTorch Forecasting supports plotting of predictions and interpretations. The figures can also be logged as part of monitoring training progress using tensorboard. Sometimes, the output of the network cannot be directly plotted together with the actually observed time series. In these cases (such as our FullyConnectedForDistributionLossModel from the previous section), we need to fix the plotting function. Further, sometimes we want to visualize certain properties of the network every other batch or after every epoch. It is easy to make this happen with PyTorch Forecasting and the LightningModule on which the BaseModel is based.
The log_interval() property returns the value of either the log_interval or the log_val_interval hyperparameter, depending on whether the model is in training or validation mode. If it is larger than 0, logging is enabled, and logging is triggered for a batch if batch_idx % log_interval == 0. You can even set it to a number smaller than 1, which leads to multiple logging events during a single batch.
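The triggering rule can be written as a small stand-alone predicate. This is only a sketch that mirrors the condition used in log_prediction() earlier in this tutorial; the function name `should_log` is hypothetical and not part of the library.

```python
def should_log(batch_idx: int, log_interval: float) -> bool:
    """Return True if logging should be triggered for this batch.

    Checking ``log_interval > 0`` first avoids a modulo by zero.
    """
    if log_interval <= 0:
        return False  # logging disabled
    # intervals below 1 trigger logging on every batch
    return log_interval < 1.0 or batch_idx % log_interval == 0


for interval in (-1, 1, 2, 0.5):
    triggered = [batch_idx for batch_idx in range(6) if should_log(batch_idx, interval)]
    print(f"log_interval={interval}: logging on batches {triggered}")
```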
One of the easiest ways to log a figure regularly is to override the plot_prediction() method, e.g. to add something to the generated plot.
In the following example, we will add an additional line indicating attention to the figure logged:
[39]:
import matplotlib.pyplot as plt


def plot_prediction(
    self,
    x: Dict[str, torch.Tensor],
    out: Dict[str, torch.Tensor],
    idx: int,
    plot_attention: bool = True,
    add_loss_to_title: bool = False,
    show_future_observed: bool = True,
    ax=None,
) -> plt.Figure:
    """
    Plot actuals vs prediction and attention

    Args:
        x (Dict[str, torch.Tensor]): network input
        out (Dict[str, torch.Tensor]): network output
        idx (int): sample index
        plot_attention: if to plot attention on secondary axis
        add_loss_to_title: if to add loss to title. Default to False.
        show_future_observed: if to show actuals for future. Defaults to True.
        ax: matplotlib axes to plot on

    Returns:
        plt.Figure: matplotlib figure
    """
    # plot prediction as normal
    fig = super().plot_prediction(
        x, out, idx=idx, add_loss_to_title=add_loss_to_title, show_future_observed=show_future_observed, ax=ax
    )

    # add attention on secondary axis
    if plot_attention:
        interpretation = self.interpret_output(out)
        ax = fig.axes[0]
        ax2 = ax.twinx()
        ax2.set_ylabel("Attention")
        encoder_length = x["encoder_lengths"][idx]
        ax2.plot(
            torch.arange(-encoder_length, 0),
            interpretation["attention"][idx, :encoder_length].detach().cpu(),
            alpha=0.2,
            color="k",
        )
    fig.tight_layout()
    return fig
If you want to add a completely new figure, override the log_prediction() method.
Logging at the end of an epoch is another common use case. You might want to calculate additional results in each step and then summarize them at the end of an epoch. Here, you can override the step() method to calculate additional results to summarize and the epoch_end() hook provided by PyTorch Lightning.
In the example below, we first calculate some interpretation result (but only if logging is enabled) and add it to the log object for later summarization. In the epoch_end() hook we take the list of saved results, and use the log_interpretation() method (that is defined in the model elsewhere) to log a figure to the tensorboard.
[40]:
def step(
    self, x: Dict[str, torch.Tensor], y: torch.Tensor, batch_idx: int, **kwargs
) -> Tuple[Dict[str, torch.Tensor], Dict[str, torch.Tensor]]:
    """
    Run for each train/val step.

    Args:
        x (Dict[str, torch.Tensor]): x as passed to the network by the dataloader
        y (torch.Tensor): y as passed to the loss function by the dataloader
        batch_idx (int): batch number
        **kwargs: additional arguments to pass to the network apart from ``x``

    Returns:
        Tuple[Dict[str, torch.Tensor], Dict[str, torch.Tensor]]: tuple where the first
            entry is a dictionary to which additional logging results can be added for consumption in the
            ``epoch_end`` hook and the second entry is the model's output.
    """
    # extract data and run model
    log, out = super().step(x, y, batch_idx)
    # calculate interpretations etc for later logging
    if self.log_interval > 0:
        detached_output = {name: tensor.detach() for name, tensor in out.items()}
        interpretation = self.interpret_output(
            detached_output,
            reduction="sum",
            attention_prediction_horizon=0,  # attention only for first prediction horizon
        )
        log["interpretation"] = interpretation
    return log, out


def epoch_end(self, outputs):
    """
    Run at epoch end for training or validation
    """
    if self.log_interval > 0:
        self.log_interpretation(outputs)
A common use case is to log the final embeddings at the end of training. You can easily achieve this by leveraging the PyTorch Lightning on_fit_end() model hook. Override that method to log the embeddings.
The following example assumes that input_embeddings is a dictionary-like object of embeddings that are being trained, such as the MultiEmbedding class. Further, it assumes that a hyperparameter embedding_labels exists (as automatically required and created by the BaseModelWithCovariates).
[41]:
def on_fit_end(self):
    """
    run at the end of training
    """
    if self.log_interval > 0:
        for name, emb in self.input_embeddings.items():
            labels = self.hparams.embedding_labels[name]
            self.logger.experiment.add_embedding(
                emb.weight.data.cpu(), metadata=labels, tag=name, global_step=self.global_step
            )
Testing models is essential to detect problems early and iterate quickly. Some issues can only be identified after lengthy training, but many problems show up after one or two batches. PyTorch Lightning, on which PyTorch Forecasting is built, makes it easy to set up such tests.
Every model should be trainable with some minimal dataset. Here is how:
Define a dataset that works with the model. If it takes a long time to create, you can save it to disk with the save() method and load it with the load() method when you want to run tests. In any case, create a reasonably small dataset.
Initialize your model with log_interval=1 to test logging of plots - in particular the plot_prediction() method.
Define a PyTorch Lightning Trainer and initialize it with fast_dev_run=True. This ensures that only a couple of batches, rather than full epochs, are passed through the training and validation steps.
Train your model and check that it executes.
As an example, we test the FullyConnectedForDistributionLossModel defined earlier in this tutorial:
[42]:
from pytorch_lightning import Trainer

model = FullyConnectedForDistributionLossModel.from_dataset(dataset, hidden_size=10, n_hidden_layers=2, log_interval=1)
trainer = Trainer(fast_dev_run=True)
trainer.fit(model, train_dataloader=dataloader, val_dataloaders=dataloader)
GPU available: False, used: False
TPU available: None, using: 0 TPU cores
Running in fast_dev_run mode: will run a full train, val and test loop using 1 batch(es).

  | Name            | Type                            | Params
--------------------------------------------------------------------
0 | loss            | NormalDistributionLoss          | 0
1 | logging_metrics | ModuleList                      | 0
2 | network         | FullyConnectedMultiOutputModule | 324
--------------------------------------------------------------------
324       Trainable params
0         Non-trainable params
324       Total params
Epoch 0: 50%|█████ | 1/2 [00:00<00:00, 19.86it/s, loss=-0.217]
Epoch 0: 100%|██████████| 2/2 [00:00<00:00, 20.29it/s, loss=-0.217, train_loss_step=-.217, val_loss=0.517]
Epoch 0: 100%|██████████| 2/2 [00:00<00:00, 19.08it/s, loss=-0.217, train_loss_step=-.217, val_loss=0.517]
1