def _distribute_separated_auc_impl()

in easy_rec/python/core/metrics.py [0:0]


def _distribute_separated_auc_impl(labels,
                                   predictions,
                                   keys,
                                   reduction='mean',
                                   metric_name='sepatated_auc'):
  """Computes the AUC group by the key separately, merged across workers.

  Every update appends the incoming batch to in-process per-key accumulators
  and dumps those accumulators as JSON files into the directory named by the
  `eval_tmp_results_dir` environment variable. The value op reads the files
  written by `job_worker` tasks `1 .. task_num - 1`, combines them with the
  local accumulators, computes one AUC per key and reduces them to a scalar.

  Keys whose labels are all 1 or all 0 are skipped, because AUC is undefined
  for them. If no key is left, the value is 0.0.

  The environment must provide `TF_CONFIG` (a JSON string with a
  `task.type` entry) and `eval_tmp_results_dir` (an existing directory).

  Args:
    labels: A `Tensor` whose shape matches `predictions`. Will be cast to
      `bool`.
    predictions: A floating point `Tensor` of arbitrary shape and whose values
      are in the range `[0, 1]`.
    keys: keys to be group by, A int or string `Tensor` whose shape matches
      `predictions`.
    reduction: reduction metric for auc of different keys
      * "mean": simple mean of different keys
      * "mean_by_sample_num": weighted mean with sample num of different keys
      * "mean_by_positive_num": weighted mean with positive sample num of
        different keys
    metric_name: the name of compute metric; it is used as the prefix of the
      temporary JSON file names.

  Returns:
    A tuple `(value_op, update_op)`: `value_op` is a scalar float32 tensor
    holding the reduced AUC, `update_op` accumulates one batch.
  """
  assert reduction in ['mean', 'mean_by_sample_num', 'mean_by_positive_num'], \
      'reduction method must in mean | mean_by_sample_num | mean_by_positive_num'
  separated_label = defaultdict(list)
  separated_prediction = defaultdict(list)
  separated_weights = defaultdict(int)
  tf_config = json.loads(os.environ['TF_CONFIG'])
  cur_job_name = tf_config['task']['type']
  cur_task_index, task_num = get_task_index_and_num()
  cur_work_device = 'job_' + cur_job_name + '__' + 'task_' + str(cur_task_index)
  eval_tmp_results_dir = os.environ['eval_tmp_results_dir']
  assert tf.gfile.IsDirectory(
      eval_tmp_results_dir), 'eval_tmp_results_dir not exists'

  def _json_path(work_device, name):
    """Builds the path of the JSON dump of accumulator `name` for a device."""
    json_name = metric_name + '__' + work_device + '__' + name + '.json'
    return os.path.join(eval_tmp_results_dir, json_name)

  def update_pyfunc(labels, predictions, keys):
    for label, prediction, key in zip(labels, predictions, keys):
      key = str(key)
      # .item() turns the numpy scalar into a plain Python scalar, presumably
      # so that the accumulators can be dumped as JSON below.
      label_value = label.item()
      separated_label[key].append(label_value)
      separated_prediction[key].append(prediction.item())
      if reduction == 'mean':
        # Constant weight per key -> plain mean over keys.
        separated_weights[key] = 1
      elif reduction == 'mean_by_sample_num':
        separated_weights[key] += 1
      elif reduction == 'mean_by_positive_num':
        separated_weights[key] += label_value
    # Publish the complete accumulated state of this task after every batch,
    # so that the task which evaluates the value op can read it.
    for name, data in zip(
        ['separated_label', 'separated_prediction', 'separated_weights'],
        [separated_label, separated_prediction, separated_weights]):
      save_data_to_json_path(_json_path(cur_work_device, name), data)

  def value_pyfunc():
    # Merge into per-call copies instead of the closure accumulators. The
    # worker files already contain each worker's full state, so mutating the
    # local accumulators would count the remote samples once more on every
    # additional evaluation of the value op.
    merged_label = dict(separated_label)
    merged_prediction = dict(separated_prediction)
    merged_weights = dict(separated_weights)
    for task_i in range(1, task_num):
      work_device_i = 'job_worker__task_' + str(task_i)

      label_i = read_data_from_json_path(
          _json_path(work_device_i, 'separated_label'))
      for key, values in label_i.items():
        # `+` builds a new list, the local accumulator list is not modified.
        merged_label[key] = merged_label.get(key, []) + values

      prediction_i = read_data_from_json_path(
          _json_path(work_device_i, 'separated_prediction'))
      for key, values in prediction_i.items():
        merged_prediction[key] = merged_prediction.get(key, []) + values

      weights_i = read_data_from_json_path(
          _json_path(work_device_i, 'separated_weights'))
      if reduction == 'mean':
        # Every key keeps the constant weight 1, nothing to sum up.
        merged_weights.update(weights_i)
      else:
        for key, weight in weights_i.items():
          merged_weights[key] = merged_weights.get(key, 0) + weight

    metrics = []
    weights = []
    for key, key_labels in merged_label.items():
      per_label = np.asarray(key_labels).reshape([-1])
      per_prediction = np.asarray(merged_prediction.get(key, [])).reshape([-1])
      # AUC is undefined when only one class is present for the key.
      if np.all(per_label == 1) or np.all(per_label == 0):
        continue
      metric = sklearn_metrics.roc_auc_score(per_label, per_prediction)
      metrics.append(metric)
      weights.append(merged_weights.get(key, 0))
    if len(metrics) > 0:
      return np.average(metrics, weights=weights).astype(np.float32)
    else:
      return np.float32(0.0)

  update_op = tf.py_func(update_pyfunc, [labels, predictions, keys], [])
  value_op = tf.py_func(value_pyfunc, [], tf.float32)
  return value_op, update_op