# example_opt_root/hyperopt_optimizer.py
import numpy as np
from hyperopt import hp, tpe
from hyperopt.base import JOB_STATE_DONE, JOB_STATE_NEW, STATUS_OK, Domain, Trials
from scipy.interpolate import interp1d
from bayesmark.abstract_optimizer import AbstractOptimizer
from bayesmark.experiment import experiment_main
from bayesmark.np_util import random as np_random
from bayesmark.np_util import random_seed
# Sklearn prefers str to unicode:
# Maps bayesmark variable types to the native Python type each suggested
# value is cast to before being returned from `cleanup_guess` (cat/ordinal
# values are passed through as plain str).
DTYPE_MAP = {"real": float, "int": int, "bool": bool, "cat": str, "ordinal": str}
def dummy_f(x):
    """Placeholder objective handed to hyperopt's `Domain`.

    This wrapper drives hyperopt manually via `tpe.suggest` instead of
    `hyperopt.fmin`, so the domain's objective is never evaluated; any call
    to this function indicates a bug in the calling code.

    Raises
    ------
    AssertionError
        Always.
    """
    # Raise explicitly rather than `assert False` so the guard still fires
    # when Python runs with -O (asserts are compiled away under -O).
    raise AssertionError("This is a placeholder, it should never be called.")
def only(x):
    """Return the sole element of the iterable *x*.

    Raises ``ValueError`` (via tuple unpacking) if *x* does not contain
    exactly one element.
    """
    (value,) = x
    return value
class HyperoptOptimizer(AbstractOptimizer):
    # Package name recorded by the bayesmark harness as this optimizer's backend.
    primary_import = "hyperopt"
    def __init__(self, api_config, random=np_random):
        """Build wrapper class to use hyperopt optimizer in benchmark.
        Parameters
        ----------
        api_config : dict-like of dict-like
            Configuration of the optimization variables. See API description.
        random : random source
            Source of randomness; fed to `random_seed` to seed each
            `tpe.suggest` call.
        """
        AbstractOptimizer.__init__(self, api_config)
        self.random = random
        space, self.round_to_values = HyperoptOptimizer.get_hyperopt_dimensions(api_config)
        # dummy_f is never evaluated: we drive hyperopt manually through
        # tpe.suggest rather than through hyperopt.fmin, so the Domain only
        # needs a callable for its constructor.
        self.domain = Domain(dummy_f, space, pass_expr_memo_ctrl=None)
        self.trials = Trials()
        # Some book keeping like opentuner wrapper: maps the hashable form of
        # a suggestion -> its hyperopt trial id, so observe() can match
        # results back to the pending trial objects.
        self.trial_id_lookup = {}
        # Store just for data validation
        self.param_set_chk = frozenset(api_config.keys())
    @staticmethod
    def hashable_dict(d):
        """A custom function for hashing dictionaries.
        Parameters
        ----------
        d : dict or dict-like
            The dictionary to be converted to immutable/hashable type.
        Returns
        -------
        hashable_object : frozenset of tuple pairs
            Bijective equivalent to dict that can be hashed.
        """
        # A frozenset of (key, value) pairs is order-independent and hashable,
        # provided all values are themselves hashable.
        hashable_object = frozenset(d.items())
        return hashable_object
    @staticmethod
    def get_hyperopt_dimensions(api_config):
        """Help routine to setup hyperopt search space in constructor.
        Take api_config as argument so this can be static.
        Returns
        -------
        space : dict
            Hyperopt search-space expression per variable name.
        round_to_values : dict
            For int/real variables declared via a whitelist of `values`, a
            nearest-neighbor interpolator that snaps guesses back onto the
            whitelist grid. Empty for variables without such a whitelist.
        """
        # The ordering of iteration prob makes no difference, but just to be
        # safe and consistent with space.py, I will make sorted.
        param_list = sorted(api_config.keys())
        space = {}
        round_to_values = {}
        for param_name in param_list:
            param_config = api_config[param_name]
            param_type = param_config["type"]
            param_space = param_config.get("space", None)
            param_range = param_config.get("range", None)
            param_values = param_config.get("values", None)
            # Some setup for case that whitelist of values is provided:
            values_only_type = param_type in ("cat", "ordinal")
            if (param_values is not None) and (not values_only_type):
                assert param_range is None
                # Treat the whitelist as a numeric grid: search over its
                # [min, max] range, then round results to the nearest grid
                # point via the interpolator stored below.
                param_values = np.unique(param_values)
                param_range = (param_values[0], param_values[-1])
                round_to_values[param_name] = interp1d(
                    param_values, param_values, kind="nearest", fill_value="extrapolate"
                )
            if param_type == "int":
                low, high = param_range
                if param_space in ("log", "logit"):
                    # Trailing q=1 quantizes suggestions to whole numbers.
                    space[param_name] = hp.qloguniform(param_name, np.log(low), np.log(high), 1)
                else:
                    space[param_name] = hp.quniform(param_name, low, high, 1)
            elif param_type == "bool":
                assert param_range is None
                assert param_values is None
                space[param_name] = hp.choice(param_name, (False, True))
            elif param_type in ("cat", "ordinal"):
                assert param_range is None
                space[param_name] = hp.choice(param_name, param_values)
            elif param_type == "real":
                low, high = param_range
                if param_space in ("log", "logit"):
                    space[param_name] = hp.loguniform(param_name, np.log(low), np.log(high))
                else:
                    space[param_name] = hp.uniform(param_name, low, high)
            else:
                assert False, "type %s not handled in API" % param_type
        return space, round_to_values
    def get_trial(self, trial_id):
        """Return the pending (JOB_STATE_NEW) trial dict with this trial id.
        NOTE(review): relies on hyperopt's private `_dynamic_trials` list;
        verify against the pinned hyperopt version if upgrading.
        """
        for trial in self.trials._dynamic_trials:
            if trial["tid"] == trial_id:
                assert isinstance(trial, dict)
                # Make sure right kind of dict
                assert "state" in trial and "result" in trial
                assert trial["state"] == JOB_STATE_NEW
                return trial
        assert False, "No matching trial ID"
    def cleanup_guess(self, x_guess):
        """Convert hyperopt's raw `misc.vals` dict into a clean suggestion.
        Unpacks the singleton list hyperopt stores per variable, snaps
        whitelisted int/real variables back onto their value grid, and casts
        each value to the native Python type from `DTYPE_MAP`.
        """
        assert isinstance(x_guess, dict)
        # Also, check the keys are only the vars we are searching over:
        assert frozenset(x_guess.keys()) == self.param_set_chk
        # Do the rounding
        # Make a copy to be safe, and also unpack singletons
        # We may also need to consider clip_chk at some point like opentuner
        x_guess = {k: only(x_guess[k]) for k in x_guess}
        for param_name, round_f in self.round_to_values.items():
            x_guess[param_name] = round_f(x_guess[param_name])
        # Also ensure this is correct dtype so sklearn is happy
        x_guess = {k: DTYPE_MAP[self.api_config[k]["type"]](x_guess[k]) for k in x_guess}
        return x_guess
    def _suggest(self):
        """Helper function to `suggest` that does the work of calling
        `hyperopt` via its dumb API.
        """
        # Reserve a fresh trial id, then ask TPE to propose a point for it.
        new_ids = self.trials.new_trial_ids(1)
        assert len(new_ids) == 1
        self.trials.refresh()
        # Fresh seed per call so repeated suggestions are not identical.
        seed = random_seed(self.random)
        new_trials = tpe.suggest(new_ids, self.domain, self.trials, seed)
        assert len(new_trials) == 1
        # Register the proposal with the trials DB so it is tracked as pending.
        self.trials.insert_trial_docs(new_trials)
        self.trials.refresh()
        new_trial, = new_trials # extract singleton
        return new_trial
    def suggest(self, n_suggestions=1):
        """Make `n_suggestions` suggestions for what to evaluate next.
        This requires the user observe all previous suggestions before calling
        again.
        Parameters
        ----------
        n_suggestions : int
            The number of suggestions to return.
        Returns
        -------
        next_guess : list of dict
            List of `n_suggestions` suggestions to evaluate the objective
            function. Each suggestion is a dictionary where each key
            corresponds to a parameter being optimized.
        """
        assert n_suggestions >= 1, "invalid value for n_suggestions"
        # Get the new trials, it seems hyperopt either uses random search or
        # guesses one at a time anyway, so we might as well call serially.
        new_trials = [self._suggest() for _ in range(n_suggestions)]
        X = []
        for trial in new_trials:
            x_guess = self.cleanup_guess(trial["misc"]["vals"])
            X.append(x_guess)
            # Build lookup to get original trial object
            x_guess_ = HyperoptOptimizer.hashable_dict(x_guess)
            assert x_guess_ not in self.trial_id_lookup, "the suggestions should not already be in the trial dict"
            self.trial_id_lookup[x_guess_] = trial["tid"]
        assert len(X) == n_suggestions
        return X
    def observe(self, X, y):
        """Feed the observations back to hyperopt.
        Parameters
        ----------
        X : list of dict-like
            Places where the objective function has already been evaluated.
            Each suggestion is a dictionary where each key corresponds to a
            parameter being optimized.
        y : array-like, shape (n,)
            Corresponding values where objective has been evaluated.
        """
        assert len(X) == len(y)
        for x_guess, y_ in zip(X, y):
            x_guess_ = HyperoptOptimizer.hashable_dict(x_guess)
            assert x_guess_ in self.trial_id_lookup, "Appears to be guess that did not originate from suggest"
            # Pop so each suggestion can be observed at most once.
            trial_id = self.trial_id_lookup.pop(x_guess_)
            trial = self.get_trial(trial_id)
            assert self.cleanup_guess(trial["misc"]["vals"]) == x_guess, "trial ID not consistent with x values stored"
            # Cast to float to ensure native type
            result = {"loss": float(y_), "status": STATUS_OK}
            trial["state"] = JOB_STATE_DONE
            trial["result"] = result
        # hyperopt.fmin.FMinIter.serial_evaluate only does one refresh at end
        # of loop of a bunch of evals, so we will do the same thing here.
        self.trials.refresh()
if __name__ == "__main__":
    # Run this optimizer inside the bayesmark experiment harness
    # (parses CLI args, runs the benchmark loop, records results).
    experiment_main(HyperoptOptimizer)