Source code for pytorch_forecasting.models.nbeats.sub_modules

"""
Implementation of ``nn.Modules`` for N-Beats model.
"""

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F


[docs] def linear(input_size, output_size, bias=True, dropout: int = None): lin = nn.Linear(input_size, output_size, bias=bias) if dropout is not None: return nn.Sequential(nn.Dropout(dropout), lin) else: return lin
[docs] def linspace( backcast_length: int, forecast_length: int, centered: bool = False ) -> tuple[np.ndarray, np.ndarray]: if centered: norm = max(backcast_length, forecast_length) start = -backcast_length stop = forecast_length - 1 else: norm = backcast_length + forecast_length start = 0 stop = backcast_length + forecast_length - 1 lin_space = np.linspace( start / norm, stop / norm, backcast_length + forecast_length, dtype=np.float32 ) b_ls = lin_space[:backcast_length] f_ls = lin_space[backcast_length:] return b_ls, f_ls
[docs] class NBEATSBlock(nn.Module): def __init__( self, units, thetas_dim, num_block_layers=4, backcast_length=10, forecast_length=5, share_thetas=False, dropout=0.1, ): super().__init__() self.units = units self.thetas_dim = thetas_dim self.backcast_length = backcast_length self.forecast_length = forecast_length self.share_thetas = share_thetas fc_stack = [ nn.Linear(backcast_length, units), nn.ReLU(), ] for _ in range(num_block_layers - 1): fc_stack.extend([linear(units, units, dropout=dropout), nn.ReLU()]) self.fc = nn.Sequential(*fc_stack) if share_thetas: self.theta_f_fc = self.theta_b_fc = nn.Linear(units, thetas_dim, bias=False) else: self.theta_b_fc = nn.Linear(units, thetas_dim, bias=False) self.theta_f_fc = nn.Linear(units, thetas_dim, bias=False)
[docs] def forward(self, x): return self.fc(x)
[docs] class NBEATSSeasonalBlock(NBEATSBlock): def __init__( self, units, thetas_dim=None, num_block_layers=4, backcast_length=10, forecast_length=5, nb_harmonics=None, min_period=1, dropout=0.1, ): if nb_harmonics: thetas_dim = nb_harmonics else: thetas_dim = forecast_length self.min_period = min_period super().__init__( units=units, thetas_dim=thetas_dim, num_block_layers=num_block_layers, backcast_length=backcast_length, forecast_length=forecast_length, share_thetas=True, dropout=dropout, ) backcast_linspace, forecast_linspace = linspace( backcast_length, forecast_length, centered=False ) p1, p2 = ( (thetas_dim // 2, thetas_dim // 2) if thetas_dim % 2 == 0 else (thetas_dim // 2, thetas_dim // 2 + 1) ) s1_b = torch.tensor( np.cos(2 * np.pi * self.get_frequencies(p1)[:, None] * backcast_linspace), dtype=torch.float32, ) # H/2-1 s2_b = torch.tensor( np.sin(2 * np.pi * self.get_frequencies(p2)[:, None] * backcast_linspace), dtype=torch.float32, ) self.register_buffer("S_backcast", torch.cat([s1_b, s2_b])) s1_f = torch.tensor( np.cos(2 * np.pi * self.get_frequencies(p1)[:, None] * forecast_linspace), dtype=torch.float32, ) # H/2-1 s2_f = torch.tensor( np.sin(2 * np.pi * self.get_frequencies(p2)[:, None] * forecast_linspace), dtype=torch.float32, ) self.register_buffer("S_forecast", torch.cat([s1_f, s2_f]))
[docs] def forward(self, x) -> tuple[torch.Tensor, torch.Tensor]: x = super().forward(x) amplitudes_backward = self.theta_b_fc(x) backcast = amplitudes_backward.mm(self.S_backcast) amplitudes_forward = self.theta_f_fc(x) forecast = amplitudes_forward.mm(self.S_forecast) return backcast, forecast
def get_frequencies(self, n): return np.linspace( 0, (self.backcast_length + self.forecast_length) / self.min_period, n )
[docs] class NBEATSTrendBlock(NBEATSBlock): def __init__( self, units, thetas_dim, num_block_layers=4, backcast_length=10, forecast_length=5, dropout=0.1, ): super().__init__( units=units, thetas_dim=thetas_dim, num_block_layers=num_block_layers, backcast_length=backcast_length, forecast_length=forecast_length, share_thetas=True, dropout=dropout, ) backcast_linspace, forecast_linspace = linspace( backcast_length, forecast_length, centered=True ) norm = np.sqrt( forecast_length / thetas_dim ) # ensure range of predictions is comparable to input thetas_dims_range = np.array(range(thetas_dim)) coefficients = torch.tensor( backcast_linspace ** thetas_dims_range[:, None], dtype=torch.float32, ) self.register_buffer("T_backcast", coefficients * norm) coefficients = torch.tensor( forecast_linspace ** thetas_dims_range[:, None], dtype=torch.float32, ) self.register_buffer("T_forecast", coefficients * norm)
[docs] def forward(self, x) -> tuple[torch.Tensor, torch.Tensor]: x = super().forward(x) backcast = self.theta_b_fc(x).mm(self.T_backcast) forecast = self.theta_f_fc(x).mm(self.T_forecast) return backcast, forecast
[docs] class NBEATSGenericBlock(NBEATSBlock): def __init__( self, units, thetas_dim, num_block_layers=4, backcast_length=10, forecast_length=5, dropout=0.1, ): super().__init__( units=units, thetas_dim=thetas_dim, num_block_layers=num_block_layers, backcast_length=backcast_length, forecast_length=forecast_length, dropout=dropout, ) self.backcast_fc = nn.Linear(thetas_dim, backcast_length) self.forecast_fc = nn.Linear(thetas_dim, forecast_length)
[docs] def forward(self, x): x = super().forward(x) theta_b = F.relu(self.theta_b_fc(x)) theta_f = F.relu(self.theta_f_fc(x)) return self.backcast_fc(theta_b), self.forecast_fc(theta_f)