in vision/m4/evaluation/custom_metrics/classification_vqa_metrics.py [0:0]
def _compute(self, predictions, example_ids, true_labels, tested_labels, tol=0.001):
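    # Group the flat batch by example_id: for each example, accumulate the score predicted for every tested
    # (candidate) label, together with its list of ground-truth answers (`true_label_l`).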
    data_per_id = {}
    for example_id, prediction, true_label_l, tested_label in zip(
        example_ids, predictions, true_labels, tested_labels
    ):
        if example_id not in data_per_id:
            data_per_id[example_id] = {
                "predictions": [],
                "tested_labels": [],
            }
        # This `if` condition is a dirty trick to handle distributed evaluation, where some instances can be
        # repeated across a few processes to make the batches even.
        # In that case, we just verify that all processes predicted the same thing, and keep only one copy of the
        # predictions so as not to skew the metrics. Ideally this "unique" logic should be handled outside of the
        # metric, or maybe in the `add_batch` call...
        if tested_label in data_per_id[example_id]["tested_labels"]:
            idx_already_present = data_per_id[example_id]["tested_labels"].index(tested_label)
            # It happens in practice that different predictions for the same `example_id` differ by
            # a tiny bit, hence the use of a tolerance to validate the `assert`
            difference = abs(data_per_id[example_id]["predictions"][idx_already_present] - prediction)
            logger.warning(
                f"prediction already present: {data_per_id[example_id]['predictions'][idx_already_present]} | new"
                f" prediction: {prediction} | difference: {difference}"
            )
            assert difference <= tol
            assert data_per_id[example_id]["tested_labels"][idx_already_present] == tested_label
            assert data_per_id[example_id]["true_label_l"] == true_label_l
        else:
            data_per_id[example_id]["predictions"].append(prediction)
            data_per_id[example_id]["true_label_l"] = true_label_l
            data_per_id[example_id]["tested_labels"].append(tested_label)
    # assert list(range(len(data_per_id))) == sorted(data_per_id.keys())
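    # Top-1 selection: for each example, pick the tested label with the highest predicted score and keep the
    # ground-truth answers as the reference.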
    results = {}
    references = []
    top1_predictions = []
    for example_id in data_per_id.keys():
        idx = np.argmax(data_per_id[example_id]["predictions"])
        references.append(data_per_id[example_id]["true_label_l"])
        top1_predictions.append(data_per_id[example_id]["tested_labels"][idx])
    # VQA Accuracy
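    # Standard VQA accuracy: a predicted answer scores min(#matching human answers / 3, 1), averaged over the
    # leave-one-annotator-out subsets of the references (the reference answers are normalized with
    # `vqa_normalize_text` before comparison).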
    if ClassifVQAMetrics.VQA_ACCURACY in self.metrics:
        vqa_accuracy_scores = []
        for prediction, answers_ in zip(top1_predictions, references):
            answers_ = [vqa_normalize_text(answer_) for answer_ in answers_]
            gt_acc = []
            for idx_ref in range(len(answers_)):
                other_answers_ = [other_answer for idx, other_answer in enumerate(answers_) if idx != idx_ref]
                matched = [other_answer for other_answer in other_answers_ if other_answer == prediction]
                acc = min(1, len(matched) / 3)
                gt_acc.append(acc)
            vqa_accuracy_scores.append(sum(gt_acc) / len(gt_acc))
        results["vqa_accuracy"] = float(sum(vqa_accuracy_scores) / len(vqa_accuracy_scores))
    # Entropy
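    # Per-example entropy of the softmax over the predicted scores: low entropy means the probability mass is
    # concentrated on one tested label (confident), high entropy means it is spread out (uncertain).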
    if ClassifVQAMetrics.ENTROPY_DISTRIBUTION in self.metrics or ClassifVQAMetrics.ENTROPY_MEAN in self.metrics:
        entropy_scores = []
        for example_id in data_per_id.keys():
            q = softmax(np.array(data_per_id[example_id]["predictions"]))
            # Source: https://en.wikipedia.org/wiki/Entropy_(information_theory)
            # Given a discrete random variable X, which takes values in the alphabet M and is distributed
            # according to p: M → [0, 1],
            # H(X) := -\sum_{x \in M} p(x) \log p(x)
            entropy = -np.sum(np.log(q) * q)
            entropy_scores.append(entropy)
        if ClassifVQAMetrics.ENTROPY_DISTRIBUTION in self.metrics:
            results["entropy_distribution"] = entropy_scores
        if ClassifVQAMetrics.ENTROPY_MEAN in self.metrics:
            results["entropy_mean"] = float(np.mean(entropy_scores))
    return results