mlebench/competitions/uw-madison-gi-tract-image-segmentation/grade.py (94 lines of code) (raw):
import numpy as np
from pandas import DataFrame
from scipy.spatial.distance import directed_hausdorff
from tqdm.auto import tqdm
from mlebench.competitions.utils import rles_to_masks
from mlebench.grade_helpers import InvalidSubmissionError
from mlebench.metrics import dice_coefficient
UNIT_CUBE_DIAGONAL = np.sqrt(3)
def group_masks_by_day(masks: list[np.ndarray], cases_days_slices: list[str]) -> list[np.ndarray]:
"""
Groups the 2D masks into 3D masks based on the case days
- We first join masks of the same slice into a single mask with an OR operation
- We then stack them with all the other slices from the same day in the same case into a 3D image
"""
case_day_slice_to_masks = {}
unique_slices = [] # list to keep ordering consistent
for mask, case_day_slice in sorted(zip(masks, cases_days_slices), key=lambda x: x[1]):
if case_day_slice not in case_day_slice_to_masks:
case_day_slice_to_masks[case_day_slice] = []
unique_slices.append(case_day_slice)
case_day_slice_to_masks[case_day_slice].append(mask)
case_day_to_masks = {}
case_days = [] # list to keep ordering consistent
for case_day_slice in unique_slices:
# go from e.g. "case123_day2_slice_0001" to "case123_day2"
case_day = case_day_slice.split("_slice_")[0]
if case_day not in case_day_to_masks:
case_day_to_masks[case_day] = []
case_days.append(case_day)
# OR on the masks from the same case_day_slice
joined_mask = np.logical_or.reduce(case_day_slice_to_masks[case_day_slice]).astype(np.uint8)
# add the joined slice to its relevant case_day
case_day_to_masks[case_day].append(joined_mask)
# now we stack the joined masks for each case_day to make a 3d image
for case_day in case_day_to_masks:
# make into 3d image
case_day_to_masks[case_day] = np.stack(case_day_to_masks[case_day], axis=0)
# keep the order of the cases consistent
case_day_masks = [case_day_to_masks[case_day] for case_day in case_days]
return case_day_masks
def prepare_for_metric(submission: DataFrame, answers: DataFrame):
"""
Answers dataframe contains image_height and image_width columns necessary for RLE decoding
"""
assert "image_height" in answers.columns, "image_height column not found in answers"
assert "image_width" in answers.columns, "image_width column not found in answers"
if not {"id", "class", "predicted"}.issubset(set(submission.columns)):
raise InvalidSubmissionError("Submission must have columns: id, class, predicted")
# prepare appropriate types
submission["predicted"] = submission["predicted"].fillna("")
answers["predicted"] = answers["predicted"].fillna("")
answers["image_height"] = answers["image_height"].astype(int)
answers["image_width"] = answers["image_width"].astype(int)
image_heights = answers["image_height"].tolist()
image_widths = answers["image_width"].tolist()
# extract masked images from RLE in the dataframes
submission_masks = rles_to_masks(submission["predicted"].to_list(), image_heights, image_widths)
answer_masks = rles_to_masks(answers["predicted"].to_list(), image_heights, image_widths)
# group 2d slice masks into 3d case_day masks
submission_masks_by_day = group_masks_by_day(submission_masks, submission["id"])
answer_masks_by_day = group_masks_by_day(answer_masks, answers["id"])
return submission_masks, answer_masks, submission_masks_by_day, answer_masks_by_day
def hausdorff_distance(predicted_mask: np.ndarray, true_mask: np.ndarray) -> float:
"""
Computes the hausdorff distance between two 3-dimensional binary masks (H, W, C)
Args:
predicted_mask: A 3D binary numpy array indicating where the segmentation is predicted
true_mask: A 3D binary numpy array indicating where the segmentation is
"""
# if both empty, return nan, if identical return 0, if only one is empty, return 1
if np.sum(predicted_mask) == 0 and np.sum(true_mask) == 0:
return np.nan
elif np.array_equal(predicted_mask, true_mask):
return 0
elif (np.sum(predicted_mask) == 0 and np.sum(true_mask) > 0) or (
np.sum(predicted_mask) > 0 and np.sum(true_mask) == 0
):
return 1
# if the pred mask is > 1 OOM larger than the true mask, return 1 to avoid unnecess heavy computation
if predicted_mask.sum() > 10 * true_mask.sum():
return 1
true_coordinates = np.argwhere(true_mask)
predicted_coordinates = np.argwhere(predicted_mask)
# normalize so that the coordinates are in the range [0, 1], to get a unit cube
normalization_constants = true_mask.shape
true_coordinates = true_coordinates / normalization_constants
predicted_coordinates = predicted_coordinates / normalization_constants
# Symmetrize Hausdorff distance
hausdorff_distance1, _, _ = directed_hausdorff(true_coordinates, predicted_coordinates)
hausdorff_distance2, _, _ = directed_hausdorff(predicted_coordinates, true_coordinates)
hausdorff_distance = max(hausdorff_distance1, hausdorff_distance2)
# normalize by the maximum distance possible in a unit cube, which is the diagonal
hausdorff_distance = hausdorff_distance / UNIT_CUBE_DIAGONAL
return hausdorff_distance
def grade(submission: DataFrame, answers: DataFrame) -> float:
"""
Computes dice_weight * dice + hausdorff_weight * (1 - hausdorff)
"""
dice_weight = 0.4
hausdorff_weight = 0.6
(
submission_masks,
answer_masks,
submission_masks_by_day,
answer_masks_by_day,
) = prepare_for_metric(submission, answers)
# we skip cases where both masks are empty, so we use nanmean
# https://www.kaggle.com/competitions/uw-madison-gi-tract-image-segmentation/discussion/324934#1795540
dice_coeff = np.nanmean(
[
dice_coefficient(submission_mask, answer_mask, np.nan)
for submission_mask, answer_mask in tqdm(
zip(submission_masks, answer_masks),
total=len(submission_masks),
)
]
)
# skip cases where both masks are empty, to match behaviour of dice
hausdorff_dis = np.nanmean(
[
hausdorff_distance(submission_mask_by_day, answer_mask_by_day)
for submission_mask_by_day, answer_mask_by_day in tqdm(
zip(submission_masks_by_day, answer_masks_by_day),
total=len(submission_masks_by_day),
)
]
)
return dice_weight * dice_coeff + hausdorff_weight * (1 - hausdorff_dis)