Source code for supernnova.utils.experiment_settings

import os
import json
import h5py
import itertools
import numpy as np
from pathlib import Path


class ExperimentSettings:
    """Mother class to control experiment parameters

    This class is responsible for the following:

    - Defining paths and model names
    - Choosing the device on which to run computations
    - Specifying all hyperparameters such as model configuration,
      datasets, features, etc.

    Args:
        cli_args (argparse.Namespace): command line arguments
    """

    def __init__(self, cli_args):

        # Transfer attributes (accept either a dict or an argparse.Namespace)
        if isinstance(cli_args, dict):
            self.__dict__.update(cli_args)
            self.cli_args = cli_args
        else:
            self.__dict__.update(cli_args.__dict__)
            self.cli_args = cli_args.__dict__

        self.device = "cuda" if self.use_cuda else "cpu"

        # Weight decay only applies to the variational model
        if self.model != "variational":
            self.weight_decay = 0.0

        # Load simulation and training settings and prepare directories
        if not self.no_dump:
            self.setup_dir()

        # Set the database file names
        self.set_database_file_names()

        self.randomforest_features = self.get_randomforest_features()

        # Set the feature lists
        if "all_features" not in cli_args:
            self.set_feature_lists()

        self.overwrite = not self.no_overwrite

        # Build all non-empty combinations of the filters, e.g. g, r, gr, gri, ...
        list_filters_combination = []
        for i in range(1, len(self.list_filters) + 1):
            list_filters_combination += [
                "".join(t) for t in itertools.combinations(self.list_filters, i)
            ]
        self.list_filters_combination = list_filters_combination

        self.set_randomforest_model_name()
        self.set_pytorch_model_name()

        # Get the feature normalization dict
        self.load_normalization()
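    # Usage sketch (illustrative, not part of the module): settings are normally
    # built from the parsed command-line arguments; the entry-point name below is
    # an assumption, and a plain dict with the same keys also works since
    # __init__ accepts both:
    #
    #     from supernnova.conf import get_args  # hypothetical helper
    #
    #     settings = ExperimentSettings(get_args())
    #     # or: ExperimentSettings({"use_cuda": False, "model": "vanilla", ...})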
    def get_randomforest_features(self):
        """Specify list of features to be used for RandomForest training"""

        features = [
            "x1",
            "x1ERR",
            "c",
            "cERR",
            "mB",
            "mBERR",
            "x0",
            "x0ERR",
            # "COV_x1_c", "COV_x1_x0", "COV_c_x0", "NDOF",
            "FITCHI2",
            "m0obs_r",
            "m0obs_i",
            "m0obs_g",
            "m0obs_z",
            "em0obs_i",
            "em0obs_r",
            "em0obs_g",
            "em0obs_z",
        ]

        if self.redshift == "zpho":
            features += ["HOSTGAL_PHOTOZ", "HOSTGAL_PHOTOZ_ERR"]
        elif self.redshift == "zspe":
            features += ["HOSTGAL_SPECZ", "HOSTGAL_SPECZ_ERR"]

        return features
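    # Context (hedged): x1, c, mB, x0 and their *ERR counterparts read like
    # SALT2 light-curve fit parameters, and m0obs_{g,r,i,z} / em0obs_{g,r,i,z}
    # like per-band peak magnitudes and their errors. For example, with
    # redshift == "zspe" the returned list ends with:
    #
    #     [..., "em0obs_z", "HOSTGAL_SPECZ", "HOSTGAL_SPECZ_ERR"]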
    def setup_dir(self):
        """Configure directories where data is read from or dumped to
        during the course of an experiment
        """

        for path in [
            # f"{self.raw_dir}",
            # f"{self.fits_dir}",
            f"{self.dump_dir}/explore",
            f"{self.dump_dir}/stats",
            f"{self.dump_dir}/figures",
            f"{self.dump_dir}/lightcurves",
            f"{self.dump_dir}/latex",
            f"{self.dump_dir}/processed",
            f"{self.dump_dir}/preprocessed",
            f"{self.dump_dir}/models",
        ]:
            # Expose each directory as an attribute, e.g. self.models_dir,
            # then create it on disk if needed
            setattr(self, Path(path).name + "_dir", path)
            Path(path).mkdir(exist_ok=True, parents=True)
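    # Effect of setup_dir() (sketch, assuming dump_dir="snndump"; the actual
    # value comes from the command line): one attribute is created per
    # directory, e.g.
    #
    #     settings.stats_dir     == "snndump/stats"
    #     settings.processed_dir == "snndump/processed"
    #     settings.models_dir    == "snndump/models"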
    def set_pytorch_model_name(self):
        """Define the model name for all NN-based classifiers"""

        name = f"{self.model}_S_{self.seed}_CLF_{self.nb_classes}"
        name += f"_R_{self.redshift}"
        name += f"_{self.source_data}_DF_{self.data_fraction}_N_{self.norm}"
        name += f"_{self.layer_type}_{self.hidden_dim}x{self.num_layers}"
        name += f"_{self.dropout}"
        name += f"_{self.batch_size}"
        name += f"_{self.bidirectional}"
        name += f"_{self.rnn_output_option}"

        if "bayesian" in self.model:
            name += (
                f"_Bayes_{self.pi}_{self.log_sigma1}_{self.log_sigma2}"
                f"_{self.rho_scale_lower}_{self.rho_scale_upper}"
                f"_{self.log_sigma1_output}_{self.log_sigma2_output}"
                f"_{self.rho_scale_lower_output}_{self.rho_scale_upper_output}"
            )

        if self.cyclic:
            name += "_C"

        if self.weight_decay > 0:
            name += f"_WD_{self.weight_decay}"

        self.pytorch_model_name = name
        self.rnn_dir = f"{self.models_dir}/{self.pytorch_model_name}"

        # Convert numpy arrays to lists so the settings can be serialized to JSON
        d_tmp = {}
        for k, v in self.__dict__.items():
            if isinstance(v, np.ndarray):
                v = v.tolist()
            d_tmp[k] = v

        if self.train_rnn:
            os.makedirs(self.rnn_dir, exist_ok=True)

            # Dump the command line arguments (for model restoration)
            with open(Path(self.rnn_dir) / "cli_args.json", "w") as f:
                json.dump(d_tmp, f, indent=4, sort_keys=True)
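    # Example name (illustrative values only): with model="vanilla", seed=0,
    # nb_classes=2, redshift="none", source_data="saltfit", data_fraction=1.0,
    # norm="global", layer_type="lstm", hidden_dim=32, num_layers=2,
    # dropout=0.05, batch_size=128, bidirectional=True,
    # rnn_output_option="mean", cyclic=False and weight_decay=0, the generated
    # name would be:
    #
    #     vanilla_S_0_CLF_2_R_none_saltfit_DF_1.0_N_global_lstm_32x2_0.05_128_True_mean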
    def set_randomforest_model_name(self):
        """Define the model name for all RandomForest-based classifiers"""

        name = f"randomforest_S_{self.seed}_CLF_{self.nb_classes}"
        name += f"_R_{self.redshift}"
        name += f"_{self.source_data}_DF_{self.data_fraction}_N_{self.norm}"

        self.randomforest_model_name = name
        self.rf_dir = f"{self.models_dir}/{self.randomforest_model_name}"

        if self.train_rf:
            os.makedirs(self.rf_dir, exist_ok=True)

            # Dump the command line arguments (for model restoration)
            with open(Path(self.rf_dir) / "cli_args.json", "w") as f:
                json.dump(self.cli_args, f, indent=4, sort_keys=True)

        return name
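    # Example name (illustrative values only): with seed=0, nb_classes=2,
    # redshift="zspe", source_data="saltfit", data_fraction=0.5 and
    # norm="global", the returned name is:
    #
    #     randomforest_S_0_CLF_2_R_zspe_saltfit_DF_0.5_N_global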
    def check_data_exists(self):
        """Utility to check that the database has been built"""

        database_file = f"{self.processed_dir}/database.h5"
        assert os.path.isfile(
            database_file
        ), f"{database_file} not found; build the database first"
    def set_feature_lists(self):
        """Utility to define the features used to train NN-based models"""

        self.training_features_to_normalize = [
            f"FLUXCAL_{f}" for f in self.list_filters
        ]
        self.training_features_to_normalize += [
            f"FLUXCALERR_{f}" for f in self.list_filters
        ]
        self.training_features_to_normalize += ["delta_time"]

        if not self.data:
            # If the database has been created, read the list of all features
            with h5py.File(self.hdf5_file_name, "r") as hf:
                self.all_features = hf["features"][:].astype(str)

            self.non_redshift_features = [
                f for f in self.all_features if "HOSTGAL" not in f
            ]

            # Optionally add redshift
            self.redshift_features = []
            if self.redshift == "zpho":
                self.redshift_features = [
                    f for f in self.all_features if "HOSTGAL_PHOTOZ" in f
                ]
            elif self.redshift == "zspe":
                self.redshift_features = [
                    f for f in self.all_features if "HOSTGAL_SPECZ" in f
                ]

            self.training_features = (
                self.non_redshift_features + self.redshift_features
            )

            if self.additional_train_var:
                self.training_features += [
                    k
                    for k in self.additional_train_var
                    if k not in self.training_features
                ]
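    # Example (derived from the code above): with list_filters = ["g", "r", "i", "z"],
    # training_features_to_normalize becomes
    #
    #     ["FLUXCAL_g", "FLUXCAL_r", "FLUXCAL_i", "FLUXCAL_z",
    #      "FLUXCALERR_g", "FLUXCALERR_r", "FLUXCALERR_i", "FLUXCALERR_z",
    #      "delta_time"]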
    def set_database_file_names(self):
        """Create a unique database name based on the dataset
        required by the settings
        """

        out_file = f"{self.processed_dir}/database"
        self.pickle_file_name = out_file + ".pickle"
        self.hdf5_file_name = out_file + ".h5"
    def load_normalization(self):
        """Create an array holding the data-normalization parameters used to
        normalize certain features in the NN-based classification pipeline
        """

        if not self.data:
            self.idx_features = [
                i
                for (i, f) in enumerate(self.all_features)
                if f in self.training_features
            ]
            self.idx_specz = [
                i
                for (i, f) in enumerate(self.training_features)
                if "HOSTGAL_SPECZ" in f
            ]
            self.idx_flux = [
                i for (i, f) in enumerate(self.training_features) if "FLUXCAL_" in f
            ]
            self.idx_fluxerr = [
                i
                for (i, f) in enumerate(self.training_features)
                if "FLUXCALERR_" in f
            ]
            self.idx_delta_time = [
                i
                for (i, f) in enumerate(self.training_features)
                if "delta_time" in f
            ]
            self.idx_features_to_normalize = [
                i
                for (i, f) in enumerate(self.all_features)
                if f in self.training_features_to_normalize
            ]
            self.d_feat_to_idx = {f: i for i, f in enumerate(self.all_features)}

            list_norm = []
            with h5py.File(self.hdf5_file_name, "r") as hf:
                for f in self.training_features_to_normalize:
                    if self.norm == "perfilter":
                        minv = np.array(hf[f"normalizations/{f}/min"])
                        meanv = np.array(hf[f"normalizations/{f}/mean"])
                        stdv = np.array(hf[f"normalizations/{f}/std"])
                    elif "FLUX" in f:
                        # Global norm: all FLUXCAL (resp. FLUXCALERR) filters
                        # share a single normalization, keyed by the prefix
                        prefix = f.split("_")[0]
                        minv = np.array(hf[f"normalizations_global/{prefix}/min"])
                        meanv = np.array(hf[f"normalizations_global/{prefix}/mean"])
                        stdv = np.array(hf[f"normalizations_global/{prefix}/std"])
                    else:
                        minv = np.array(hf[f"normalizations/{f}/min"])
                        meanv = np.array(hf[f"normalizations/{f}/mean"])
                        stdv = np.array(hf[f"normalizations/{f}/std"])
                    list_norm.append([minv, meanv, stdv])

            self.arr_norm = np.array(list_norm)
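# Note on arr_norm (sketch, not part of the module): each row of arr_norm holds
# [min, mean, std] for one entry of training_features_to_normalize, so with
# scalar statistics its shape is (n_features_to_normalize, 3). A downstream
# consumer could standardize features along these lines; the exact transform
# (e.g. whether a log is applied to fluxes) lives in the data-processing
# utilities, so treat the formula below as an assumption:
#
#     arr_min, arr_mean, arr_std = settings.arr_norm.T
#     X_norm = (np.log(X - arr_min + 1e-5) - arr_mean) / arr_std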