in easy_rec/python/core/metrics.py [0:0]
def _distribute_separated_auc_impl(labels,
predictions,
keys,
reduction='mean',
                                   metric_name='separated_auc'):
"""Computes the AUC group by the key separately.
Args:
labels: A `Tensor` whose shape matches `predictions`. Will be cast to
`bool`.
predictions: A floating point `Tensor` of arbitrary shape and whose values
are in the range `[0, 1]`.
keys: keys to be group by, A int or string `Tensor` whose shape matches `predictions`.
reduction: reduction metric for auc of different keys
metric_name: the name of compute metric
* "mean": simple mean of different keys
* "mean_by_sample_num": weighted mean with sample num of different keys
* "mean_by_positive_num": weighted mean with positive sample num of different keys
"""
assert reduction in ['mean', 'mean_by_sample_num', 'mean_by_positive_num'], \
      'reduction method must be one of mean | mean_by_sample_num | mean_by_positive_num'
separated_label = defaultdict(list)
separated_prediction = defaultdict(list)
separated_weights = defaultdict(int)
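  # Identify the current task from TF_CONFIG so that each evaluation worker can
  # write its partial results under a unique name.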
tf_config = json.loads(os.environ['TF_CONFIG'])
cur_job_name = tf_config['task']['type']
cur_task_index, task_num = get_task_index_and_num()
cur_work_device = 'job_' + cur_job_name + '__' + 'task_' + str(cur_task_index)
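  # All workers write their partial results into this shared directory, which
  # the caller must create and export via the eval_tmp_results_dir env variable.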
eval_tmp_results_dir = os.environ['eval_tmp_results_dir']
assert tf.gfile.IsDirectory(
      eval_tmp_results_dir), 'eval_tmp_results_dir does not exist'
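
  # Runs inside tf.py_func for every evaluated batch: accumulates labels,
  # predictions and weights per key, then dumps the accumulators to JSON so
  # other tasks can read them.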
def update_pyfunc(labels, predictions, keys):
for label, prediction, key in zip(labels, predictions, keys):
key = str(key)
separated_label[key].append(label.item())
separated_prediction[key].append(prediction.item())
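      # Track the per-key weight according to the chosen reduction mode.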
if reduction == 'mean':
separated_weights[key] = 1
elif reduction == 'mean_by_sample_num':
separated_weights[key] += 1
elif reduction == 'mean_by_positive_num':
separated_weights[key] += label.item()
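    # Persist the accumulated state of this worker after every batch; files are
    # keyed by metric name, worker device and accumulator name.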
for name, data in zip(
['separated_label', 'separated_prediction', 'separated_weights'],
[separated_label, separated_prediction, separated_weights]):
cur_json_name = metric_name + '__' + cur_work_device + '__' + name + '.json'
cur_json_path = os.path.join(eval_tmp_results_dir, cur_json_name)
save_data_to_json_path(cur_json_path, data)
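
  # Runs when the metric value is requested: merges the JSON files written by
  # the other worker tasks into this task's in-memory accumulators and computes
  # the reduced AUC.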
def value_pyfunc():
for task_i in range(1, task_num):
work_device_i = 'job_worker__task_' + str(task_i)
for name in [
'separated_label', 'separated_prediction', 'separated_weights'
]:
json_name_i = metric_name + '__' + work_device_i + '__' + name + '.json'
json_path_i = os.path.join(eval_tmp_results_dir, json_name_i)
data_i = read_data_from_json_path(json_path_i)
if (name == 'separated_label'):
separated_label.update({
key: separated_label.get(key, []) + data_i.get(key, [])
for key in set(
list(separated_label.keys()) + list(data_i.keys()))
})
elif (name == 'separated_prediction'):
separated_prediction.update({
key: separated_prediction.get(key, []) + data_i.get(key, [])
for key in set(
list(separated_prediction.keys()) + list(data_i.keys()))
})
elif (name == 'separated_weights'):
if reduction == 'mean':
separated_weights.update(data_i)
else:
separated_weights.update({
key: separated_weights.get(key, 0) + data_i.get(key, 0)
for key in set(
list(separated_weights.keys()) + list(data_i.keys()))
})
else:
          assert False, 'Unsupported name {}'.format(name)
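    # Compute AUC per key; keys whose labels are all positive or all negative
    # are skipped because AUC is undefined for them.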
metrics = []
weights = []
for key in separated_label.keys():
per_label = np.asarray(separated_label[key]).reshape([-1])
per_prediction = np.asarray(separated_prediction[key]).reshape([-1])
if np.all(per_label == 1) or np.all(per_label == 0):
continue
metric = sklearn_metrics.roc_auc_score(per_label, per_prediction)
metrics.append(metric)
weights.append(separated_weights[key])
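    # Reduce the per-key AUCs with the weights collected above.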
if len(metrics) > 0:
return np.average(metrics, weights=weights).astype(np.float32)
else:
return np.float32(0.0)
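
  # Expose the Python accumulator / aggregator as a standard
  # (value_op, update_op) pair so it plugs into tf.metrics-style evaluation.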
update_op = tf.py_func(update_pyfunc, [labels, predictions, keys], [])
value_op = tf.py_func(value_pyfunc, [], tf.float32)
return value_op, update_op
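
# A minimal usage sketch (names such as label_tensor, pred_tensor and
# user_id_tensor are placeholders; TF_CONFIG and eval_tmp_results_dir are
# assumed to be set up by the surrounding distributed-eval code):
#
#   value_op, update_op = _distribute_separated_auc_impl(
#       labels=label_tensor,
#       predictions=pred_tensor,
#       keys=user_id_tensor,
#       reduction='mean_by_sample_num',
#       metric_name='separated_auc')
#   eval_metric_ops = {'separated_auc': (value_op, update_op)}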