from __future__ import annotations
import datetime
import logging
from dataclasses import dataclass
from time import perf_counter
from typing import Callable, ClassVar, Optional
import joblib # type: ignore
import numpy as np
import optuna
from optuna.visualization import (
plot_parallel_coordinate,
plot_param_importances,
plot_pareto_front,
)
from .data_management import Residuals
from .model import Hyperparameters, Model
from .neural_network import best_trial_under_n
[docs]class HyperparameterOptimization:
"""Manager for the optimization of the hyperparameters
corresponding to a certain :class:`Model`.
The optimization performed is over two variables:
the reconstruction accuracy and the training time.
**Reconstruction accuracy** is quantified by looking at the average
square error in the reconstruction of the residuals;
spefifically, the average is taken over both amplitude and phase residuals,
and the value returned by the :meth:`objective` function is the base-10
logarithm of this.
**Training time** accounts for both the time required to train the
neural network and the estimated time required to generate the waveforms
needed for the training.
This can vary, since one of the hyperparameters varied in the training
is the number of waveforms in the training dataset.
Including it is convenient since having more waveforms ---
a finer sampling of the waveform space ---
means the optimal network might be different.
However, only ever training networks with as large a number of waveforms
as we might wish to use in the end gets expensive;
therefore, we vary the number of training waveforms in the optimization,
so that the optuna study is able to learn the basic region of parameter space
which it is best to explore, and then extend that knowledge to the
new region of the parameter space with more training waveforms.
The inclusion of this cost term is needed since, typically,
using more waveforms will yield a better fit.
So, we do multi-parameter optimization: see, for example,
`Multiobjective tree-structured parzen estimator
for computationally expensive optimization problems <https://doi.org/10.1145/3377930.3389817>`_
by Ozaki et al.
To visualize the Pareto front of the optimization, one can use the
:meth:`plot_pareto` method after an optimization run.
Parameters
----------
model: Model
Reference model for the optimization.
optimization_seed: int, optional
Seed for the random number to be used in the optimization.
Defaults to 42.
hyper_validation_fraction: float
Fraction of the data to be used in validation
during the optimization.
study: optuna.Study, optional
Pre-made study to use.
Defaults to None; if not provided,
the initializer looks for a file with the correct name
in the local directory and uses it,
and it creates a new study if it cannot find it.
Class Attributes
waveform_gen_time: float
Reference generation time for a single waveform,
to be used in the computation of the effective time
in the :meth:`objective`.
Defaults to 0.1.
save_every_n_minutes: float
When running the optimization through :meth:`optimize`,
every how many minutes to save the study.
Defaults to 30.
"""
waveform_gen_time: float = 0.1
save_every_n_minutes: float = 30.0
def __init__(
self,
model: Model,
optimization_seed: int = 42,
hyper_validation_fraction: float = 0.1,
study: Optional[optuna.Study] = None,
):
assert model.auxiliary_data_available
assert model.training_dataset_available
self.model = model
self.rng = np.random.default_rng(seed=optimization_seed)
self.hyper_validation_fraction = hyper_validation_fraction
if study is None:
try:
self.study: optuna.Study = joblib.load(self.study_filename)
logging.info("Loading study from %s", self.study_filename)
except FileNotFoundError:
self.study = optuna.create_study(
directions=["minimize", "minimize"], study_name=self.model.filename
)
logging.info("Creating new study")
else:
self.study = study
@property
def training_data_number(self) -> int:
"""Number of available training waveforms."""
assert self.model.training_dataset is not None
return len(self.model.training_dataset)
@property
def study_filename(self) -> str:
"""Name of the file to save the study to."""
return f"{self.model.filename}_study.pkl"
[docs] def objective(
self,
trial: optuna.Trial,
) -> tuple[float, float]:
"""Objective function to be used when optimizing the hyperparameters
for the neural network and PCA.
Parameters
----------
trial : optuna.Trial
This object is required to generate the parameters
according to the rules of the :module:``optuna`` optimizer used.
Returns
-------
tuple[float, float]
Base-10 logarithm of the accuracy and training time, respectively.
The accuracy is defined as the average of the square differences between
the true and estimated residuals.
The training time includes both the training of the network and,
roughly, the generation of the waveforms used for training.
"""
assert self.model.training_dataset is not None
assert self.model.training_parameters is not None
# train network on a subset of the data
validation_data_number = int(
self.hyper_validation_fraction * self.training_data_number
)
hyper = Hyperparameters.from_trial(
trial, n_train_max=self.training_data_number - validation_data_number
)
assert hyper.n_train + validation_data_number <= self.training_data_number
shuffled_indices = self.rng.choice(
self.training_data_number, self.training_data_number, replace=False
)
training_indices = shuffled_indices[: hyper.n_train]
validation_indices = shuffled_indices[-validation_data_number:]
start_time = perf_counter()
nn = self.model.train_nn(hyper, list(training_indices))
end_time = perf_counter()
effective_time = (
end_time - start_time
) + self.waveform_gen_time * hyper.n_train
# validate on another subset of the data
predicted_residuals = self.model.predict_residuals_bulk(
self.model.training_parameters[validation_indices], nn
)
accuracy = self.residuals_difference(
self.model.training_dataset[list(validation_indices)], predicted_residuals
)
return np.log10(accuracy), np.log10(effective_time)
[docs] @staticmethod
def residuals_difference(residuals_1: Residuals, residuals_2: Residuals) -> float:
"""Compare two sets of :class:`Residuals`.
Parameters
----------
residuals_1 : Residuals
First set of residuals to be compared.
residuals_2 : Residuals
Second set of residuals to be compared.
Returns
-------
float
The average square-difference between the two residual sets.
"""
amp_square_differences = (
np.abs(residuals_1.amplitude_residuals - residuals_2.amplitude_residuals)
** 2
)
phi_square_differences = (
np.abs(residuals_1.phase_residuals - residuals_2.phase_residuals) ** 2
)
return (
np.average(amp_square_differences) + np.average(phi_square_differences)
) / 2.0
[docs] def optimize(self, timeout_min: float = 5.0) -> None:
"""Run the optimization ---
this is a wrapper around :meth:`optuna.Study.optimize` ---
for a certain amount of minutes.
Parameters
----------
timeout_min : float, optional
Number of minutes to run for, by default 5
"""
obj = lambda trial: self.objective(trial)
self.study.optimize(obj, timeout=timeout_min * 60)
[docs] def optimize_and_save(self, timeout_hr: float = 1.0) -> None:
"""Run the optimization ---
this is a wrapper around :meth:`optuna.Study.optimize`.
This command can take an arbitrary amount of time, therefore
its timeout is provided as a parameter.
Typically, good results can be achieved within a few hours.
The interval between which to save is determined by the class attribute
:attr:`save_every_n_minutes`.
Parameters
----------
timeout_hr : float, optional
Number of hours to run for, by default 1.
"""
iterations: int = max(int(timeout_hr * 60 / self.save_every_n_minutes), 1)
expected_datetime_end = datetime.datetime.now() + datetime.timedelta(
hours=timeout_hr
)
logging.info(
"Starting to train at %s, will end at %s",
(datetime.datetime.now(), expected_datetime_end.isoformat()),
)
for n in range(iterations):
remaining_minutes: float = (
expected_datetime_end - datetime.datetime.now()
) / datetime.timedelta(minutes=1)
if remaining_minutes <= 0:
return
iterations_left: int = iterations - n
timeout_min: float = remaining_minutes / iterations_left
self.optimize(timeout_min=timeout_min)
joblib.dump(self.study, self.study_filename)
logging.info("Saved to file.")
[docs] def plot_pareto(self) -> None:
"""Plot the Pareto front of the bivariate optimization,
making use of the function :func:`optuna.visualization.plot_pareto_front`."""
fig = plot_pareto_front(
self.study,
target_names=[
"Error [log10(average square error)] ",
"time [log10(time in seconds)]",
],
)
fig.show()
def plot_parallel(self, **kwargs):
to_plot = lambda trial: trial.values[0]
fig = plot_parallel_coordinate(self.study, target_name=to_plot, **kwargs)
fig.show()
def plot_param_importance(self):
fig = plot_param_importances(
self.study, target=lambda t: t.values[0], target_name="Error"
)
fig.show()
[docs] def best_hyperparameters(
self, training_number: Optional[int] = None
) -> Hyperparameters:
"""Return the best hyperparameters found using less than
a certain number of training waveforms.
Parameters
----------
training_number : int, optional
Number of training waveforms; by default None,
in which case return the hyperparameters
for as many waveforms as the current model has available.
Returns
-------
Hyperparameters
"""
best_trials = self.study.best_trials
if training_number is None:
training_number = self.training_data_number
return best_trial_under_n(best_trials, training_number)
[docs] def save_best_trials_to_file(self, filename: str = "best_trials") -> None:
"""Save the best trials obtained so far in the optimization to the file
"filename".pkl.
The best trials are obtained as ``self.study.best_trials``.
Parameters
----------
filename : str, optional
Filename to save to, by default "best_trials"
"""
joblib.dump(self.study.best_trials, f"{filename}.pkl")
def total_training_time(self) -> datetime.timedelta:
return sum(
((t.datetime_complete - t.datetime_start) for t in self.study.trials), # type: ignore
datetime.timedelta(),
)
# Trial.datetime_complete (and _start) are defined as optional in the
# FrozenTrial type, but here they will always be set