in easy_rec/python/main.py [0:0]
def distribute_evaluate(pipeline_config,
eval_checkpoint_path='',
eval_data_path=None,
eval_result_filename='distribute_eval_result.txt'):
"""Evaluate a EasyRec model defined in pipeline_config_path.
Evaluate the model defined in pipeline_config_path on the eval data,
the metrics will be displayed on tensorboard and saved into eval_result.txt.
Args:
pipeline_config: either EasyRecConfig path or its instance
eval_checkpoint_path: if specified, evaluate this checkpoint instead of
the one found under model_dir in pipeline_config_path
eval_data_path: eval data path, could be a single path or a list of paths;
defaults to the eval data configured in pipeline_config
eval_result_filename: file name (under model_dir) to which the evaluation
metrics are written.
Returns:
A dict of evaluation metrics: the metrics are specified in
pipeline_config_path
global_step: the global step for which this evaluation was performed.
Raises:
AssertionError, if:
* pipeline_config_path does not exist
"""
pipeline_config = config_util.get_configs_from_pipeline_file(pipeline_config)
if eval_data_path is not None:
logging.info('Evaluating on data: %s' % eval_data_path)
set_eval_input_path(pipeline_config, eval_data_path)
train_config = pipeline_config.train_config
eval_data = get_eval_input_path(pipeline_config)
data_config = pipeline_config.data_config
if data_config.HasField('sampler'):
logging.warning(
'Evaluation with a negative sampler is not accurate; hitrate.py is recommended instead.'
)
eval_result = {}
return eval_result
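# Create a temporary directory under model_dir for intermediate distributed-eval
# results and expose its path to downstream code through an environment variable.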
model_dir = get_model_dir_path(pipeline_config)
eval_tmp_results_dir = os.path.join(model_dir, 'distribute_eval_tmp_results')
if not gfile.IsDirectory(eval_tmp_results_dir):
logging.info('creating eval tmp results dir {}'.format(eval_tmp_results_dir))
gfile.MakeDirs(eval_tmp_results_dir)
assert gfile.IsDirectory(
eval_tmp_results_dir), 'tmp results dir was not created successfully.'
os.environ['eval_tmp_results_dir'] = eval_tmp_results_dir
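# Parse TF_CONFIG to determine this task's role: ps tasks block in server.join(),
# while master and worker tasks start a tf.train.Server whose target drives the
# evaluation graph built below.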
server_target = None
cur_job_name = None
if 'TF_CONFIG' in os.environ:
tf_config = estimator_utils.chief_to_master()
from tensorflow.python.training import server_lib
if tf_config['task']['type'] == 'ps':
cluster = tf.train.ClusterSpec(tf_config['cluster'])
server = server_lib.Server(
cluster, job_name='ps', task_index=tf_config['task']['index'])
server.join()
elif tf_config['task']['type'] == 'master':
if 'ps' in tf_config['cluster']:
cur_job_name = tf_config['task']['type']
cur_task_index = tf_config['task']['index']
cluster = tf.train.ClusterSpec(tf_config['cluster'])
server = server_lib.Server(
cluster, job_name=cur_job_name, task_index=cur_task_index)
server_target = server.target
print('server_target = %s' % server_target)
elif tf_config['task']['type'] == 'worker':
if 'ps' in tf_config['cluster']:
cur_job_name = tf_config['task']['type']
cur_task_index = tf_config['task']['index']
cluster = tf.train.ClusterSpec(tf_config['cluster'])
server = server_lib.Server(
cluster, job_name=cur_job_name, task_index=cur_task_index)
server_target = server.target
print('server_target = %s' % server_target)
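# server_target is only set for master/worker tasks in a cluster that has ps tasks;
# in that case the distributed eval graph is built and run manually below.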
if server_target:
from tensorflow.python.training.device_setter import replica_device_setter
from tensorflow.python.framework.ops import device
from tensorflow.python.training.monitored_session import MonitoredSession
from tensorflow.python.training.monitored_session import ChiefSessionCreator
from tensorflow.python.training.monitored_session import WorkerSessionCreator
from easy_rec.python.utils.estimator_utils import EvaluateExitBarrierHook
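# Place variables on the ps tasks via replica_device_setter while the ops of the
# eval graph run on the current master/worker task.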
cur_work_device = '/job:' + cur_job_name + '/task:' + str(cur_task_index)
cur_ps_num = len(tf_config['cluster']['ps'])
with device(
replica_device_setter(
ps_tasks=cur_ps_num, worker_device=cur_work_device,
cluster=cluster)):
distribution = strategy_builder.build(train_config)
estimator, run_config = _create_estimator(pipeline_config, distribution)
eval_spec = _create_eval_export_spec(pipeline_config, eval_data)
ckpt_path = _get_ckpt_path(pipeline_config, eval_checkpoint_path)
ckpt_dir = os.path.dirname(ckpt_path)
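# Build the eval input pipeline with a one-shot iterator and construct the eval
# graph through the estimator's distributed-eval model_fn.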
input_iter = eval_spec.input_fn(
mode=tf.estimator.ModeKeys.EVAL).make_one_shot_iterator()
input_feas, input_lbls = input_iter.get_next()
estimator_spec = estimator._distribute_eval_model_fn(
input_feas, input_lbls, run_config)
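# Device filters limit this task's session to the ps tasks and itself, so it does
# not wait on devices belonging to other workers.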
session_config = ConfigProto(
allow_soft_placement=True,
log_device_placement=True,
device_filters=['/job:ps',
'/job:worker/task:%d' % cur_task_index])
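# The master restores model variables from the checkpoint; metric variables are
# excluded from the Saver and initialized locally. Workers attach to the session
# created by the master.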
if cur_job_name == 'master':
metric_variables = tf.get_collection(tf.GraphKeys.METRIC_VARIABLES)
model_ready_for_local_init_op = tf.variables_initializer(metric_variables)
global_variables = tf.global_variables()
remain_variables = list(
set(global_variables).difference(set(metric_variables)))
cur_saver = tf.train.Saver(var_list=remain_variables, sharded=True)
cur_scaffold = tf.train.Scaffold(
saver=cur_saver,
ready_for_local_init_op=model_ready_for_local_init_op)
cur_sess_creator = ChiefSessionCreator(
scaffold=cur_scaffold,
master=server_target,
checkpoint_filename_with_path=ckpt_path,
config=session_config)
else:
cur_sess_creator = WorkerSessionCreator(
master=server_target, config=session_config)
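# Each entry of eval_metric_ops is a (value_tensor, update_op) pair; group the
# update ops so a single sess.run call advances every metric.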
eval_metric_ops = estimator_spec.eval_metric_ops
update_ops = [eval_metric_ops[x][1] for x in eval_metric_ops.keys()]
metric_ops = {x: eval_metric_ops[x][0] for x in eval_metric_ops.keys()}
update_op = tf.group(update_ops)
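# One evaluation task per worker plus the master. EvaluateExitBarrierHook
# presumably synchronizes their exit, with the master (second argument True)
# responsible for saving the final metrics; the master also gets a longer stop
# grace period.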
cur_worker_num = len(tf_config['cluster']['worker']) + 1
if cur_job_name == 'master':
cur_stop_grace_period_secs = 120
cur_hooks = EvaluateExitBarrierHook(cur_worker_num, True, ckpt_dir,
metric_ops)
else:
cur_stop_grace_period_secs = 10
cur_hooks = EvaluateExitBarrierHook(cur_worker_num, False, ckpt_dir,
metric_ops)
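# Run the grouped update op until the eval input is exhausted (OutOfRangeError);
# the final metric values are read back from the barrier hook.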
with MonitoredSession(
session_creator=cur_sess_creator,
hooks=[cur_hooks],
stop_grace_period_secs=cur_stop_grace_period_secs) as sess:
while True:
try:
sess.run(update_op)
except tf.errors.OutOfRangeError:
break
eval_result = cur_hooks.eval_result
logging.info('Evaluation finished')
# write eval result to file
model_dir = pipeline_config.model_dir
eval_result_file = os.path.join(model_dir, eval_result_filename)
logging.info('save eval result to file %s' % eval_result_file)
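# Only the master writes the aggregated metrics: binary values are skipped and
# numpy scalars are converted to native Python types before JSON encoding.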
if cur_job_name == 'master':
print('eval_result = ', eval_result)
logging.info('eval_result = {0}'.format(eval_result))
with gfile.GFile(eval_result_file, 'w') as ofile:
result_to_write = {'eval_method': 'distribute'}
for key in sorted(eval_result):
# skip logging binary data
if isinstance(eval_result[key], six.binary_type):
continue
# convert numpy float to python float
result_to_write[key] = eval_result[key].item()
ofile.write(json.dumps(result_to_write))
return eval_result
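# Illustrative usage (sketch): each task of a TF_CONFIG-configured cluster
# (ps / master / worker) calls this function; the config path, eval data path and
# metric key below are placeholders, not values shipped with EasyRec.
#
#   eval_result = distribute_evaluate(
#       'samples/model_config/my_pipeline.config',
#       eval_data_path='path/to/eval_data')
#   print(eval_result.get('auc'), eval_result.get('global_step'))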