def distribute_evaluate()

in easy_rec/python/main.py [0:0]


def distribute_evaluate(pipeline_config,
                        eval_checkpoint_path='',
                        eval_data_path=None,
                        eval_result_filename='distribute_eval_result.txt'):
  """Evaluate a EasyRec model defined in pipeline_config_path.

  Evaluate the model defined in pipeline_config_path on the eval data,
  the metrics will be displayed on tensorboard and saved into eval_result.txt.

  Args:
    pipeline_config: an EasyRecConfig path or an EasyRecConfig instance.
    eval_checkpoint_path: if specified, evaluate this checkpoint instead of
        the checkpoint found under model_dir in pipeline_config_path.
    eval_data_path: eval data path, a single path or a list of paths;
        defaults to the eval data specified in pipeline_config.
    eval_result_filename: filename (under model_dir) for saving the
        evaluation metrics.

  Returns:
    A dict of evaluation metrics as specified in pipeline_config_path,
        plus global_step, the global step at which the evaluation was
        performed.

  Raises:
    AssertionError: if pipeline_config_path does not exist.
  """
  pipeline_config = config_util.get_configs_from_pipeline_file(pipeline_config)
  if eval_data_path is not None:
    logging.info('Evaluating on data: %s' % eval_data_path)
    set_eval_input_path(pipeline_config, eval_data_path)
  train_config = pipeline_config.train_config
  eval_data = get_eval_input_path(pipeline_config)
  data_config = pipeline_config.data_config
  if data_config.HasField('sampler'):
    logging.warning(
        'Evaluation with a negative sampler is not accurate; '
        'hitrate.py is recommended instead!')
    eval_result = {}
    return eval_result
  model_dir = get_model_dir_path(pipeline_config)
  eval_tmp_results_dir = os.path.join(model_dir, 'distribute_eval_tmp_results')
  if not gfile.IsDirectory(eval_tmp_results_dir):
    logging.info('create eval tmp results dir {}'.format(eval_tmp_results_dir))
    gfile.MakeDirs(eval_tmp_results_dir)
  assert gfile.IsDirectory(
      eval_tmp_results_dir), 'failed to create tmp results dir.'
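  # expose the tmp results dir through the environment so that other parts
  # of the distributed eval pipeline can locate it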
  os.environ['eval_tmp_results_dir'] = eval_tmp_results_dir

  eval_result = {}  # stays empty on tasks that never run the eval loop
  server_target = None
  cur_job_name = None
  if 'TF_CONFIG' in os.environ:
    tf_config = estimator_utils.chief_to_master()
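    # illustrative TF_CONFIG shape after chief_to_master() (hosts are
    # hypothetical):
    #   {'cluster': {'ps': ['host0:2222'],
    #                'master': ['host1:2222'],
    #                'worker': ['host2:2222', 'host3:2222']},
    #    'task': {'type': 'worker', 'index': 0}}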

    from tensorflow.python.training import server_lib
    if tf_config['task']['type'] == 'ps':
      cluster = tf.train.ClusterSpec(tf_config['cluster'])
      server = server_lib.Server(
          cluster, job_name='ps', task_index=tf_config['task']['index'])
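      # ps tasks only host variables; join() blocks forever, so the
      # evaluation code below never runs on a ps task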
      server.join()
    elif tf_config['task']['type'] in ('master', 'worker'):
      # master and worker set up their servers identically
      if 'ps' in tf_config['cluster']:
        cur_job_name = tf_config['task']['type']
        cur_task_index = tf_config['task']['index']
        cluster = tf.train.ClusterSpec(tf_config['cluster'])
        server = server_lib.Server(
            cluster, job_name=cur_job_name, task_index=cur_task_index)
        server_target = server.target
        logging.info('server_target = %s' % server_target)

  if server_target:
    from tensorflow.python.training.device_setter import replica_device_setter
    from tensorflow.python.framework.ops import device
    from tensorflow.python.training.monitored_session import MonitoredSession
    from tensorflow.python.training.monitored_session import ChiefSessionCreator
    from tensorflow.python.training.monitored_session import WorkerSessionCreator
    from easy_rec.python.utils.estimator_utils import EvaluateExitBarrierHook
    cur_work_device = '/job:' + cur_job_name + '/task:' + str(cur_task_index)
    cur_ps_num = len(tf_config['cluster']['ps'])
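    # replica_device_setter places variables on the ps tasks (round-robin
    # over cur_ps_num) and keeps compute ops on this task's device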
    with device(
        replica_device_setter(
            ps_tasks=cur_ps_num, worker_device=cur_work_device,
            cluster=cluster)):
      distribution = strategy_builder.build(train_config)
      estimator, run_config = _create_estimator(pipeline_config, distribution)
      eval_spec = _create_eval_export_spec(pipeline_config, eval_data)
      ckpt_path = _get_ckpt_path(pipeline_config, eval_checkpoint_path)
      ckpt_dir = os.path.dirname(ckpt_path)
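      # build the eval graph directly instead of calling estimator.evaluate,
      # so the metric update ops can be driven manually across tasks below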
      input_iter = eval_spec.input_fn(
          mode=tf.estimator.ModeKeys.EVAL).make_one_shot_iterator()
      input_feas, input_lbls = input_iter.get_next()
      estimator_spec = estimator._distribute_eval_model_fn(
          input_feas, input_lbls, run_config)

    session_config = ConfigProto(
        allow_soft_placement=True,
        log_device_placement=True,
        device_filters=['/job:ps',
                        '/job:worker/task:%d' % cur_task_index])
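    # device_filters limit what each session waits on: this task connects
    # only to the ps tasks and to itself, never to peer workers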
    if cur_job_name == 'master':
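      # the checkpoint stores model variables but no metric accumulators, so
      # restore only the non-metric variables and initialize the metric
      # variables locally once the model is ready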
      metric_variables = tf.get_collection(tf.GraphKeys.METRIC_VARIABLES)
      model_ready_for_local_init_op = tf.variables_initializer(metric_variables)
      global_variables = tf.global_variables()
      remain_variables = list(
          set(global_variables).difference(set(metric_variables)))
      cur_saver = tf.train.Saver(var_list=remain_variables, sharded=True)
      cur_scaffold = tf.train.Scaffold(
          saver=cur_saver,
          ready_for_local_init_op=model_ready_for_local_init_op)
      cur_sess_creator = ChiefSessionCreator(
          scaffold=cur_scaffold,
          master=server_target,
          checkpoint_filename_with_path=ckpt_path,
          config=session_config)
    else:
      cur_sess_creator = WorkerSessionCreator(
          master=server_target, config=session_config)
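    # each eval_metric_ops entry is a (value_op, update_op) pair: update ops
    # run once per batch, value ops are read at the end by the barrier hook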
    eval_metric_ops = estimator_spec.eval_metric_ops
    update_ops = [op[1] for op in eval_metric_ops.values()]
    metric_ops = {name: op[0] for name, op in eval_metric_ops.items()}
    update_op = tf.group(update_ops)
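    # total number of evaluating tasks: all workers plus the master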
    cur_worker_num = len(tf_config['cluster']['worker']) + 1
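    # EvaluateExitBarrierHook synchronizes shutdown across tasks; the second
    # argument (True only on the master) marks the task that collects the
    # final metric values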
    if cur_job_name == 'master':
      cur_stop_grace_period_secs = 120
      cur_hooks = EvaluateExitBarrierHook(cur_worker_num, True, ckpt_dir,
                                          metric_ops)
    else:
      cur_stop_grace_period_secs = 10
      cur_hooks = EvaluateExitBarrierHook(cur_worker_num, False, ckpt_dir,
                                          metric_ops)
    with MonitoredSession(
        session_creator=cur_sess_creator,
        hooks=[cur_hooks],
        stop_grace_period_secs=cur_stop_grace_period_secs) as sess:
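      # drain the eval input: run the metric updates until the one-shot
      # iterator is exhausted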
      while True:
        try:
          sess.run(update_op)
        except tf.errors.OutOfRangeError:
          break
    eval_result = cur_hooks.eval_result

  logging.info('Evaluation finished')

  # write eval result to file
  model_dir = pipeline_config.model_dir
  eval_result_file = os.path.join(model_dir, eval_result_filename)
  logging.info('save eval result to file %s' % eval_result_file)
  if cur_job_name == 'master':
    logging.info('eval_result = {0}'.format(eval_result))
    with gfile.GFile(eval_result_file, 'w') as ofile:
      result_to_write = {'eval_method': 'distribute'}
      for key in sorted(eval_result):
        # skip logging binary data
        if isinstance(eval_result[key], six.binary_type):
          continue
        # convert numpy float to python float
        result_to_write[key] = eval_result[key].item()

      ofile.write(json.dumps(result_to_write))
  return eval_result
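
Usage sketch (the config and data paths below are hypothetical; on a real
cluster, each ps/master/worker task must be launched with a suitable
TF_CONFIG in its environment):

from easy_rec.python.main import distribute_evaluate

eval_result = distribute_evaluate(
    pipeline_config='samples/model_config/din.config',  # hypothetical path
    eval_checkpoint_path='',  # '' -> checkpoint resolved via _get_ckpt_path
    eval_data_path='data/test/eval_data.csv')  # hypothetical path
print('distribute eval metrics: %s' % eval_result)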