in nni/algorithms/hpo/metis_tuner/metis_tuner.py [0:0]
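# NOTE: module-level dependencies assumed by this method -- a sketch based on
# the metis_tuner package layout, not verified against this revision:
#
#     import random
#     import warnings
#     from multiprocessing.dummy import Pool as ThreadPool
#
#     from . import lib_data
#     from .Regression_GP import CreateModel as gp_create_model
#     from .Regression_GP import Selection as gp_selection
#     from .Regression_GP import Prediction as gp_prediction
#     from .Regression_GP import OutlierDetection as gp_outlier_detection
#     from .Regression_GMM import CreateModel as gmm_create_model
#     from .Regression_GMM import Selection as gmm_selection
#
# plus the module-level `logger` and the helpers _num_past_samples,
# _calculate_lowest_mu_threaded, _rand_with_constraints and _rand_init
# defined elsewhere in metis_tuner.py.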
def _selection(
        self,
        samples_x,
        samples_y_aggregation,
        samples_y,
        x_bounds,
        x_types,
        max_resampling_per_x=3,
        threshold_samplessize_exploitation=12,
        threshold_samplessize_resampling=50,
        no_candidates=False,
        minimize_starting_points=None,
        minimize_constraints_fun=None):
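    """Select the next hyperparameter configuration to suggest.

    Candidates are gathered from up to three sources -- GP-based
    exploration, GMM-based exploitation, and re-sampling of outliers --
    and scored by how far each would be expected to push the GP optimum
    down. ``samples_x`` holds the evaluated configurations, ``samples_y``
    the per-configuration lists of observed values, and
    ``samples_y_aggregation`` one aggregated value per configuration.

    Returns the packed output for the chosen configuration (also appended
    to ``self.total_data``), or ``None`` if no GP optimum can be computed.
    """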
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        next_candidate = None
        candidates = []
        samples_size_all = sum(len(i) for i in samples_y)
        samples_size_unique = len(samples_y)
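        # samples_size_all counts every observation (a configuration may be
        # measured more than once), while samples_size_unique counts distinct
        # configurations; the exploitation and re-sampling thresholds below
        # gate on these counts respectively.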
        # ===== STEP 1: Compute the current optimum =====
        gp_model = gp_create_model.create_model(
            samples_x, samples_y_aggregation)
        lm_current = gp_selection.selection(
            "lm",
            samples_y_aggregation,
            x_bounds,
            x_types,
            gp_model['model'],
            minimize_starting_points,
            minimize_constraints_fun=minimize_constraints_fun)
        if not lm_current:
            return None
        logger.info({
            'hyperparameter': lm_current['hyperparameter'],
            'expected_mu': lm_current['expected_mu'],
            'expected_sigma': lm_current['expected_sigma'],
            'reason': "exploitation_gp"
        })
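        # "lm" above asks gp_selection for the minimizer of the GP posterior
        # mean, i.e. the incumbent optimum; "lc" below is the exploration
        # acquisition -- presumably a lower-confidence-bound criterion that
        # also rewards high predictive variance.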
        if no_candidates is False:
            # ===== STEP 2: Get recommended configurations for exploration =====
            results_exploration = gp_selection.selection(
                "lc",
                samples_y_aggregation,
                x_bounds,
                x_types,
                gp_model['model'],
                minimize_starting_points,
                minimize_constraints_fun=minimize_constraints_fun)

            if results_exploration is not None:
                if _num_past_samples(results_exploration['hyperparameter'], samples_x, samples_y) == 0:
                    temp_candidate = {
                        'hyperparameter': results_exploration['hyperparameter'],
                        'expected_mu': results_exploration['expected_mu'],
                        'expected_sigma': results_exploration['expected_sigma'],
                        'reason': "exploration"
                    }
                    candidates.append(temp_candidate)
                    logger.info("DEBUG: 1 exploration candidate selected\n")
                    logger.info(temp_candidate)
            else:
                logger.info("DEBUG: No suitable exploration candidates were found\n")
            # ===== STEP 3: Get recommended configurations for exploitation =====
            if samples_size_all >= threshold_samplessize_exploitation:
                logger.info("Getting candidates for exploitation...\n")
                try:
                    gmm = gmm_create_model.create_model(
                        samples_x, samples_y_aggregation)

                    if ("discrete_int" in x_types) or ("range_int" in x_types):
                        results_exploitation = gmm_selection.selection(
                            x_bounds,
                            x_types,
                            gmm['clusteringmodel_good'],
                            gmm['clusteringmodel_bad'],
                            minimize_starting_points,
                            minimize_constraints_fun=minimize_constraints_fun)
                    else:
                        # If all parameters are of "range_continuous",
                        # let's use GMM to generate random starting points
                        results_exploitation = gmm_selection.selection_r(
                            x_bounds,
                            x_types,
                            gmm['clusteringmodel_good'],
                            gmm['clusteringmodel_bad'],
                            num_starting_points=self.selection_num_starting_points,
                            minimize_constraints_fun=minimize_constraints_fun)

                    if results_exploitation is not None:
                        if _num_past_samples(results_exploitation['hyperparameter'], samples_x, samples_y) == 0:
                            temp_expected_mu, temp_expected_sigma = \
                                gp_prediction.predict(results_exploitation['hyperparameter'], gp_model['model'])
                            temp_candidate = {
                                'hyperparameter': results_exploitation['hyperparameter'],
                                'expected_mu': temp_expected_mu,
                                'expected_sigma': temp_expected_sigma,
                                'reason': "exploitation_gmm"
                            }
                            candidates.append(temp_candidate)
                            logger.info("DEBUG: 1 exploitation_gmm candidate selected\n")
                            logger.info(temp_candidate)
                    else:
                        logger.info("DEBUG: No suitable exploitation_gmm candidates were found\n")
                except ValueError as exception:
                    # The exception: ValueError: Fitting the mixture model failed
                    # because some components have ill-defined empirical covariance
                    # (for instance caused by singleton or collapsed samples).
                    # Try to decrease the number of components, or increase
                    # reg_covar.
                    logger.info(
                        "DEBUG: No suitable exploitation_gmm candidates "
                        "were found due to exception.")
                    logger.info(exception)
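            # Re-sampling below flags configurations whose observed values look
            # inconsistent with the GP fit (roughly, outliers under a
            # leave-one-out prediction); measuring them again, up to
            # max_resampling_per_x times, reduces noise in the aggregation.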
            # ===== STEP 4: Get a list of outliers =====
            if (threshold_samplessize_resampling is not None) and \
                    (samples_size_unique >= threshold_samplessize_resampling):
                logger.info("Getting candidates for re-sampling...\n")
                results_outliers = gp_outlier_detection.outlierDetection_threaded(
                    samples_x, samples_y_aggregation)

                if results_outliers is not None:
                    for results_outlier in results_outliers:  # pylint: disable=not-an-iterable
                        if _num_past_samples(samples_x[results_outlier['samples_idx']], samples_x, samples_y) < max_resampling_per_x:
                            temp_candidate = {
                                'hyperparameter': samples_x[results_outlier['samples_idx']],
                                'expected_mu': results_outlier['expected_mu'],
                                'expected_sigma': results_outlier['expected_sigma'],
                                'reason': "resampling"
                            }
                            candidates.append(temp_candidate)
                            logger.info("DEBUG: 1 re-sampling candidate selected\n")
                            logger.info(temp_candidate)
                else:
                    logger.info("DEBUG: No suitable resampling candidates were found\n")
            if candidates:
                # ===== STEP 5: Compute the information gain of each candidate =====
                logger.info(
                    "Evaluating information gain of %d candidates...\n",
                    len(candidates))
                next_improvement = 0

                threads_inputs = [[
                    candidate, samples_x, samples_y, x_bounds, x_types,
                    minimize_constraints_fun, minimize_starting_points
                ] for candidate in candidates]
                threads_pool = ThreadPool(4)
                # Evaluate what would happen if we actually sampled each candidate
                threads_results = threads_pool.map(
                    _calculate_lowest_mu_threaded, threads_inputs)
                threads_pool.close()
                threads_pool.join()

                for threads_result in threads_results:
                    if threads_result['expected_lowest_mu'] < lm_current['expected_mu']:
                        # The improvement is negative; keep the largest drop
                        temp_improvement = threads_result['expected_lowest_mu'] - \
                            lm_current['expected_mu']

                        if next_improvement > temp_improvement:
                            next_improvement = temp_improvement
                            next_candidate = threads_result['candidate']
            else:
                # ===== STEP 6: If we have no candidates, randomly pick one =====
                logger.info(
                    "DEBUG: No candidates from exploration, exploitation and "
                    "resampling. We will pick a random candidate for next_candidate\n")

                if minimize_starting_points is None:
                    next_candidate = _rand_with_constraints(x_bounds, x_types)
                else:
                    next_candidate = minimize_starting_points[0]
                next_candidate = lib_data.match_val_type(
                    next_candidate, x_bounds, x_types)
                expected_mu, expected_sigma = gp_prediction.predict(
                    next_candidate, gp_model['model'])
                next_candidate = {
                    'hyperparameter': next_candidate,
                    'reason': "random",
                    'expected_mu': expected_mu,
                    'expected_sigma': expected_sigma}
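        # The final choice below is epsilon-greedy: keep the GP optimum unless
        # it was already suggested (it is in self.total_data) or the uniform
        # draw `ap` falls below self.exploration_probability, in which case
        # the candidate selected above (or a fresh random configuration) is
        # suggested instead.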
        # STEP 7: If the current optimum already occurs in the history,
        # or a uniform draw falls below the exploration probability,
        # take the next candidate as an exploration step
        outputs = self._pack_output(lm_current['hyperparameter'])
        ap = random.uniform(0, 1)
        if outputs in self.total_data or ap <= self.exploration_probability:
            if next_candidate is not None:
                outputs = self._pack_output(next_candidate['hyperparameter'])
            else:
                random_parameter = _rand_init(x_bounds, x_types, 1)[0]
                outputs = self._pack_output(random_parameter)
        self.total_data.append(outputs)
        return outputs
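
# A minimal usage sketch, assuming the MetisTuner attribute names used above
# (total_data, exploration_probability, selection_num_starting_points) and
# hypothetical tuner state attributes samples_x / samples_y /
# samples_y_aggregation / x_bounds / x_types; roughly what
# generate_parameters would do:
#
#     outputs = self._selection(
#         self.samples_x, self.samples_y_aggregation, self.samples_y,
#         self.x_bounds, self.x_types,
#         minimize_starting_points=self.minimize_starting_points,
#         minimize_constraints_fun=self.minimize_constraints_fun)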