"""
Encoders for encoding categorical variables and scaling continuous data.
"""
from typing import Any, Callable, Dict, Iterable, List, Tuple, Union, Optional
import warnings
import numpy as np
import pandas as pd
from copy import deepcopy
from sklearn.base import BaseEstimator, TransformerMixin
import torch
from torch.distributions import constraints
from torch.distributions.transforms import ExpTransform, PowerTransform, SigmoidTransform, Transform, _clipped_sigmoid
import torch.nn.functional as F
from torch.nn.utils import rnn
from pytorch_forecasting.utils import InitialParameterRepresenterMixIn
def _plus_one(x):
return x + 1
def _minus_one(x):
return x - 1
def _identity(x):
return x
def _clipped_logit(x):
finfo = torch.finfo(x.dtype)
x = x.clamp(min=finfo.eps, max=1.0 - finfo.eps)
return x.log() - (-x).log1p()
def softplus_inv(y):
finfo = torch.finfo(y.dtype)
return y.where(y > 20.0, y + (y + finfo.eps).neg().expm1().neg().log())
def _square(y):
return torch.pow(y, 2.0)
def _clipped_log(y):
finfo = torch.finfo(y.dtype)
return y.log().clamp(min=-1 / finfo.eps)
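
# Illustrative sketch (not part of the library API): a quick numerical check of
# the stability helpers above. ``softplus_inv`` inverts ``F.softplus``, and
# ``_clipped_logit`` stays finite at the boundaries where a naive logit would
# return +/-inf. The name ``_example_stable_transforms`` is hypothetical.
def _example_stable_transforms():
    x = torch.linspace(-5.0, 5.0, steps=11)
    # round trip through softplus and its inverse recovers x
    assert torch.allclose(softplus_inv(F.softplus(x)), x, atol=1e-4)
    # boundary probabilities are clamped before taking the logit
    p = torch.tensor([0.0, 0.5, 1.0])
    assert torch.isfinite(_clipped_logit(p)).all()
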
class NaNLabelEncoder(InitialParameterRepresenterMixIn, BaseEstimator, TransformerMixin, TransformMixIn):
"""
    Label encoder that can optionally encode nan and, during transform, unknown classes as class ``0``
"""
def __init__(self, add_nan: bool = False, warn: bool = True):
"""
        Initialize NaNLabelEncoder.
        Args:
            add_nan: whether to force encoding of nan as class 0
            warn: whether to warn when unknown items are encoded as nan
"""
self.add_nan = add_nan
self.warn = warn
super().__init__()
@staticmethod
def is_numeric(y: pd.Series) -> bool:
"""
Determine if series is numeric or not. Will also return True if series is a categorical type with
underlying integers.
Args:
y (pd.Series): series for which to carry out assessment
Returns:
bool: True if series is numeric
"""
return y.dtype.kind in "bcif" or (
isinstance(y.dtype, pd.CategoricalDtype) and y.cat.categories.dtype.kind in "bcif"
)
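
    # Worked example (illustrative): is_numeric(pd.Series([1, 2])) is True,
    # is_numeric(pd.Series(["a"]).astype("category")) is False, and an integer
    # categorical such as pd.Series([1, 2]).astype("category") counts as numeric.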
def fit(self, y: pd.Series, overwrite: bool = False):
"""
Fit transformer
Args:
y (pd.Series): input data to fit on
            overwrite (bool): whether to overwrite current mappings or to add to them.
Returns:
NaNLabelEncoder: self
"""
if not overwrite and hasattr(self, "classes_"):
offset = len(self.classes_)
else:
offset = 0
self.classes_ = {}
# determine new classes
if self.add_nan:
if self.is_numeric(y):
nan = np.nan
else:
nan = "nan"
self.classes_[nan] = 0
idx = 1
else:
idx = 0
idx += offset
for val in np.unique(y):
if val not in self.classes_:
self.classes_[val] = idx
idx += 1
self.classes_vector_ = np.array(list(self.classes_.keys()))
return self
def __call__(self, data: Dict[str, torch.Tensor]) -> torch.Tensor:
"""
        Extract prediction from network output. Does not map back to input
        categories as this would require converting the tensor to numpy, losing gradient information.
Args:
data (Dict[str, torch.Tensor]): Dictionary with entries
* prediction: data to de-scale
Returns:
torch.Tensor: prediction
"""
return data["prediction"]
def get_parameters(self, groups=None, group_names=None) -> np.ndarray:
"""
Get fitted scaling parameters for a given group.
        All parameters are unused; this method exists only for compatibility.
Returns:
np.ndarray: zero array.
"""
return np.zeros(2, dtype=np.float64)
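
# Illustrative sketch (not part of the library API): with ``add_nan=True`` the
# encoder reserves class 0 for nan/unknown values, so fitted categories start
# at index 1. The name ``_example_nan_label_encoder`` is hypothetical.
def _example_nan_label_encoder():
    y = pd.Series(["a", "b", "a", "c"])
    encoder = NaNLabelEncoder(add_nan=True).fit(y)
    # encoder.classes_ == {"nan": 0, "a": 1, "b": 2, "c": 3}
    return encoder.classes_
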
class TorchNormalizer(InitialParameterRepresenterMixIn, BaseEstimator, TransformerMixin, TransformMixIn):
"""
    Basic target transformer that can also be fit on torch tensors.
"""
def __init__(
self,
method: str = "standard",
center: bool = True,
transformation: Union[str, Tuple[Callable, Callable]] = None,
method_kwargs: Optional[Dict[str, Any]] = None,
):
"""
Args:
method (str, optional): method to rescale series. Either "identity", "standard" (standard scaling)
or "robust" (scale using quantiles 0.25-0.75). Defaults to "standard".
method_kwargs (Dict[str, Any], optional): Dictionary of method specific arguments as listed below
* "robust" method: "upper", "lower", "center" quantiles defaulting to 0.75, 0.25 and 0.5
            center (bool, optional): Whether to center the output to zero. Defaults to True.
            transformation (Union[str, Dict[str, Callable]], optional): Transform values before
applying normalizer. Available options are
* None (default): No transformation of values
* log: Estimate in log-space leading to a multiplicative model
* log1p: Estimate in log-space but add 1 to values before transforming for stability
(e.g. if many small values <<1 are present).
                    Note that the inverse transform is still only `torch.exp()` and not `torch.expm1()`.
* logit: Apply logit transformation on values that are between 0 and 1
* count: Apply softplus to output (inverse transformation) and x + 1 to input (transformation)
* softplus: Apply softplus to output (inverse transformation) and inverse softplus to input
(transformation)
* relu: Apply max(0, x) to output
* Dict[str, Callable] of PyTorch functions that transforms and inversely transforms values.
``forward`` and ``reverse`` entries are required. ``inverse`` transformation is optional and
should be defined if ``reverse`` is not the inverse of the forward transformation. ``inverse_torch``
can be defined to provide a torch distribution transform for inverse transformations.
"""
self.method = method
assert method in ["standard", "robust", "identity"], f"method has invalid value {method}"
self.center = center
self.transformation = transformation
self.method_kwargs = method_kwargs
self._method_kwargs = deepcopy(method_kwargs) if method_kwargs is not None else {}
def get_parameters(self, *args, **kwargs) -> torch.Tensor:
"""
Returns parameters that were used for encoding.
Returns:
torch.Tensor: First element is center of data and second is scale
"""
return torch.stack([torch.as_tensor(self.center_), torch.as_tensor(self.scale_)], dim=-1)
def fit(self, y: Union[pd.Series, np.ndarray, torch.Tensor]):
"""
Fit transformer, i.e. determine center and scale of data
Args:
y (Union[pd.Series, np.ndarray, torch.Tensor]): input data
Returns:
TorchNormalizer: self
"""
y = self.preprocess(y)
self._set_parameters(y_center=y, y_scale=y)
return self
def _set_parameters(
self, y_center: Union[pd.Series, np.ndarray, torch.Tensor], y_scale: Union[pd.Series, np.ndarray, torch.Tensor]
):
"""
Calculate parameters for scale and center based on input timeseries
Args:
y_center (Union[pd.Series, np.ndarray, torch.Tensor]): timeseries for calculating center
y_scale (Union[pd.Series, np.ndarray, torch.Tensor]): timeseries for calculating scale
"""
if isinstance(y_center, torch.Tensor):
eps = torch.finfo(y_center.dtype).eps
else:
eps = np.finfo(np.float16).eps
if self.method == "identity":
if isinstance(y_center, torch.Tensor):
self.center_ = torch.zeros(y_center.size()[:-1])
self.scale_ = torch.ones(y_scale.size()[:-1])
elif isinstance(y_center, (np.ndarray, pd.Series, pd.DataFrame)):
# numpy default type is numpy.float64 while torch default type is torch.float32 (if not changed)
# therefore, we first generate torch tensors (with torch default type) and then
# convert them to numpy arrays
self.center_ = torch.zeros(y_center.shape[:-1]).numpy()
self.scale_ = torch.ones(y_scale.shape[:-1]).numpy()
else:
self.center_ = 0.0
self.scale_ = 1.0
elif self.method == "standard":
if isinstance(y_center, torch.Tensor):
self.center_ = torch.mean(y_center, dim=-1)
self.scale_ = torch.std(y_scale, dim=-1) + eps
elif isinstance(y_center, np.ndarray):
self.center_ = np.mean(y_center, axis=-1)
self.scale_ = np.std(y_scale, axis=-1) + eps
else:
self.center_ = np.mean(y_center)
self.scale_ = np.std(y_scale) + eps
            # correct numpy scalar dtype promotion, e.g. `np.float32(0.0) + 1e-8` yields `np.float64(1e-8)`
if isinstance(self.scale_, np.ndarray):
self.scale_ = self.scale_.astype(y_scale.dtype)
elif self.method == "robust":
if isinstance(y_center, torch.Tensor):
self.center_ = y_center.quantile(self._method_kwargs.get("center", 0.5), dim=-1)
q_75 = y_scale.quantile(self._method_kwargs.get("upper", 0.75), dim=-1)
q_25 = y_scale.quantile(self._method_kwargs.get("lower", 0.25), dim=-1)
elif isinstance(y_center, np.ndarray):
self.center_ = np.percentile(y_center, self._method_kwargs.get("center", 0.5) * 100, axis=-1)
q_75 = np.percentile(y_scale, self._method_kwargs.get("upper", 0.75) * 100, axis=-1)
q_25 = np.percentile(y_scale, self._method_kwargs.get("lower", 0.25) * 100, axis=-1)
else:
                self.center_ = np.percentile(y_center, self._method_kwargs.get("center", 0.5) * 100)
q_75 = np.percentile(y_scale, self._method_kwargs.get("upper", 0.75) * 100)
q_25 = np.percentile(y_scale, self._method_kwargs.get("lower", 0.25) * 100)
self.scale_ = (q_75 - q_25) / 2.0 + eps
if not self.center and self.method != "identity":
self.scale_ = self.center_
if isinstance(y_center, torch.Tensor):
self.center_ = torch.zeros_like(self.center_)
else:
self.center_ = np.zeros_like(self.center_)
if (np.asarray(self.scale_) < 1e-7).any():
warnings.warn(
"scale is below 1e-7 - consider not centering "
"the data or using data with higher variance for numerical stability",
UserWarning,
)
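
    # Worked example (illustrative): for y = [1, 2, 3, 4, 100] and
    # method="robust" with default quantiles, center_ = median = 3 and
    # scale_ = (q75 - q25) / 2 = (4 - 2) / 2 = 1 (plus eps), so the outlier
    # barely affects the scale, unlike with method="standard".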
def __call__(self, data: Dict[str, torch.Tensor]) -> torch.Tensor:
"""
Inverse transformation but with network output as input.
Args:
data (Dict[str, torch.Tensor]): Dictionary with entries
* prediction: data to de-scale
* target_scale: center and scale of data
Returns:
torch.Tensor: de-scaled data
"""
# ensure output dtype matches input dtype
dtype = data["prediction"].dtype
# inverse transformation with tensors
norm = data["target_scale"]
# use correct shape for norm
if data["prediction"].ndim > norm.ndim:
norm = norm.unsqueeze(-1)
# transform
y = data["prediction"] * norm[:, 1, None] + norm[:, 0, None]
y = self.inverse_preprocess(y)
# return correct shape
if data["prediction"].ndim == 1 and y.ndim > 1:
y = y.squeeze(0)
return y.type(dtype)
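
# Illustrative sketch (not part of the library API): the scaling round trip.
# ``fit`` computes center/scale along the last dimension, ``get_parameters``
# stacks them, and ``__call__`` de-scales a prediction given ``target_scale``.
# Assumes ``transformation=None`` so that the pre-/inverse-preprocessing defined
# on the mixin elsewhere in this module is the identity. The name
# ``_example_torch_normalizer`` is hypothetical.
def _example_torch_normalizer():
    y = torch.randn(4, 10) * 3.0 + 5.0  # 4 series of length 10
    normalizer = TorchNormalizer(method="standard").fit(y)
    target_scale = normalizer.get_parameters()  # shape (4, 2): center, scale
    scaled = (y - target_scale[:, 0, None]) / target_scale[:, 1, None]
    restored = normalizer(dict(prediction=scaled, target_scale=target_scale))
    assert torch.allclose(restored, y, atol=1e-4)
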
class EncoderNormalizer(TorchNormalizer):
"""
    Special normalizer that is fit on each encoder sequence separately.
    This normalizer can be particularly useful as a target normalizer.
"""
def __init__(
self,
method: str = "standard",
center: bool = True,
max_length: Union[int, List[int]] = None,
transformation: Union[str, Tuple[Callable, Callable]] = None,
method_kwargs: Dict[str, Any] = None,
):
"""
Initialize
Args:
method (str, optional): method to rescale series. Either "identity", "standard" (standard scaling)
or "robust" (scale using quantiles 0.25-0.75). Defaults to "standard".
method_kwargs (Dict[str, Any], optional): Dictionary of method specific arguments as listed below
* "robust" method: "upper", "lower", "center" quantiles defaulting to 0.75, 0.25 and 0.5
            center (bool, optional): Whether to center the output to zero. Defaults to True.
            max_length (Union[int, List[int]], optional): Maximum length to take into account for calculating
                parameters. If a list, the first length is the maximum length for calculating the center and the
                second is the maximum length for calculating the scale. Defaults to the entire length of the time
                series.
            transformation (Union[str, Tuple[Callable, Callable]], optional): Transform values before
applying normalizer. Available options are
* None (default): No transformation of values
* log: Estimate in log-space leading to a multiplicative model
* log1p: Estimate in log-space but add 1 to values before transforming for stability
(e.g. if many small values <<1 are present).
                    Note that the inverse transform is still only `torch.exp()` and not `torch.expm1()`.
* logit: Apply logit transformation on values that are between 0 and 1
* count: Apply softplus to output (inverse transformation) and x + 1 to input (transformation)
* softplus: Apply softplus to output (inverse transformation) and inverse softplus to input
(transformation)
* relu: Apply max(0, x) to output
* Dict[str, Callable] of PyTorch functions that transforms and inversely transforms values.
``forward`` and ``reverse`` entries are required. ``inverse`` transformation is optional and
should be defined if ``reverse`` is not the inverse of the forward transformation. ``inverse_torch``
can be defined to provide a torch distribution transform for inverse transformations.
"""
method_kwargs = deepcopy(method_kwargs) if method_kwargs is not None else {}
super().__init__(method=method, center=center, transformation=transformation, method_kwargs=method_kwargs)
self.max_length = max_length
def fit(self, y: Union[pd.Series, np.ndarray, torch.Tensor]):
"""
Fit transformer, i.e. determine center and scale of data
Args:
y (Union[pd.Series, np.ndarray, torch.Tensor]): input data
Returns:
            EncoderNormalizer: self
"""
# reduce size of time series - take only max length
if self.max_length is None:
y_center = y_scale = self.preprocess(y)
elif isinstance(self.max_length, int):
y_center = y_scale = self.preprocess(self._slice(y, slice(-self.max_length, None)))
else:
y = self.preprocess(self._slice(y, slice(-max(self.max_length), None)))
if np.argmax(self.max_length) == 0:
y_center = y
y_scale = self._slice(y, slice(-self.max_length[1], None))
else:
y_center = self._slice(y, slice(-self.max_length[0], None))
y_scale = y
# set parameters for normalization
self._set_parameters(y_center=y_center, y_scale=y_scale)
return self
@property
def min_length(self):
if self.method == "identity":
return 0 # no timeseries properties used
elif self.center:
return 1 # only calculation of mean required
else:
return 2 # requires std, i.e. at least 2 entries
@staticmethod
def _slice(
x: Union[pd.DataFrame, pd.Series, np.ndarray, torch.Tensor], s: slice
) -> Union[pd.DataFrame, pd.Series, np.ndarray, torch.Tensor]:
"""
Slice pandas data frames, numpy arrays and tensors.
Args:
            x (Union[pd.DataFrame, pd.Series, np.ndarray, torch.Tensor]): object to slice
            s (slice): slice, e.g. ``slice(None, -5)``
        Returns:
            Union[pd.DataFrame, pd.Series, np.ndarray, torch.Tensor]: sliced object
"""
if isinstance(x, (pd.DataFrame, pd.Series)):
return x[s]
else:
return x[..., s]
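
# Illustrative sketch (not part of the library API): with ``max_length=[20, 10]``
# the center is estimated on the last 20 points and the scale only on the last
# 10. The name ``_example_encoder_normalizer`` is hypothetical.
def _example_encoder_normalizer():
    y = torch.arange(100, dtype=torch.float32)
    normalizer = EncoderNormalizer(method="standard", max_length=[20, 10]).fit(y)
    # center_ is the mean of y[-20:]; scale_ is the std of y[-10:] plus eps
    return normalizer.center_, normalizer.scale_
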
class GroupNormalizer(TorchNormalizer):
"""
Normalizer that scales by groups.
    For each group a scaler is fitted and applied. This scaler can be used as a target normalizer
    or to normalize any other variable.
"""
def __init__(
self,
method: str = "standard",
groups: Optional[List[str]] = None,
center: bool = True,
scale_by_group: bool = False,
transformation: Optional[Union[str, Tuple[Callable, Callable]]] = None,
method_kwargs: Optional[Dict[str, Any]] = None,
):
"""
Group normalizer to normalize a given entry by groups. Can be used as target normalizer.
Args:
method (str, optional): method to rescale series. Either "standard" (standard scaling) or "robust"
(scale using quantiles 0.25-0.75). Defaults to "standard".
method_kwargs (Dict[str, Any], optional): Dictionary of method specific arguments as listed below
* "robust" method: "upper", "lower", "center" quantiles defaulting to 0.75, 0.25 and 0.5
groups (List[str], optional): Group names to normalize by. Defaults to [].
            center (bool, optional): Whether to center the output to zero. Defaults to True.
            scale_by_group (bool, optional): Whether to scale the output by group, i.e. norm is calculated as
                ``(group1_norm * group2_norm * ...) ^ (1 / n_groups)``. Defaults to False.
            transformation (Union[str, Tuple[Callable, Callable]], optional): Transform values before
applying normalizer. Available options are
* None (default): No transformation of values
* log: Estimate in log-space leading to a multiplicative model
* log1p: Estimate in log-space but add 1 to values before transforming for stability
(e.g. if many small values <<1 are present).
                    Note that the inverse transform is still only `torch.exp()` and not `torch.expm1()`.
* logit: Apply logit transformation on values that are between 0 and 1
* count: Apply softplus to output (inverse transformation) and x + 1 to input
(transformation)
* softplus: Apply softplus to output (inverse transformation) and inverse softplus to input
(transformation)
* relu: Apply max(0, x) to output
* Dict[str, Callable] of PyTorch functions that transforms and inversely transforms values.
``forward`` and ``reverse`` entries are required. ``inverse`` transformation is optional and
should be defined if ``reverse`` is not the inverse of the forward transformation. ``inverse_torch``
can be defined to provide a torch distribution transform for inverse transformations.
"""
self.groups = groups
self._groups = list(groups) if groups is not None else []
self.scale_by_group = scale_by_group
method_kwargs = deepcopy(method_kwargs) if method_kwargs is not None else {}
super().__init__(method=method, center=center, transformation=transformation, method_kwargs=method_kwargs)
def fit(self, y: pd.Series, X: pd.DataFrame):
"""
Determine scales for each group
Args:
y (pd.Series): input data
X (pd.DataFrame): dataframe with columns for each group defined in ``groups`` parameter.
Returns:
self
"""
y = self.preprocess(y)
eps = np.finfo(np.float16).eps
if len(self._groups) == 0:
            assert not self.scale_by_group, "`scale_by_group=True` requires `groups` to be defined"
if self.method == "standard":
self.norm_ = {"center": np.mean(y), "scale": np.std(y) + eps} # center and scale
else:
quantiles = np.quantile(
y,
[
self._method_kwargs.get("lower", 0.25),
self._method_kwargs.get("center", 0.5),
self._method_kwargs.get("upper", 0.75),
],
)
self.norm_ = {
"center": quantiles[1],
"scale": (quantiles[2] - quantiles[0]) / 2.0 + eps,
} # center and scale
if not self.center:
self.norm_["scale"] = self.norm_["center"] + eps
self.norm_["center"] = 0.0
elif self.scale_by_group:
if self.method == "standard":
self.norm_ = {
g: X[[g]]
.assign(y=y)
.groupby(g, observed=True)
.agg(center=("y", "mean"), scale=("y", "std"))
.assign(center=lambda x: x["center"], scale=lambda x: x.scale + eps)
for g in self._groups
}
else:
self.norm_ = {
g: X[[g]]
.assign(y=y)
.groupby(g, observed=True)
.y.quantile(
[
self._method_kwargs.get("lower", 0.25),
self._method_kwargs.get("center", 0.5),
self._method_kwargs.get("upper", 0.75),
]
)
.unstack(-1)
.assign(
center=lambda x: x[self._method_kwargs.get("center", 0.5)],
scale=lambda x: (
x[self._method_kwargs.get("upper", 0.75)] - x[self._method_kwargs.get("lower", 0.25)]
)
/ 2.0
+ eps,
)[["center", "scale"]]
for g in self._groups
}
# calculate missings
if not self.center: # swap center and scale
def swap_parameters(norm):
norm["scale"] = norm["center"] + eps
norm["center"] = 0.0
return norm
self.norm_ = {g: swap_parameters(norm) for g, norm in self.norm_.items()}
self.missing_ = {group: scales.median().to_dict() for group, scales in self.norm_.items()}
else:
if self.method == "standard":
self.norm_ = (
X[self._groups]
.assign(y=y)
.groupby(self._groups, observed=True)
.agg(center=("y", "mean"), scale=("y", "std"))
.assign(center=lambda x: x["center"], scale=lambda x: x.scale + eps)
)
else:
self.norm_ = (
X[self._groups]
.assign(y=y)
.groupby(self._groups, observed=True)
.y.quantile(
[
self._method_kwargs.get("lower", 0.25),
self._method_kwargs.get("center", 0.5),
self._method_kwargs.get("upper", 0.75),
]
)
.unstack(-1)
.assign(
center=lambda x: x[self._method_kwargs.get("center", 0.5)],
scale=lambda x: (
x[self._method_kwargs.get("upper", 0.75)] - x[self._method_kwargs.get("lower", 0.25)]
)
/ 2.0
+ eps,
)[["center", "scale"]]
)
if not self.center: # swap center and scale
self.norm_["scale"] = self.norm_["center"] + eps
self.norm_["center"] = 0.0
self.missing_ = self.norm_.median().to_dict()
if (
(self.scale_by_group and any((self.norm_[group]["scale"] < 1e-7).any() for group in self._groups))
or (not self.scale_by_group and isinstance(self.norm_["scale"], float) and self.norm_["scale"] < 1e-7)
or (
not self.scale_by_group
and not isinstance(self.norm_["scale"], float)
and (self.norm_["scale"] < 1e-7).any()
)
):
warnings.warn(
"scale is below 1e-7 - consider not centering "
"the data or using data with higher variance for numerical stability",
UserWarning,
)
return self
@property
def names(self) -> List[str]:
"""
Names of determined scales.
Returns:
List[str]: list of names
"""
return ["center", "scale"]
def get_parameters(self, groups: Union[torch.Tensor, list, tuple], group_names: List[str] = None) -> np.ndarray:
"""
Get fitted scaling parameters for a given group.
Args:
groups (Union[torch.Tensor, list, tuple]): group ids for which to get parameters
group_names (List[str], optional): Names of groups corresponding to positions
in ``groups``. Defaults to None, i.e. the instance attribute ``groups``.
Returns:
np.ndarray: parameters used for scaling
"""
if isinstance(groups, torch.Tensor):
groups = groups.tolist()
if isinstance(groups, list):
groups = tuple(groups)
if group_names is None:
group_names = self._groups
else:
# filter group names
group_names = [name for name in group_names if name in self._groups]
assert len(group_names) == len(self._groups), "Passed groups and fitted do not match"
if len(self._groups) == 0:
params = np.array([self.norm_["center"], self.norm_["scale"]])
elif self.scale_by_group:
norm = np.array([1.0, 1.0])
for group, group_name in zip(groups, group_names):
try:
norm = norm * self.norm_[group_name].loc[group].to_numpy()
except KeyError:
norm = norm * np.asarray([self.missing_[group_name][name] for name in self.names])
norm = np.power(norm, 1.0 / len(self._groups))
params = norm
else:
try:
params = self.norm_.loc[groups].to_numpy()
except (KeyError, TypeError):
params = np.asarray([self.missing_[name] for name in self.names])
return params
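
    # Worked example (illustrative): with groups=["g1", "g2"] and
    # scale_by_group=True, the returned parameters are the geometric mean of
    # the per-group parameters, e.g. scales of 2.0 and 8.0 combine to
    # (2.0 * 8.0) ** (1 / 2) == 4.0.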
def get_norm(self, X: pd.DataFrame) -> pd.DataFrame:
"""
Get scaling parameters for multiple groups.
Args:
X (pd.DataFrame): dataframe with ``groups`` columns
Returns:
            pd.DataFrame: dataframe with scaling parameters, where each row corresponds to a row of the input dataframe
"""
if len(self._groups) == 0:
norm = np.asarray([self.norm_["center"], self.norm_["scale"]]).reshape(1, -1)
elif self.scale_by_group:
norm = [
np.prod(
[
X[group_name]
.map(self.norm_[group_name][name])
.fillna(self.missing_[group_name][name])
.to_numpy()
for group_name in self._groups
],
axis=0,
)
for name in self.names
]
norm = np.power(np.stack(norm, axis=1), 1.0 / len(self._groups))
else:
norm = X[self._groups].set_index(self._groups).join(self.norm_).fillna(self.missing_).to_numpy()
return norm
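
# Illustrative sketch (not part of the library API): fit one scaler per group
# and look the parameters up again, both for a fitted group and for an unknown
# group that falls back to the medians stored in ``missing_``. The name
# ``_example_group_normalizer`` is hypothetical.
def _example_group_normalizer():
    X = pd.DataFrame({"group": ["a", "a", "b", "b"]})
    y = pd.Series([1.0, 3.0, 10.0, 30.0])
    normalizer = GroupNormalizer(groups=["group"]).fit(y, X)
    known = normalizer.get_parameters(("a",))  # center/scale fitted on group "a"
    fallback = normalizer.get_parameters(("z",))  # median over fitted groups
    return known, fallback
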
class MultiNormalizer(TorchNormalizer):
"""
    Normalizer for multiple targets.
    This normalizer wraps multiple other normalizers.
"""
def __init__(self, normalizers: List[TorchNormalizer]):
"""
Args:
normalizers (List[TorchNormalizer]): list of normalizers to apply to targets
"""
self.normalizers = normalizers
def fit(self, y: Union[pd.DataFrame, np.ndarray, torch.Tensor], X: pd.DataFrame = None):
"""
Fit transformer, i.e. determine center and scale of data
Args:
            y (Union[pd.DataFrame, np.ndarray, torch.Tensor]): input data
            X (pd.DataFrame, optional): dataframe with group columns. Only required if one of the
                normalizers is a ``GroupNormalizer``.
Returns:
MultiNormalizer: self
"""
if isinstance(y, pd.DataFrame):
y = y.to_numpy()
for idx, normalizer in enumerate(self.normalizers):
if isinstance(normalizer, GroupNormalizer):
normalizer.fit(y[:, idx], X)
else:
normalizer.fit(y[:, idx])
self.fitted_ = True
return self
def __getitem__(self, idx: int):
"""
Return normalizer.
Args:
idx (int): metric index
"""
return self.normalizers[idx]
def __iter__(self):
"""
Iter over normalizers.
"""
return iter(self.normalizers)
def __len__(self) -> int:
"""
Number of normalizers.
"""
return len(self.normalizers)
def __call__(self, data: Dict[str, Union[List[torch.Tensor], torch.Tensor]]) -> List[torch.Tensor]:
"""
Inverse transformation but with network output as input.
Args:
data (Dict[str, Union[List[torch.Tensor], torch.Tensor]]): Dictionary with entries
* prediction: list of data to de-scale
* target_scale: list of center and scale of data
Returns:
List[torch.Tensor]: list of de-scaled data
"""
denormalized = [
normalizer(dict(prediction=data["prediction"][idx], target_scale=data["target_scale"][idx]))
for idx, normalizer in enumerate(self.normalizers)
]
return denormalized
def get_parameters(self, *args, **kwargs) -> List[torch.Tensor]:
"""
Returns parameters that were used for encoding.
Returns:
            List[torch.Tensor]: parameters of each normalizer, where for each the first element is the center
                of the data and the second is the scale
"""
return [normalizer.get_parameters(*args, **kwargs) for normalizer in self.normalizers]
def __getattr__(self, name: str):
"""
        Return attributes dynamically.
        Returns the attribute if it is defined in this class. If not, creates the attribute dynamically
        from the attributes of the underlying normalizers, wrapping callables in a function if necessary.
        Arguments to such functions are distributed to the individual normalizers if they are lists whose
        length matches the number of normalizers. Otherwise, they are passed unchanged to each
        normalizer.
Args:
name (str): name of attribute
Returns:
attributes of this class or list of attributes of underlying class
"""
try:
return super().__getattr__(name)
except AttributeError as e:
attribute_exists = all(hasattr(norm, name) for norm in self.normalizers)
if attribute_exists:
                # if the attribute is callable, wrap it in a function that dispatches to all normalizers
if callable(getattr(self.normalizers[0], name)):
n = len(self.normalizers)
def func(*args, **kwargs):
                        # if an arg/kwarg is a list or tuple with one entry per normalizer,
                        # distribute the entries; otherwise pass it unchanged to every normalizer
results = []
for idx, norm in enumerate(self.normalizers):
new_args = [
(
arg[idx]
if isinstance(arg, (list, tuple))
and not isinstance(arg, rnn.PackedSequence)
and len(arg) == n
else arg
)
for arg in args
]
new_kwargs = {
key: (
val[idx]
if isinstance(val, list)
and not isinstance(val, rnn.PackedSequence)
and len(val) == n
else val
)
for key, val in kwargs.items()
}
results.append(getattr(norm, name)(*new_args, **new_kwargs))
return results
return func
else:
# else return list of attributes
return [getattr(norm, name) for norm in self.normalizers]
else: # attribute does not exist for all normalizers
raise e
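
# Illustrative sketch (not part of the library API): wrap one normalizer per
# target column; ``fit`` dispatches column-wise, and attribute access such as
# ``center_`` is forwarded to the wrapped normalizers via ``__getattr__``. The
# name ``_example_multi_normalizer`` is hypothetical.
def _example_multi_normalizer():
    y = pd.DataFrame({"target1": [1.0, 2.0, 3.0], "target2": [10.0, 20.0, 30.0]})
    normalizer = MultiNormalizer([TorchNormalizer(), TorchNormalizer()])
    normalizer.fit(y)
    # one (center, scale) pair per wrapped normalizer
    return [norm.get_parameters() for norm in normalizer]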