easy_rec/python/hpo/emr_hpo.py (117 lines of code) (raw):
# -*- encoding:utf-8 -*-
# Copyright (c) Alibaba, Inc. and its affiliates.
"""Hyperparameter search for easy_rec on emr."""
import argparse
import json
import logging
import os
import shutil
import time
from pai.automl.hpo.autotuner import AutoTuner
from easy_rec.python.utils import hpo_util
# Absolute path of the directory that contains this script.
file_dir = os.path.dirname(os.path.abspath(__file__))

# Log INFO and above, each record prefixed with its timestamp and level.
logging.basicConfig(
    format='[%(asctime)s][%(levelname)s] %(message)s', level=logging.INFO)
def hpo_config(config_path, hyperparams, exp_dir, metric_name,
               el_submit_params):
  """Build the AutoTuner configuration for an easy_rec HPO experiment on EMR.

  Every trial consists of three tasks: create the trial's model directory
  with `hadoop fs -mkdir -p`, write the sampled hyper parameters to a local
  json file, and launch `easy_rec.python.train_eval` through `el_submit`.

  Args:
    config_path: easy_rec pipeline config; shipped to the job via
      `el_submit -f` and passed as --pipeline_config_path.
    hyperparams: search space definition, stored unchanged under the
      'hyperparams' key of the returned dict.
    exp_dir: experiment root; trial models go to
      `<exp_dir>/trail_{{ trial.id }}`.
    metric_name: name of the metric to extract from the `res.metric` file
      written into the trial's model directory.
    el_submit_params: whitespace separated el_submit options given as
      "flag value" pairs, e.g. '-t standalone -wn 1'.

  Returns:
    A tuple (data, tmp_dir): the configuration dict (keys 'earlystop',
    'algorithm', 'hyperparams', 'tasks') and the local temporary directory
    created for the per-trial parameter files. The caller is responsible
    for removing tmp_dir.

  Raises:
    AssertionError: if el_submit_params is not a sequence of flag/value
      pairs or contains a flag that is not allowed.
  """
  # Function-scope import keeps this block self-contained.
  import tempfile

  # Validate first so that bad input does not leave a temporary directory
  # behind.
  el_params = _parse_el_submit_params(el_submit_params)

  # These settings are passed through as-is; their exact semantics are
  # defined by the AutoTuner library.
  earlystop = {
      'type': 'large_is_better',
      'threshold': 0.99,
      'max_runtime': 2400
  }
  algorithm = {
      'type': 'gp',
      'initial_trials_num': 4,
      'stop_when_exception': True
  }

  # Keep the historical '/tmp/emr_easy_rec_hpo_<timestamp>' prefix, but let
  # mkdtemp append a unique suffix: a bare timestamp collides when two
  # experiments are started within the same second.
  tmp_dir = tempfile.mkdtemp(
      prefix='emr_easy_rec_hpo_%d_' % time.time(), dir='/tmp')
  logging.info('local temporary path: %s' % tmp_dir)

  # '{{ trial.id }}' is left verbatim in the strings below; presumably the
  # tuner substitutes it per trial.
  param_path = tmp_dir + '/rewrite_{{ trial.id }}.json'
  # Inside the submitted job the parameter file is referenced by its base
  # name only (it is shipped via `el_submit -f`).
  param_path_file = 'rewrite_{{ trial.id }}.json'
  model_path = '%s/trail_{{ trial.id }}' % exp_dir
  metric_path = os.path.join(model_path, 'res.metric')

  pre_task = {
      'type': 'BashTask',
      'cmd': ['hadoop', 'fs', '-mkdir', '-p', model_path]
  }

  adapter_task = {
      'type': 'localadaptertask',
      # hpo_param_path for easy_rec
      'param_file': param_path,
  }

  train_cmd = (
      'python -m easy_rec.python.train_eval --hpo_metric_save_path {} '
      '--hpo_param_path {} --pipeline_config_path {} --model_dir {}'.format(
          metric_path, param_path_file, config_path, model_path))
  cmd = ['el_submit'] + el_params + [
      '-a', 'easy_rec_hpo', '-m', 'local', '-f',
      '{},train_eval.py,{}'.format(config_path, param_path), '--interact',
      'INTERACT', '-c', train_cmd
  ]

  train_task = {
      'type': 'BashTask',
      'cmd': cmd,
      'metric_reader': {
          'type': 'hdfs_reader',
          'location': metric_path,
          # The decimal point is escaped and the integer part may have
          # several digits, so a value such as 123.75 is captured in full
          # instead of being truncated to '123'.
          'parser_pattern': '.*"%s": (\\d+\\.\\d+).*' % metric_name
      }
  }

  data = {
      'earlystop': earlystop,
      'algorithm': algorithm,
      'hyperparams': hyperparams,
      'tasks': [pre_task, adapter_task, train_task]
  }
  return data, tmp_dir


# el_submit options that may be forwarded from the command line.
_EL_SUBMIT_FLAGS = ('-t', '-m', '-pn', '-pc', '-pg', '-pm', '-wn', '-wc', '-wm',
                    '-wg')


def _parse_el_submit_params(el_submit_params):
  """Split el_submit_params into tokens and check they are flag/value pairs."""
  # split() without arguments also copes with tabs, newlines and repeated
  # blanks between tokens.
  el_params = el_submit_params.split()
  # AssertionError is raised explicitly (instead of via `assert`) so that the
  # checks keep their historical exception type but still run under `-O`.
  if len(el_params) % 2 != 0:
    raise AssertionError('invalid number of el_submit params: %d[%s]' %
                         (len(el_params), str(el_params)))
  for flag in el_params[::2]:
    if flag not in _EL_SUBMIT_FLAGS:
      raise AssertionError('invalid el_submit param: %s, expected one of: %s' %
                           (flag, ' '.join(_EL_SUBMIT_FLAGS)))
  return el_params
if __name__ == '__main__':
  # Command line entry point: read the search space, build the tuner
  # configuration with hpo_config, run the tuner and clean up afterwards.
  parser = argparse.ArgumentParser()
  # The three paths below are mandatory; argparse reports a usage error when
  # one is missing (an `assert` would be skipped under `python -O`).
  parser.add_argument(
      '--hyperparams', type=str, help='hyper parameters', required=True)
  parser.add_argument(
      '--config_path', type=str, help='pipeline config', required=True)
  parser.add_argument(
      '--exp_dir', type=str, help='hpo experiment directory', required=True)
  parser.add_argument(
      '--el_submit_params',
      type=str,
      help='el_submit parameters(-t x -m x [-pn x -pc x -pm x] -wn x -wc x -wm x -wg x)',
      default='-t standalone -m local -wn 1 -wc 6 -wm 20000 -wg 1')
  parser.add_argument(
      '--metric_name', type=str, help='metric_name', default='auc')
  parser.add_argument(
      '--max_parallel',
      type=int,
      help='max number of trials run at the same time',
      default=4)
  parser.add_argument(
      '--total_trial_num',
      type=int,
      help='total number of trials will run',
      default=6)
  parser.add_argument(
      '--debug',
      action='store_true',
      help='debug mode, will keep the temporary folder')

  args = parser.parse_args()

  # The hyper parameter search space is stored as a json file.
  with open(args.hyperparams, 'r') as fin:
    hyperparams = json.load(fin)

  data, tmp_dir = hpo_config(args.config_path, hyperparams, args.exp_dir,
                             args.metric_name, args.el_submit_params)

  try:
    # NOTE(review): presumably terminates leftovers of earlier runs; see
    # hpo_util.kill_old_proc for the exact behavior.
    hpo_util.kill_old_proc(tmp_dir, platform='emr')

    tuner = AutoTuner.create_tuner(
        data,
        max_parallel=args.max_parallel,
        max_trial_num=args.total_trial_num)
    tuner.fit(synchronize=True)
  finally:
    # Clean up even when tuning fails, so the temporary folder is only kept
    # in debug mode, as the --debug help text states.
    if args.debug:
      logging.info('temporary directory is: %s' % tmp_dir)
    else:
      shutil.rmtree(tmp_dir)