Source code for pytorch_forecasting.models.nbeats.sub_modules

Implementation of ``nn.Modules`` for N-Beats model.

from typing import Tuple

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F

[docs] def linear(input_size, output_size, bias=True, dropout: int = None): lin = nn.Linear(input_size, output_size, bias=bias) if dropout is not None: return nn.Sequential(nn.Dropout(dropout), lin) else: return lin
[docs] def linspace(backcast_length: int, forecast_length: int, centered: bool = False) -> Tuple[np.ndarray, np.ndarray]: if centered: norm = max(backcast_length, forecast_length) start = -backcast_length stop = forecast_length - 1 else: norm = backcast_length + forecast_length start = 0 stop = backcast_length + forecast_length - 1 lin_space = np.linspace(start / norm, stop / norm, backcast_length + forecast_length, dtype=np.float32) b_ls = lin_space[:backcast_length] f_ls = lin_space[backcast_length:] return b_ls, f_ls
[docs] class NBEATSBlock(nn.Module): def __init__( self, units, thetas_dim, num_block_layers=4, backcast_length=10, forecast_length=5, share_thetas=False, dropout=0.1, ): super().__init__() self.units = units self.thetas_dim = thetas_dim self.backcast_length = backcast_length self.forecast_length = forecast_length self.share_thetas = share_thetas fc_stack = [ nn.Linear(backcast_length, units), nn.ReLU(), ] for _ in range(num_block_layers - 1): fc_stack.extend([linear(units, units, dropout=dropout), nn.ReLU()]) self.fc = nn.Sequential(*fc_stack) if share_thetas: self.theta_f_fc = self.theta_b_fc = nn.Linear(units, thetas_dim, bias=False) else: self.theta_b_fc = nn.Linear(units, thetas_dim, bias=False) self.theta_f_fc = nn.Linear(units, thetas_dim, bias=False)
[docs] def forward(self, x): return self.fc(x)
[docs] class NBEATSSeasonalBlock(NBEATSBlock): def __init__( self, units, thetas_dim=None, num_block_layers=4, backcast_length=10, forecast_length=5, nb_harmonics=None, min_period=1, dropout=0.1, ): if nb_harmonics: thetas_dim = nb_harmonics else: thetas_dim = forecast_length self.min_period = min_period super().__init__( units=units, thetas_dim=thetas_dim, num_block_layers=num_block_layers, backcast_length=backcast_length, forecast_length=forecast_length, share_thetas=True, dropout=dropout, ) backcast_linspace, forecast_linspace = linspace(backcast_length, forecast_length, centered=False) p1, p2 = (thetas_dim // 2, thetas_dim // 2) if thetas_dim % 2 == 0 else (thetas_dim // 2, thetas_dim // 2 + 1) s1_b = torch.tensor( [np.cos(2 * np.pi * i * backcast_linspace) for i in self.get_frequencies(p1)], dtype=torch.float32 ) # H/2-1 s2_b = torch.tensor( [np.sin(2 * np.pi * i * backcast_linspace) for i in self.get_frequencies(p2)], dtype=torch.float32 ) self.register_buffer("S_backcast",[s1_b, s2_b])) s1_f = torch.tensor( [np.cos(2 * np.pi * i * forecast_linspace) for i in self.get_frequencies(p1)], dtype=torch.float32 ) # H/2-1 s2_f = torch.tensor( [np.sin(2 * np.pi * i * forecast_linspace) for i in self.get_frequencies(p2)], dtype=torch.float32 ) self.register_buffer("S_forecast",[s1_f, s2_f]))
[docs] def forward(self, x) -> Tuple[torch.Tensor, torch.Tensor]: x = super().forward(x) amplitudes_backward = self.theta_b_fc(x) backcast = amplitudes_forward = self.theta_f_fc(x) forecast = return backcast, forecast
def get_frequencies(self, n): return np.linspace(0, (self.backcast_length + self.forecast_length) / self.min_period, n)
[docs] class NBEATSTrendBlock(NBEATSBlock): def __init__( self, units, thetas_dim, num_block_layers=4, backcast_length=10, forecast_length=5, dropout=0.1, ): super().__init__( units=units, thetas_dim=thetas_dim, num_block_layers=num_block_layers, backcast_length=backcast_length, forecast_length=forecast_length, share_thetas=True, dropout=dropout, ) backcast_linspace, forecast_linspace = linspace(backcast_length, forecast_length, centered=True) norm = np.sqrt(forecast_length / thetas_dim) # ensure range of predictions is comparable to input coefficients = torch.tensor([backcast_linspace**i for i in range(thetas_dim)], dtype=torch.float32) self.register_buffer("T_backcast", coefficients * norm) coefficients = torch.tensor([forecast_linspace**i for i in range(thetas_dim)], dtype=torch.float32) self.register_buffer("T_forecast", coefficients * norm)
[docs] def forward(self, x) -> Tuple[torch.Tensor, torch.Tensor]: x = super().forward(x) backcast = self.theta_b_fc(x).mm(self.T_backcast) forecast = self.theta_f_fc(x).mm(self.T_forecast) return backcast, forecast
[docs] class NBEATSGenericBlock(NBEATSBlock): def __init__( self, units, thetas_dim, num_block_layers=4, backcast_length=10, forecast_length=5, dropout=0.1, ): super().__init__( units=units, thetas_dim=thetas_dim, num_block_layers=num_block_layers, backcast_length=backcast_length, forecast_length=forecast_length, dropout=dropout, ) self.backcast_fc = nn.Linear(thetas_dim, backcast_length) self.forecast_fc = nn.Linear(thetas_dim, forecast_length)
[docs] def forward(self, x): x = super().forward(x) theta_b = F.relu(self.theta_b_fc(x)) theta_f = F.relu(self.theta_f_fc(x)) return self.backcast_fc(theta_b), self.forecast_fc(theta_f)