"""
dklgpr.py
=========
Deep kernel learning (DKL)-based gaussian process regression (GPR)
Created by Maxim Ziatdinov (email: maxim.ziatdinov@ai4microscopy.com)
"""
import warnings
from typing import Tuple, Type, Union, List
import gpytorch
import numpy as np
import torch
from ...trainers import dklGPTrainer
from ...utils import init_dataloader
mvn_ = gpytorch.distributions.MultivariateNormal
[docs]class dklGPR(dklGPTrainer):
"""
Deep kernel learning (DKL)-based Gaussian process regression (GPR)
Args:
indim: input feature dimension
embedim: embedding dimension (determines dimensionality of kernel space)
shared_embedding_space: use one embedding space for all target outputs
Keyword Args:
device:
Sets device to which model and data will be moved.
Defaults to 'cuda:0' if a GPU is available and to CPU otherwise.
precision:
Sets tensor types for 'single' (torch.float32)
or 'double' (torch.float64) precision
seed:
Seed for enforcing reproducibility
Examples:
Train a DKL-GPR model with high-dimensional inputs X and outputs y:
>>> data_dim = X.shape[-1] # X dimensions are n_samples x d
>>> dklgp = aoi.models.dklGPR(data_dim, embedim=2, precision="double")
>>> dklgp.fit(X, y, training_cycles=100, lr=1e-2)
Make a prediction on new data (mean and variance for each 'test' point):
>>> mean, var = dklgp.predict(X_test, batch_size=len(X_test))
Alternatively, one can obtain a prediction as follows:
>>> samples = dklgp.sample_from_posterior(X_test, num_samples=1000)
>>> mean, var = samples.mean(0), samples.var(0)
"""
def __init__(self,
indim: int,
embedim: int = 2,
shared_embedding_space: bool = True,
**kwargs: Union[str, int]) -> None:
"""
Initializes DKL-GPR model
"""
args = (indim, embedim, shared_embedding_space)
super(dklGPR, self).__init__(*args, **kwargs)
[docs] def fit(self, X: Union[torch.Tensor, np.ndarray],
y: Union[torch.Tensor, np.ndarray],
training_cycles: int = 1,
**kwargs: Union[Type[torch.nn.Module], bool, float]
) -> None:
"""
Initializes and trains a deep kernel GP model
Args:
X: Input training data (aka features) of N x input_dim dimensions
y: Output targets of batch_size x N or N (if batch_size=1) dimensions
training_cycles: Number of training epochs
Keyword Args:
feature_extractor:
(Optional) Custom neural network for feature extractor.
Must take input/feature dims and embedding dims as its arguments.
freeze_weights:
Freezes weights of feature extractor, that is, they are not
passed to the optimizer. Used for a transfer learning.
lr: learning rate (Default: 0.01)
print_loss: print loss at every n-th training cycle (epoch)
"""
_ = self.run(X, y, training_cycles, **kwargs)
[docs] def fit_ensemble(self, X: Union[torch.Tensor, np.ndarray],
y: Union[torch.Tensor, np.ndarray],
training_cycles: int = 1,
n_models: int = 5,
**kwargs: Union[Type[torch.nn.Module], bool, float]
) -> None:
"""
Initializes and trains an ensemble of deep kernel GP model
Args:
X: Input training data (aka features) of N x input_dim dimensions
y: Output targets of batch_size x N or N (if batch_size=1) dimensions
training_cycles: Number of training epochs
n_models: Number of models in ensemble
Keyword Args:
feature_extractor:
(Optional) Custom neural network for feature extractor.
Must take input/feature dims and embedding dims as its arguments.
freeze_weights:
Freezes weights of feature extractor, that is, they are not
passed to the optimizer. Used for a transfer learning.
lr: learning rate (Default: 0.01)
print_loss: print loss at every n-th training cycle (epoch)
"""
if y.ndim == 1:
y = y[None]
if y.shape[0] > 1:
raise NotImplementedError(
"The ensemble training is currently supported only for scalar targets")
y = y.repeat(n_models, 0) if isinstance(y, np.ndarray) else y.repeat(n_models, 1)
if self.correlated_output:
msg = ("Replacing a single shared embedding space with" +
" {} independent ones").format(n_models)
warnings.warn(msg)
self.correlated_output = False
self.ensemble = True
_ = self.run(X, y, training_cycles, **kwargs)
def _compute_posterior(self, X: torch.Tensor) -> Union[mvn_, List[mvn_]]:
"""
Computes the posterior over model outputs at the provided points (X).
For a model with multiple independent outputs, it returns a list of
posteriors computed for each independent model.
"""
if not self.correlated_output:
if X.ndim != 3 or X.shape[0] != len(self.gp_model.train_targets):
raise ValueError(
"The input data must have q x n x d dimensionality " +
"where the first dimension (q) must be equal to the " +
"number of independent outputs")
self.gp_model.eval()
self.likelihood.eval()
wrn = gpytorch.models.exact_gp.GPInputWarning
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=wrn)
with torch.no_grad(), gpytorch.settings.use_toeplitz(False), gpytorch.settings.fast_pred_var():
if self.correlated_output:
posterior = self.gp_model(X.to(self.device))
else:
posterior = self.gp_model(*X.to(self.device).split(1))
return posterior
[docs] def sample_from_posterior(self, X, num_samples: int = 1000) -> np.ndarray:
"""
Computes the posterior over model outputs at the provided points (X)
and samples from it
"""
X, _ = self.set_data(X)
gp_batch_dim = len(self.gp_model.train_targets)
X = X.expand(gp_batch_dim, *X.shape)
posterior = self._compute_posterior(X)
if self.correlated_output:
samples = posterior.rsample(torch.Size([num_samples, ]))
else:
samples = [p.rsample(torch.Size([num_samples, ])) for p in posterior]
samples = torch.cat(samples, 1)
return samples.cpu().numpy()
[docs] def thompson(self,
X_cand: Union[torch.Tensor, np.ndarray],
scalarize_func = None,
maximize: bool = True) -> Tuple[np.ndarray, int]:
"""
Thompson sampling for selecting the next measurement point
"""
X_cand, _ = self.set_data(X_cand)
gp_batch_dim = len(self.gp_model.train_targets)
X_cand = X_cand.expand(gp_batch_dim, *X_cand.shape).squeeze()
posterior = self._compute_posterior(X_cand)
if self.correlated_output:
tsample = posterior.rsample()
else:
tsample = torch.cat([p.rsample() for p in posterior])
if tsample.ndim > 1 and scalarize_func is not None:
tsample = scalarize_func(tsample).unsqueeze(0)
idx = tsample.argmax(1) if maximize else tsample.argmin(1)
return tsample.cpu().numpy(), idx.cpu().numpy()
def _predict(self, x_new: torch.Tensor) -> Tuple[torch.Tensor]:
posterior = self._compute_posterior(x_new)
if self.correlated_output:
return posterior.mean.cpu(), posterior.variance.cpu()
means_all = torch.cat([p.mean for p in posterior])
vars_all = torch.cat([p.variance for p in posterior])
return means_all.cpu(), vars_all.cpu()
[docs] def predict(self, x_new: Union[torch.Tensor, np.ndarray],
**kwargs) -> Tuple[np.ndarray]:
"""
Prediction of mean and variance using the trained model
"""
gp_batch_dim = len(self.gp_model.train_targets)
x_new, _ = self.set_data(x_new, device='cpu')
data_loader = init_dataloader(x_new, shuffle=False, **kwargs)
predicted_mean, predicted_var = [], []
for (x,) in data_loader:
x = x.expand(gp_batch_dim, *x.shape)
mean, var = self._predict(x)
predicted_mean.append(mean)
predicted_var.append(var)
return (torch.cat(predicted_mean, 1).numpy().squeeze(),
torch.cat(predicted_var, 1).numpy().squeeze())
def _embed(self, x_new: torch.Tensor):
self.gp_model.eval()
with torch.no_grad():
if self.correlated_output:
embeded = self.gp_model.feature_extractor(x_new)
embeded = self.gp_model.scale_to_bounds(embeded)
else:
embeded = [m.scale_to_bounds(m.feature_extractor(x_new))[..., None]
for m in self.gp_model.models]
embeded = torch.cat(embeded, -1)
return embeded.cpu()
[docs] def embed(self, x_new: Union[torch.Tensor, np.ndarray],
**kwargs: int) -> torch.Tensor:
"""
Embeds the input data to a "latent" space using a trained feature extractor NN.
"""
x_new, _ = self.set_data(x_new, device='cpu')
data_loader = init_dataloader(x_new, shuffle=False, **kwargs)
embeded = torch.cat([self._embed(x.to(self.device)) for (x,) in data_loader], 0)
if not self.correlated_output and not self.ensemble:
embeded = embeded.permute(-1, 0, 1)
return embeded.numpy()