# Source code for supernnova.utils.training_utils

import torch.nn as nn
import torch
from supernnova.utils import logging_utils as lu
from supernnova.training import bayesian_rnn
from supernnova.training import bayesian_rnn_2
from supernnova.training import variational_rnn
from supernnova.training import vanilla_rnn
import os
import h5py
import json
import pickle
import numpy as np
from tqdm import tqdm
from pathlib import Path

from sklearn import metrics
import matplotlib.pyplot as plt

plt.switch_backend("agg")


def normalize_arr(arr, settings):
    """Normalize array before input to RNN

    - Log transform
    - Mean and std dev normalization

    Args:
        arr (np.array): array to normalize; modified in place and returned
        settings (ExperimentSettings): controls experiment hyperparameters

    Returns:
        (np.array) the normalized array
    """
    # No-op when normalization is disabled
    if settings.norm == "none":
        return arr
    else:
        # Per-feature normalization statistics: columns are (min, mean, std)
        arr_min = settings.arr_norm[:, 0]
        arr_mean = settings.arr_norm[:, 1]
        arr_std = settings.arr_norm[:, 2]

        # Fancy indexing yields a copy; the result is written back at the end
        arr_to_norm = arr[:, settings.idx_features_to_normalize]
        # clipping: guard against values below the stored minimum so the
        # log transform below stays finite
        arr_to_norm = np.clip(arr_to_norm, arr_min, np.inf)
        if "cosmo" not in settings.norm:
            # normalize using global norm: log transform then standardize
            arr_normed = np.log(arr_to_norm - arr_min + 1e-5)
            arr_normed = (arr_normed - arr_mean) / arr_std
        elif settings.norm == "cosmo":
            # normalize all lcs to 1 (fluxes), maintain color info
            # time is normalized as global norm
            # NOTE(review): this aliases arr_to_norm (no copy) and mutates it;
            # safe here because arr_to_norm is itself a copy of arr's columns.
            arr_normed_cosmo = arr_to_norm
            # Flux + flux err normalization (all columns except the last,
            # assumed to be time — TODO confirm column layout)
            arr_normed_cosmo[:, :-1] = (
                arr_normed_cosmo[:, :-1] / arr_normed_cosmo[:, :-1].max()
            )
            # time normalization (log norm)
            tmp_cosmo = np.log(arr_to_norm[:, -1] - arr_min[-1] + 1e-5)
            arr_normed_cosmo[:, -1] = (tmp_cosmo - arr_mean[-1]) / arr_std[-1]
            # all together
            arr_normed = arr_normed_cosmo
        elif settings.norm == "cosmo_quantile":
            # normalize all lcs to 1 (using quantile not max for outliers)
            # maintain color info
            # time is normalized as global norm
            arr_normed_cosmo = arr_to_norm
            # Flux + flux err normalization
            quant_max = np.quantile(arr_normed_cosmo[:, :-1], 0.99)
            # clip quant_min and max
            # NOTE(review): despite the comment above, no clipping is applied
            # here — values are only divided by the 99th percentile; confirm
            # whether clipping was intended.
            arr_normed_cosmo[:, :-1] = arr_normed_cosmo[:, :-1] / quant_max
            # time normalization (log norm)
            tmp_cosmo = np.log(arr_to_norm[:, -1] - arr_min[-1] + 1e-5)
            arr_normed_cosmo[:, -1] = (tmp_cosmo - arr_mean[-1]) / arr_std[-1]
            # all together
            arr_normed = arr_normed_cosmo
        # Write the normalized columns back into the original array
        arr[:, settings.idx_features_to_normalize] = arr_normed
    return arr
def unnormalize_arr(arr, settings):
    """UnNormalize array

    Inverts the standardize-then-log transform applied at normalization time.
    ``arr`` is modified in place and returned.

    Args:
        arr (np.array): array to unnormalize
        settings (ExperimentSettings): controls experiment hyperparameters

    Returns:
        (np.array) the unnormalized array
    """
    # Nothing to invert when normalization is disabled
    if settings.norm == "none":
        return arr

    norm_min = settings.arr_norm[:, 0]
    norm_mean = settings.arr_norm[:, 1]
    norm_std = settings.arr_norm[:, 2]

    if "cosmo" in settings.norm:
        # only unnormalize time (the last normalized column)
        time_col = settings.idx_features_to_normalize[-1]
        restored = (
            np.exp(arr[:, time_col] * norm_std[-1] + norm_mean[-1])
            + norm_min[-1]
            - 1e-5
        )
        arr[:, time_col] = restored
    else:
        cols = settings.idx_features_to_normalize
        restored = np.exp(arr[:, cols] * norm_std + norm_mean) + norm_min - 1e-5
        arr[:, cols] = restored

    return arr
def fill_data_list(
    idxs, arr_data, arr_target, arr_SNID, settings, n_features, desc, test=False
):
    """Utility to create a list of data tuples used as inputs to RNN model

    The ``settings`` object specifies which feature are selected

    Args:
        idxs (np.array or list): idx of data point to select
        arr_data (np.array): features
        arr_target (np.array): target
        arr_SNID (np.array): lightcurve unique ID
        settings (ExperimentSettings): controls experiment hyperparameters
        n_features (int): total number of features in arr_data
        desc (str): message to display while loading (empty string disables
            the tqdm progress bar)
        test (bool): If True: add more data to the list, as it is required at
            test time. Default: ``False``

    Returns:
        (list) the list of data tuples: ``(X_normed, target, lc)`` normally,
        ``(X_normed, target, lc, X_all, X_ori)`` when ``test`` is True
    """
    list_data = []

    if desc == "":
        iterator = idxs
    else:
        iterator = tqdm(idxs, desc=desc, ncols=100)

    for i in iterator:
        # Reshape the flat feature vector into (time steps, features)
        X_all = arr_data[i].reshape(-1, n_features)
        target = int(arr_target[i])
        lc = str(arr_SNID[i])

        # Keep an unnormalized copy of the data (for test and display)
        X_ori = X_all.copy()[:, settings.idx_features]

        # check if normalization converges
        # using clipping in case of min<model_min
        X_clip = X_all.copy()
        X_clip = np.clip(
            X_clip[:, settings.idx_features_to_normalize],
            settings.arr_norm[:, 0],
            np.inf,
        )
        X_all[:, settings.idx_features_to_normalize] = X_clip
        if "cosmo" not in settings.norm:
            # Round-trip sanity check: normalize then unnormalize should
            # recover the (clipped) input within tolerance
            X_tmp = unnormalize_arr(normalize_arr(X_all.copy(), settings), settings)
            assert np.all(
                np.all(np.isclose(np.ravel(X_all), np.ravel(X_tmp), atol=1e-1))
            )
        # Normalize features that need to be normalized
        X_normed = X_all.copy()
        X_normed_tmp = normalize_arr(X_normed, settings)
        # Select features as specified by the settings
        X_normed = X_normed_tmp[:, settings.idx_features]

        if test is True:
            list_data.append((X_normed, target, lc, X_all, X_ori))
        else:
            list_data.append((X_normed, target, lc))
    return list_data
def load_HDF5(settings, test=False):
    """Load data from HDF5

    Args:
        settings (ExperimentSettings): controls experiment hyperparameters
        test (bool): If True: load data for test. Default: ``False``

    Returns:
        list_data_test (list) test data tuples if test is True

        or

        Tuple containing
            - list_data_train (list): training data tuples
            - list_data_val (list): validation data tuples
    """
    file_name = f"{settings.processed_dir}/database.h5"
    lu.print_green(f"Loading {file_name}")

    with h5py.File(file_name, "r") as hf:

        list_data_train = []
        list_data_val = []

        # Dataset keys are parameterized by source data and number of classes
        config_name = f"{settings.source_data}_{settings.nb_classes}classes"
        dataset_split_key = f"dataset_{config_name}"
        target_key = f"target_{settings.nb_classes}classes"

        if test:
            # ridiculous failsafe in case we have different classes in dataset/model
            # we will always have 2 classes
            try:
                # split value 2 marks the test set
                idxs_test = np.where(hf[dataset_split_key][:] == 2)[0]
            except Exception:
                idxs_test = np.where(hf["dataset_photometry_2classes"][:] != 100)[0]
        else:
            # split values: 0 = train, 1 = validation, 2 = test
            idxs_train = np.where(hf[dataset_split_key][:] == 0)[0]
            idxs_val = np.where(hf[dataset_split_key][:] == 1)[0]
            idxs_test = np.where(hf[dataset_split_key][:] == 2)[0]

            # Shuffle for good measure
            np.random.shuffle(idxs_train)
            np.random.shuffle(idxs_val)
            np.random.shuffle(idxs_test)

            # Optionally subsample the training set
            idxs_train = idxs_train[: int(settings.data_fraction * len(idxs_train))]

        n_features = hf["data"].attrs["n_features"]

        training_features_data = hf["features"][:].astype(str)
        training_features = settings.training_features
        # check if all model features are in db
        assert set(training_features) <= set(training_features_data)
        lu.print_green("Features used", " ".join(training_features))

        # Older databases may not carry this dataset; best-effort read
        try:
            settings.data_types_training = hf["data_types_training"][:].astype(str)
        except Exception:
            pass

        arr_data = hf["data"][:]
        if test:
            # ridiculous failsafe in case we have different classes in dataset/model
            # we will always have 2 classes
            try:
                arr_target = hf[target_key][:]
            except Exception:
                arr_target = hf["target_2classes"][:]
        else:
            arr_target = hf[target_key][:]
        arr_SNID = hf["SNID"][:].astype(str)

        if test is True:
            return fill_data_list(
                idxs_test,
                arr_data,
                arr_target,
                arr_SNID,
                settings,
                n_features,
                "Loading Test Set",
                test,
            )
        else:
            list_data_train = fill_data_list(
                idxs_train,
                arr_data,
                arr_target,
                arr_SNID,
                settings,
                n_features,
                "Loading Training Set",
            )
            list_data_val = fill_data_list(
                idxs_val,
                arr_data,
                arr_target,
                arr_SNID,
                settings,
                n_features,
                "Loading Validation Set",
            )

            return list_data_train, list_data_val
def get_model(settings, input_size):
    """Create RNN model

    Args:
        settings (ExperimentSettings): controls experiment hyperparameters
        input_size (int): dimension of the input data

    Returns:
        (torch.nn Model) pytorch model

    Raises:
        ValueError: if ``settings.model`` is not one of the supported model
            types (previously an unknown value crashed with an opaque
            ``NameError``)
    """
    # Dispatch table: model name -> model class
    model_classes = {
        "vanilla": vanilla_rnn.VanillaRNN,
        "variational": variational_rnn.VariationalRNN,
        "bayesian": bayesian_rnn.BayesianRNN,
        "bayesian_2": bayesian_rnn_2.BayesianRNN,
    }
    try:
        rnn_class = model_classes[settings.model]
    except KeyError:
        raise ValueError(
            f"Unknown model type: {settings.model!r}. "
            f"Expected one of {sorted(model_classes)}"
        ) from None

    rnn = rnn_class(input_size, settings)
    if not settings.no_dump:
        print(rnn)
    return rnn
def get_optimizer(settings, model):
    """Create gradient descent optimizer

    Args:
        settings (ExperimentSettings): controls experiment hyperparameters
        model (torch.nn Model): the pytorch model

    Returns:
        (torch.optim) the gradient descent optimizer
    """
    # Adam with learning rate and L2 penalty taken from the experiment settings
    return torch.optim.Adam(
        model.parameters(),
        lr=settings.learning_rate,
        weight_decay=settings.weight_decay,
    )
def get_data_batch(list_data, idxs, settings, max_lengths=None, OOD=None):
    """Create a batch in a deterministic way

    Args:
        list_data: (list) tuples of (X, target, lightcurve_ID)
        idxs: (array / list) indices of batch element in list_data
        settings (ExperimentSettings): controls experiment hyperparameters
        max_lengths (list of int): Maximum light curve length to be used,
            indexed per batch position. Default: ``None``.
        OOD (str): Whether to modify data to create out of distribution data
            to be used; one of ``reverse``, ``shuffle``, ``sin``, ``random``.
            Default: ``None``.

    Returns:
        Tuple containing
            - packed_tensor (torch PackedSequence): the packed features
            - X_tensor (torch Tensor): the features
            - target_tensor (torch Tensor): the target
            - idxs_rev_sort (np.array): indices that undo the length sort
    """
    list_len = []
    list_batch = []

    for pos, i in enumerate(idxs):
        X, target, *_ = list_data[i]  # X is (L, D)
        if OOD is not None:
            # Make a copy to be sure we do not alter the original data
            X = X.copy()
            if OOD == "reverse":
                # For OOD test, reverse the sequence
                # ascontiguousarray avoids negative strides from the [::-1] view
                X = np.ascontiguousarray(X[::-1])
            elif OOD == "shuffle":
                # For OOD test, shuffle X
                p = np.random.permutation(X.shape[0])
                X = X[p]
            elif OOD == "sin":
                # For OOD test, set sine values to fluxes
                arr_flux = X[:, settings.idx_flux]
                arr_fluxerr = X[:, settings.idx_fluxerr]

                # Recover absolute observation times from the normalized deltas
                X_unnorm = unnormalize_arr(X.copy(), settings)
                arr_delta_time = X_unnorm[:, settings.idx_delta_time]
                arr_MJD = np.cumsum(arr_delta_time, axis=0)

                # Sine oscillations with 30 day period
                X[:, settings.idx_flux] = np.sin(arr_MJD * 2 * np.pi / 30) * np.max(
                    arr_flux, axis=0, keepdims=True
                )
                X[:, settings.idx_fluxerr] = np.random.uniform(
                    arr_fluxerr.min(), arr_fluxerr.max(), size=arr_fluxerr.shape
                )
            elif OOD == "random":
                # For OOD test, set random fluxes and errors
                arr_flux = X[:, settings.idx_flux]
                arr_fluxerr = X[:, settings.idx_fluxerr]

                X[:, settings.idx_flux] = np.random.uniform(
                    arr_flux.min(), arr_flux.max(), size=arr_flux.shape
                )
                X[:, settings.idx_fluxerr] = np.random.uniform(
                    arr_fluxerr.min(), arr_fluxerr.max(), size=arr_fluxerr.shape
                )
        if max_lengths is not None:
            # Explicit truncation is incompatible with the random augmentations
            assert settings.random_length is False
            assert settings.random_redshift is False
            X = X[: max_lengths[pos]]
        if settings.random_length:
            # Training augmentation: keep a random prefix of at least 3 steps
            random_length = np.random.randint(3, X.shape[0] + 1)
            X = X[:random_length]
        if settings.redshift == "zspe" and settings.random_redshift:
            # With probability 0.5, mask the spectroscopic redshift feature
            if np.random.binomial(1, 0.5) == 0:
                X[:, settings.idx_specz] = -1
        input_dim = X.shape[1]
        list_len.append(X.shape[0])
        list_batch.append((X, target))

    # Get indices to sort the batch by sequence size (needed to use packed sequences in pytorch)
    # Sequences should be arranged in decreasing length
    idx_sort = np.argsort(list_len)[::-1]
    idxs_rev_sort = np.argsort(idx_sort)  # these indices revert the sort
    max_len = list_len[idx_sort[0]]
    X_tensor = torch.zeros((max_len, len(idxs), input_dim))
    list_target = []
    lengths = []
    # Assign values for the tensor
    for i, idx in enumerate(idx_sort):
        X, target = list_batch[idx]
        try:
            X_tensor[: X.shape[0], i, :] = torch.FloatTensor(X)
        except Exception:
            # NOTE(review): fallback for arrays FloatTensor cannot ingest
            # directly (presumably negative strides); np.flip also reverses
            # the sequence order here — confirm this is intended behavior.
            X_tensor[: X.shape[0], i, :] = torch.FloatTensor(
                torch.from_numpy(np.flip(X, axis=0).copy().astype(np.float32))
            )
        list_target.append(target)
        lengths.append(list_len[idx])

    # Move data to GPU if required
    if settings.use_cuda:
        X_tensor = X_tensor.cuda()
        target_tensor = torch.LongTensor(list_target).cuda()
    else:
        X_tensor = X_tensor
        target_tensor = torch.LongTensor(list_target)

    # Create a packed sequence
    packed_tensor = nn.utils.rnn.pack_padded_sequence(X_tensor, lengths)

    return packed_tensor, X_tensor, target_tensor, idxs_rev_sort
def train_step(
    settings,
    rnn,
    packed_tensor,
    target_tensor,
    criterion,
    optimizer,
    batch_size,
    num_batches,
):
    """Full training step : Forward and Backward pass

    Args:
        settings (ExperimentSettings): controls experiment hyperparameters
        rnn (torch.nn Model): pytorch model to train
        packed_tensor (torch PackedSequence): input tensor in packed form
        target_tensor (torch Tensor): target tensor
        criterion (torch loss function): loss function to optimize
        optimizer (torch optim): the gradient descent optimizer
        batch_size (int): batch size
        num_batches (int): number of minibatches to scale KL cost in Bayesian

    Returns:
        loss (torch Tensor): the scalar training loss for this batch
    """
    # Set NN to train mode (deals with dropout and batchnorm)
    rnn.train()

    # Zero out the gradients
    optimizer.zero_grad()

    # Forward pass
    output = rnn(packed_tensor)
    loss = criterion(output.squeeze(), target_tensor)

    # Special case for BayesianRNN, need to use KL loss: add the KL term
    # scaled by the total number of samples in the epoch.
    # (Fixed: the original recomputed criterion(...) in a redundant else
    # branch; the loss is now computed exactly once.)
    if isinstance(rnn, bayesian_rnn.BayesianRNN):
        loss = loss + rnn.kl / (num_batches * batch_size)

    # Backward pass
    loss.backward()
    optimizer.step()

    return loss
def eval_step(rnn, packed_tensor, batch_size):
    """Eval step: Forward pass only

    Args:
        rnn (torch.nn Model): pytorch model to train
        packed_tensor (torch PackedSequence): input tensor in packed form
        batch_size (int): batch size (kept for interface compatibility;
            not used by this function)

    Returns:
        output (torch Tensor): output of rnn
    """
    # Switch to eval mode (disables dropout, freezes batchnorm statistics),
    # then run a single forward pass
    rnn.eval()
    return rnn(packed_tensor)
def plot_loss(d_train, d_val, epoch, settings):
    """Plot loss curves

    Plot training and validation logloss

    Args:
        d_train (dict of arrays): training log losses
        d_val (dict of arrays): validation log losses
        epoch (int): current epoch
        settings (ExperimentSettings): custom class to hold hyperparameters
    """
    # One figure per monitored metric, saved under the model's directory
    out_dir = Path(settings.models_dir) / f"{settings.pytorch_model_name}"
    for metric in d_train:
        plt.figure()
        plt.plot(d_train["epoch"], d_train[metric], label="Train %s" % metric.title())
        plt.plot(d_val["epoch"], d_val[metric], label="Val %s" % metric.title())
        plt.legend(loc="best", fontsize=18)
        plt.xlabel("Step", fontsize=22)
        plt.tight_layout()
        plt.savefig(
            out_dir / f"train_and_val_{metric}_{settings.pytorch_model_name}.png"
        )
        plt.close()
        plt.clf()
def get_evaluation_metrics(settings, list_data, model, sample_size=None):
    """Compute evaluation metrics on a list of data points

    Args:
        settings (ExperimentSettings): custom class to hold hyperparameters
        list_data (list): contains data to evaluate
        model (torch.nn Model): pytorch model
        sample_size (int): subset of the data to use for validation.
            Default: ``None``

    Returns:
        d_losses (dict) maps metrics to their computed value
        (keys: ``AUC``, ``Acc``, ``loss``, and ``KL`` for bayesian models)
    """
    # Validate
    list_pred = []
    list_target = []
    list_kl = []
    num_elem = len(list_data)
    # Ensure at least 2 batches; batch size capped at half the data
    num_batches = num_elem // min(num_elem // 2, settings.batch_size)
    list_batches = np.array_split(np.arange(num_elem), num_batches)
    # If required, pick a subset of list batches at random
    if sample_size:
        batch_idxs = np.random.permutation(len(list_batches))
        num_batches = sample_size // min(sample_size // 2, settings.batch_size)
        batch_idxs = batch_idxs[:num_batches]
        list_batches = [list_batches[batch_idx] for batch_idx in batch_idxs]

    for batch_idxs in list_batches:
        # Temporarily disable the random-length training augmentation so
        # evaluation batches are deterministic; restore it afterwards
        random_length = settings.random_length
        settings.random_length = False
        packed_tensor, X_tensor, target_tensor, idxs_rev_sort = get_data_batch(
            list_data, batch_idxs, settings
        )
        settings.random_length = random_length
        output = eval_step(model, packed_tensor, X_tensor.size(1))

        if "bayesian" in settings.pytorch_model_name:
            # Track the KL divergence term computed during the forward pass
            list_kl.append(model.kl.detach().cpu().item())

        # Apply softmax
        pred_proba = nn.functional.softmax(output, dim=1)

        # Convert to numpy array
        pred_proba = pred_proba.data.cpu().numpy()
        target_numpy = target_tensor.data.cpu().numpy()

        # Revert sort (get_data_batch sorts by sequence length)
        pred_proba = pred_proba[idxs_rev_sort]
        target_numpy = target_numpy[idxs_rev_sort]

        list_pred.append(pred_proba)
        list_target.append(target_numpy)

    targets = np.concatenate(list_target, axis=0)
    preds = np.concatenate(list_pred, axis=0)

    # Check outputs size
    assert len(targets.shape) == 1
    assert len(preds.shape) == 2

    # AUC also requires both classes to be present in the targets
    if settings.nb_classes == 2 and len(set(targets)) > 1:
        auc = metrics.roc_auc_score(targets, preds[:, 1])
    else:
        # Can't compute AUC for more than 2 classes
        auc = None
    acc = metrics.accuracy_score(targets, np.argmax(preds, 1))
    # One-hot encode targets for the multi-class log loss
    targets_2D = np.zeros((targets.shape[0], settings.nb_classes))
    for i in range(targets.shape[0]):
        targets_2D[i, targets[i]] = 1
    log_loss = metrics.log_loss(targets_2D, preds)

    d_losses = {"AUC": auc, "Acc": acc, "loss": log_loss}

    if len(list_kl) != 0:
        d_losses["KL"] = np.mean(list_kl)

    return d_losses
def get_loss_string(d_losses_train, d_losses_val):
    """Obtain a loss string to display training progress

    Args:
        d_losses_train (dict): maps {metric:value} for the training data
        d_losses_val (dict): maps {metric:value} for the validation data

    Returns:
        loss_str (str): the loss string to display
    """

    def _format_values(d_losses):
        # Show each metric with 3 significant digits; "epoch" and missing
        # values render as "NA"
        formatted = []
        for metric, value in d_losses.items():
            if value is not None and metric != "epoch":
                formatted.append(f"{value:.3g}")
            else:
                formatted.append("NA")
        return "/".join(formatted)

    return (
        "/".join(d_losses_train.keys())
        + " [T]: "
        + _format_values(d_losses_train)
        + " [V]: "
        + _format_values(d_losses_val)
    )
def save_training_results(settings, d_monitor, training_time):
    """Save training metrics to the model's ``training_log.json``

    Records best AUC/Acc, best (lowest) loss, training time and the data
    types used for training, merged into any existing log file.
    (Fixed: docstring previously copied from ``get_loss_string``; the broad
    ``except Exception`` around the JSON read is narrowed to the two
    expected failure modes.)

    Args:
        settings (ExperimentSettings): controls experiment hyperparameters
        d_monitor (dict): maps {metric: list of values over epochs}
        training_time (float): amount of time training took
    """
    d_results = {"training_time": training_time}
    for key in ["AUC", "Acc"]:
        if key == "AUC" and settings.nb_classes > 2:
            # AUC is not computed for more than 2 classes
            d_results[key] = -1
        else:
            # A None in the history means the metric was unavailable
            d_results[key] = max(d_monitor[key]) if None not in d_monitor[key] else None
    d_results["loss"] = min(d_monitor["loss"])
    d_results["data_types_training"] = np.array2string(settings.data_types_training)

    log_path = Path(settings.rnn_dir) / "training_log.json"
    # Merge with an existing log; start fresh if absent or corrupted
    try:
        with open(log_path, "r") as f:
            d_out = json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        d_out = {}

    with open(log_path, "w") as f:
        d_out.update({settings.pytorch_model_name: d_results})
        json.dump(d_out, f)
####################### # RandomForest Utils #######################
def save_randomforest_model(settings, clf):
    """Save RandomForest model

    Args:
        settings (ExperimentSettings): controls experiment hyperparameters
        clf (RandomForestClassifier): RandomForest model
    """
    # Pickle the fitted classifier into the random forest model directory
    out_path = f"{settings.rf_dir}/{settings.randomforest_model_name}.pickle"
    with open(out_path, "wb") as handle:
        pickle.dump(clf, handle)
    lu.print_green("Saved model")
def load_randomforest_model(settings, model_file=None):
    """Load RandomForest model

    Args:
        settings (ExperimentSettings): controls experiment hyperparameters
        model_file (str): path to saved randomforest model. Default: ``None``

    Returns:
        (RandomForestClassifier) RandomForest model

    Raises:
        FileNotFoundError: if the model file does not exist (previously an
            ``assert``, which is silently stripped when Python runs with -O)
    """
    if model_file is None:
        model_file = f"{settings.rf_dir}/{settings.randomforest_model_name}.pickle"
    if not os.path.isfile(model_file):
        raise FileNotFoundError(f"RandomForest model file not found: {model_file}")
    # NOTE: pickle.load is only safe on trusted, locally-produced model files
    with open(model_file, "rb") as f:
        clf = pickle.load(f)
    lu.print_green("Loaded model")
    return clf
def train_and_evaluate_randomforest_model(clf, X_train, y_train, X_val, y_val):
    """Train a RandomForestClassifier and evaluate AUC, precision, accuracy
    on a validation set

    Args:
        clf (RandomForestClassifier): RandomForest model to fit and evaluate
        X_train (np.array): the training features
        y_train (np.array): the training target
        X_val (np.array): the validation features
        y_val (np.array): the validation target

    Returns:
        (RandomForestClassifier) the fitted model
    """
    lu.print_green("Fitting RandomForest...")
    clf = clf.fit(X_train, y_train)
    lu.print_green("Fitting complete")

    # Evaluate our classifier
    val_proba = clf.predict_proba(X_val)

    # Compute AUC and precision on the validation set
    fpr, tpr, thresholds = metrics.roc_curve(y_val, val_proba[:, 1])
    roc_auc = metrics.auc(fpr, tpr)
    pscore = metrics.precision_score(y_val, clf.predict(X_val), average="binary")
    lu.print_green("Validation AUC", roc_auc)
    lu.print_green("Validation precision score", pscore)

    # Report accuracy on both splits
    train_acc = 100 * (sum(clf.predict(X_train) == y_train)) / X_train.shape[0]
    val_acc = 100 * (sum(clf.predict(X_val) == y_val)) / X_val.shape[0]
    lu.print_green("Train data accuracy", train_acc)
    lu.print_green("Val data accuracy", val_acc)

    return clf
class StopOnPlateau(object):
    """
    Detect plateau on accuracy (or any metric)
    If chosen, will reduce learning rate of optimizer once in the Plateau

    .. code: python

        plateau_accuracy = tu.StopOnPlateau()
        for epoch in range(10):
            ... get metric ...
            plateau = plateau_accuracy.step(metric_value)
            if plateau is True:
                break

    Args:
        patience (int): number of epochs to wait, after which we decrease the LR
            if the validation loss is plateauing
        reduce_lr-on_plateau (bool): If True, reduce LR after loss has not
            improved in the last patience epochs
        max_learning_rate_reduction (float): max factor by which to reduce
            the learning rate
    """

    def __init__(
        self, patience=10, reduce_lr_on_plateau=False, max_learning_rate_reduction=3
    ):
        # NOTE(review): patience, best, num_bad_epochs and is_better are set
        # here but never read by step() — possibly leftovers from an earlier
        # implementation.
        self.patience = patience
        self.best = 0.0
        self.num_bad_epochs = 0
        self.is_better = None
        self.last_epoch = -1
        # Rolling window of the most recent metric values (capped at 10)
        self.list_metric = []
        self.reduce_lr_on_plateau = reduce_lr_on_plateau
        self.max_learning_rate_reduction = max_learning_rate_reduction
        # How many times the LR has already been divided by 10
        self.learning_rate_reduction = 0

    def step(self, metric_value, optimizer=None, epoch=None):
        # Record the metric and decide whether training has plateaued.
        # Returns True when training should stop; otherwise returns False or
        # None (both falsy) — callers should test the result with `is True`
        # or truthiness, as in the class docstring example.
        current = metric_value
        if epoch is None:
            epoch = self.last_epoch = self.last_epoch + 1
        self.last_epoch = epoch

        # Are we under .05 std in accuracy on the last 10 epochs
        self.list_metric.append(current)
        if len(self.list_metric) > 10:
            self.list_metric = self.list_metric[-10:]
            # are we in a plateau?
            # accuracy is not in percentage, so two decimal numbers is actually 4 in this notation
            if np.array(self.list_metric).std() < 0.0005:
                print("Has reached a learning plateau with", current, "\n")
                if optimizer is not None and self.reduce_lr_on_plateau is True:
                    print(
                        "Reducing learning rate by factor of ten",
                        self.learning_rate_reduction,
                        "\n",
                    )
                    for param in optimizer.param_groups:
                        param["lr"] = param["lr"] / 10.0
                    self.learning_rate_reduction += 1
                    # Stop only once the LR has been reduced the max number
                    # of times; before that, keep training at the lower LR
                    # (implicitly returns None)
                    if self.learning_rate_reduction == self.max_learning_rate_reduction:
                        return True
                else:
                    # Plateau detected and no LR-reduction requested: stop
                    return True
            else:
                return False