# mlebench/competitions/tgs-salt-identification-challenge/grade.py
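"""Grader for the TGS Salt Identification Challenge.

Validates a submission of run-length-encoded (RLE) segmentation masks
against the reference answers, decodes both into 101x101 binary masks, and
returns the mean per-image precision over IoU thresholds 0.5 to 0.95.
"""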
import numpy as np
from numpy import ndarray
from pandas import DataFrame

from mlebench.competitions.utils import rle_decode
from mlebench.grade_helpers import InvalidSubmissionError

IMG_HEIGHT = 101
IMG_WIDTH = 101


def grade(submission: DataFrame, answers: DataFrame) -> float:
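    """
    Score `submission` against `answers` and return the mean per-image metric.

    Both frames must contain exactly the columns `id` and `rle_mask`.
    Problems with the submission raise `InvalidSubmissionError`; problems
    with the answers indicate an internal error and fail via `assert`.
    """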
expected_columns = ["id", "rle_mask"]
assert set(answers.columns) == set(
expected_columns
), f"Expected answers to have columns {expected_columns}, but got {answers.columns} instead!"
if set(submission.columns) != set(expected_columns):
raise InvalidSubmissionError(
f"Expected submission to have columns {expected_columns}, but got {submission.columns} instead!"
)
if len(submission) != len(answers):
raise InvalidSubmissionError(
f"Expected submission to have {len(answers)} rows, but got {len(submission)} instead!"
)
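    # Empty masks come through as NaN when read from CSV; normalise them to
    # empty strings so the type checks and RLE decoding below see strings.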
transformed_submission = submission.fillna("")
transformed_answers = answers.fillna("")
assert transformed_answers.applymap(
lambda x: isinstance(x, str)
).values.all(), "Expected all items in `answers` to be strings, but found non-string items!"
if not transformed_submission.applymap(lambda x: isinstance(x, str)).values.all():
raise InvalidSubmissionError(
"Expected all items in `submission` to be strings, but found non-string items!"
)
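    # Align both frames by image id so masks are compared pairwise.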
sorted_submission = transformed_submission.sort_values(by="id")
sorted_answers = transformed_answers.sort_values(by="id")
if (sorted_submission["id"].values != sorted_answers["id"].values).any():
raise InvalidSubmissionError(
f"Expected submission to have the same IDs as answers, but got a different set of IDs!"
)
y_preds_rle = sorted_submission["rle_mask"].values
y_trues_rle = sorted_answers["rle_mask"].values
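    # Decode each RLE string into a binary IMG_HEIGHT x IMG_WIDTH mask;
    # rle_decode flags malformed run-length strings with an AssertionError.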
try:
y_preds = np.array(
[rle_decode(rle, height=IMG_HEIGHT, width=IMG_WIDTH) for rle in y_preds_rle]
)
    except AssertionError as e:
        raise InvalidSubmissionError(f"Error decoding RLE masks: {e}") from e
y_trues = np.array([rle_decode(rle, height=IMG_HEIGHT, width=IMG_WIDTH) for rle in y_trues_rle])
    scores = [iou_metric(y_true, y_pred) for y_true, y_pred in zip(y_trues, y_preds)]
    return float(np.mean(scores))


def iou_metric(y_true_in: ndarray, y_pred_in: ndarray) -> float:
"""
Adapted from https://www.kaggle.com/code/phoenigs/u-net-dropout-augmentation-stratification.
"""
if np.sum(y_true_in) == 0 and np.sum(y_pred_in) == 0:
return 1.0
if np.sum(y_true_in) == 0 and np.sum(y_pred_in) > 0:
return 0.0
labels = y_true_in
y_pred = y_pred_in
true_objects = 2
pred_objects = 2
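    # Binary masks: background (0) and foreground (1) are the two "objects",
    # so histogram2d builds a 2x2 contingency table of pixel counts.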
intersection = np.histogram2d(
labels.flatten(),
y_pred.flatten(),
bins=(true_objects, pred_objects),
)[0]
# Compute areas (needed for finding the union between all objects)
area_true = np.histogram(labels, bins=true_objects)[0]
area_pred = np.histogram(y_pred, bins=pred_objects)[0]
area_true = np.expand_dims(area_true, -1)
area_pred = np.expand_dims(area_pred, 0)
# Compute union
union = area_true + area_pred - intersection
# Exclude background from the analysis
intersection = intersection[1:, 1:]
union = union[1:, 1:]
union[union == 0] = 1e-9
# Compute the intersection over union
iou = intersection / union
# Precision helper function
def precision_at(threshold, iou):
matches = iou > threshold
        true_positives = np.sum(matches, axis=1) == 1  # True objects matched exactly once
        false_positives = np.sum(matches, axis=0) == 0  # Predicted objects with no match (extra)
        false_negatives = np.sum(matches, axis=1) == 0  # True objects with no match (missed)
tp, fp, fn = np.sum(true_positives), np.sum(false_positives), np.sum(false_negatives)
return tp, fp, fn
# Loop over IoU thresholds
prec = []
for t in np.arange(0.5, 1.0, 0.05):
tp, fp, fn = precision_at(t, iou)
if (tp + fp + fn) > 0:
p = tp / (tp + fp + fn)
else:
p = 0
prec.append(p)
    return float(np.mean(prec))


def iou_metric_batch(y_true_in: ndarray, y_pred_in: ndarray) -> float:
    """
    Mean `iou_metric` over a batch of masks. Retained from the source kernel;
    `grade` computes the same per-image average inline.

    Adapted from https://www.kaggle.com/code/phoenigs/u-net-dropout-augmentation-stratification.
    """
batch_size = y_true_in.shape[0]
metric = []
for batch in range(batch_size):
value = iou_metric(y_true_in[batch], y_pred_in[batch])
metric.append(value)
    return float(np.mean(metric))
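

if __name__ == "__main__":
    # Minimal smoke test; a sketch, not part of the grading API. It assumes
    # `rle_decode` accepts Kaggle-style "start length ..." run-length strings;
    # adjust the demo masks if the helper expects a different convention.
    mask = np.zeros((IMG_HEIGHT, IMG_WIDTH), dtype=np.uint8)
    mask[:5, :5] = 1
    print(iou_metric(mask, mask))  # Identical masks -> 1.0
    demo = DataFrame({"id": ["img_a", "img_b"], "rle_mask": ["1 10", "12 5"]})
    print(grade(demo.copy(), demo.copy()))  # Submission == answers -> 1.0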