mlebench/competitions/utils.py (167 lines of code) (raw):

from pathlib import Path

import numpy as np
import pandas as pd
from numpy import ndarray
from scipy.special import softmax

from mlebench.grade_helpers import InvalidSubmissionError
from mlebench.utils import get_logger

logger = get_logger(__name__)


def df_to_one_hot(
    df: pd.DataFrame, id_column: str, target_column: str, classes: list[str]
) -> pd.DataFrame:
    """
    Convert class labels to one-hot encoded vectors.

    Args:
        df: Frame holding one row per sample.
        id_column: Name of the column in `df` that identifies each sample.
        target_column: Name of the column in `df` that holds the class label.
        classes: Labels to create indicator columns for.

    Returns:
        A frame indexed like `df` with columns `[id_column] + classes`: the id column is
        copied from `df`, and each row has a 1 in the column named by its label and 0 in the
        other class columns.
    """
    y_onehot = pd.DataFrame(0, index=df.index, columns=[id_column] + classes)
    y_onehot[id_column] = df[id_column]

    # Walk (index label, class label) pairs directly instead of `iterrows()`, which builds a
    # Series per row and does not preserve column dtypes.
    for idx, label in zip(df.index, df[target_column]):
        y_onehot.loc[idx, label] = 1

    return y_onehot


def one_hot_dfs_to_log_loss_inputs(
    submission_one_hot: pd.DataFrame,
    answers_one_hot: pd.DataFrame,
    id_column: str = "id",
    apply_softmax: bool = True,
) -> dict:
    """
    Frequently used logic to prepare one-hotted dfs for log loss calculation.

    Args:
        submission_one_hot: Submitted per-class scores, one column per class plus the id column.
        answers_one_hot: Ground-truth frame with the id column and one column per class.
        id_column: Name of the column used to align the two frames.
        apply_softmax: When True, apply a row-wise softmax to the submission values unless
            they are already one-hot encoded (in which case a warning is logged instead).

    Returns:
        A dict with keys "y_true" and "y_pred", each a 2D array with rows sorted by id and
        columns in the order of the `answers_one_hot` columns (id column removed).

    Raises:
        InvalidSubmissionError: If the submission lacks a column present in the answers, or
            lacks the id column.
        AssertionError: If the answers lack the id column, or the ids / column order of the
            two frames differ after filtering and sorting.
    """
    required_cols = set(answers_one_hot.columns)
    submission_cols = set(submission_one_hot.columns)

    if not submission_cols.issuperset(required_cols):
        raise InvalidSubmissionError(
            f"The submission DataFrame is missing some columns required by the `answers` DataFrame. "
            f"Missing columns: {required_cols - submission_cols}."
        )
    if id_column not in submission_one_hot.columns:
        raise InvalidSubmissionError(f"Submission is missing id column '{id_column}'.")

    assert id_column in answers_one_hot.columns, f"Answers is missing id column '{id_column}'."

    # Filter submission to only include columns that are in the answers (in the answers' order)
    submission_filtered = submission_one_hot[
        [col for col in answers_one_hot.columns if col in submission_cols]
    ]

    # Sort submission and answers by id to align them
    submission_sorted = submission_filtered.sort_values(by=id_column).reset_index(drop=True)
    answers_sorted = answers_one_hot.sort_values(by=id_column).reset_index(drop=True)

    assert submission_sorted[id_column].tolist() == answers_sorted[id_column].tolist(), (
        f"Mismatch in {id_column.capitalize()}s between `submission` and `answers` after sorting. "
        f"Number of mismatched {id_column.capitalize()}s: {len(set(submission_sorted[id_column]) ^ set(answers_sorted[id_column]))}. "
        f"Ensure both DataFrames have the same {id_column.capitalize()}s."
    )
    assert list(submission_sorted.columns) == list(answers_sorted.columns), (
        "Column order mismatch after filtering and sorting. "
        "Ensure both DataFrames have columns in the same order."
    )

    y_true = answers_sorted.drop(columns=[id_column]).to_numpy()
    y_pred = submission_sorted.drop(columns=[id_column]).to_numpy()

    if apply_softmax:
        # Evaluate the one-hot check once; it decides between warning and normalising.
        if is_one_hot_encoded(y_pred):
            logger.warning(
                "The flag `apply_softmax` has been set to `True` but the submission is already "
                "one-hot encoded. Skipping softmax."
            )
        else:
            y_pred = softmax(y_pred, axis=-1)

    log_loss_inputs = {
        "y_true": y_true,
        "y_pred": y_pred,
    }

    return log_loss_inputs


def is_one_hot_encoded(xs: ndarray) -> bool:
    """Check if a 2D NumPy array is one-hot encoded.

    An array counts as one-hot encoded when every entry is 0 or 1 and every row sums to 1.
    """
    assert isinstance(xs, ndarray), f"Expected a NumPy array, got {type(xs)}."
    assert xs.ndim == 2, f"Expected a 2D array, got {xs.ndim}D."

    is_binary_matrix = np.bitwise_or(xs == 0, xs == 1).all()
    is_normalized = np.allclose(xs.sum(axis=-1), 1)

    # `bool(...)` turns the NumPy boolean into a plain Python bool.
    return bool(is_binary_matrix and is_normalized)


def rle_decode(rle_string: str, height: int, width: int) -> ndarray:
    """
    Decode an RLE string into a binary mask. The RLE encoding is top-down, left-right. So 1 is
    (1,1), 2 is (2, 1), etc. The RLE is 1-indexed. Checks that the pairs are sorted, positive, and
    the decoded pixel values do not overlap.

    Args:
        rle_string (str): The RLE string.
        height (int): The height of the image.
        width (int): The width of the image.

    Returns:
        np.array: The decoded binary mask, of shape (height, width) and dtype bool.

    Raises:
        AssertionError: If an argument has the wrong type, or the pairs are unsorted,
            non-positive, or overlapping.
    """
    assert isinstance(
        rle_string, str
    ), f"Expected a string, but got {type(rle_string)}: {rle_string}"
    assert isinstance(height, int), f"Expected an integer, but got {type(height)}: {height}"
    assert isinstance(width, int), f"Expected an integer, but got {type(width)}: {width}"

    if not rle_string.strip():  # Check if the string is empty or contains only whitespace
        return np.zeros((height, width), dtype=bool)

    s = list(map(int, rle_string.split()))
    starts, lengths = s[0::2], s[1::2]

    assert starts == sorted(starts), "The pairs in the RLE string must be sorted."
    assert all(x > 0 for x in starts), "All pairs in the RLE string must be positive integers."
    assert all(x > 0 for x in lengths), "All pairs in the RLE string must be positive integers."

    # Convert to 0-based indices
    starts = np.array(starts) - 1
    ends = starts + lengths
    img = np.zeros(height * width, dtype=bool)
    for lo, hi in zip(starts, ends):
        # NOTE: a run that extends past `height * width` is clipped by the slice, not rejected.
        assert not img[lo:hi].any(), "Overlapping RLE pairs are not allowed."
        img[lo:hi] = True

    # reshape appropriately given how the RLE was encoded (column by column)
    return img.reshape((width, height)).T


# https://www.kaggle.com/code/inversion/contrails-rle-submission
def rle_encode(x: ndarray, fg_val=1):
    """
    Args:
        x:  numpy array of shape (height, width), 1 - mask, 0 - background
        fg_val: value in `x` that marks foreground pixels
    Returns: run length encoding as a flat list alternating 1-indexed start position and run
        length; empty when `x` contains no foreground pixel
    """

    dots = np.where(x.T.flatten() == fg_val)[0]  # .T sets Fortran order down-then-right
    run_lengths = []
    prev = -2
    for b in dots:
        if b > prev + 1:
            # Not adjacent to the previous foreground pixel: open a new (start, length) pair.
            run_lengths.extend((b + 1, 0))
        run_lengths[-1] += 1
        prev = b
    return run_lengths


def rles_to_masks(
    rl_encodings: list[str], image_heights: list[int], image_widths: list[int]
) -> list[np.ndarray]:
    """
    Performs run-length decoding on a list of run-length encodings to get the binary masks

    The three lists are consumed in lockstep, so the result has as many masks as the shortest
    of them.
    """
    masks = [
        rle_decode(encoding, height=image_height, width=image_width)
        for encoding, image_height, image_width in zip(rl_encodings, image_heights, image_widths)
    ]
    return masks


def get_ids_from_tf_records(tf_record_path: Path, id_feature: str = "image_name") -> list[str]:
    """
    Collect the id stored in every record of a TFRecord file.

    Args:
        tf_record_path: Path to the TFRecord file.
        id_feature: Name of the bytes feature that holds the id in each record.

    Returns:
        For each record, in file order, the first bytes value of `id_feature` decoded as UTF-8.
    """
    import tensorflow as tf  # Import only if needed, otherwise it slows down the module import

    tf_record_dataset = tf.data.TFRecordDataset(tf_record_path.as_posix())

    ids = []
    for record in tf_record_dataset:
        features = tf.train.Example.FromString(record.numpy())
        record_id = features.features.feature[id_feature].bytes_list.value[0].decode("utf-8")
        ids.append(record_id)

    return ids


def prepare_for_accuracy_metric(
    submission: pd.DataFrame,
    answers: pd.DataFrame,
    target_column: str,
    id_column: str,
) -> dict:
    """
    Validate and align a submission with the answers for an accuracy-style metric.

    Args:
        submission: Submitted predictions.
        answers: Ground-truth labels.
        target_column: Name of the label column, required in both frames.
        id_column: Name of the id column, required in both frames.

    Returns:
        A dict with keys "y_true" and "y_pred" holding the target values of the answers and
        the submission respectively, both sorted by id.

    Raises:
        InvalidSubmissionError: If the submission differs in length from the answers, lacks a
            required column, or its sorted ids differ from the answers' sorted ids.
        AssertionError: If the answers lack a required column.
    """
    # Answers checks
    assert target_column in answers.columns, f"Answers must have a `{target_column}` column"
    assert id_column in answers.columns, f"Answers must have a `{id_column}` column"

    # Submission checks
    if len(submission) != len(answers):
        raise InvalidSubmissionError("Submission must have the same length as the answers.")
    if target_column not in submission.columns:
        raise InvalidSubmissionError(f"Submission must have a `{target_column}` column")
    if id_column not in submission.columns:
        raise InvalidSubmissionError(f"Submission must have a `{id_column}` column")

    # sort on id to ensure correct order
    submission = submission.sort_values(id_column)
    answers = answers.sort_values(id_column)

    if (submission[id_column].values != answers[id_column].values).any():
        raise InvalidSubmissionError(f"Submission and Answers `{id_column}`'s do not match")

    y_pred = submission[target_column].to_numpy()
    y_true = answers[target_column].to_numpy()

    return {"y_true": y_true, "y_pred": y_pred}


def prepare_for_auroc_metric(
    submission: pd.DataFrame, answers: pd.DataFrame, id_col: str, target_col: str
) -> dict:
    """
    Validate and align a submission with the answers for an AUROC-style metric.

    Args:
        submission: Submitted scores; `target_col` must be convertible to numbers in [0, 1].
        answers: Ground-truth frame.
        id_col: Name of the id column, required in both frames.
        target_col: Name of the target column, required in both frames.

    Returns:
        A dict with keys "y_true" (answers' target values) and "y_score" (the submission's
        target values converted to numeric), both sorted by id.

    Raises:
        InvalidSubmissionError: If the submission lacks a required column, differs in length
            from the answers, has a target column that cannot be converted to numeric or
            falls outside [0, 1], or has ids that differ from the answers' ids.
        AssertionError: If the answers lack a required column.
    """
    # Answers checks
    assert id_col in answers.columns, f"answers dataframe should have an {id_col} column"
    assert target_col in answers.columns, f"answers dataframe should have a {target_col} column"

    # Submission checks
    if id_col not in submission.columns:
        raise InvalidSubmissionError(f"Submission should have an {id_col} column")
    if target_col not in submission.columns:
        raise InvalidSubmissionError(f"Submission should have a {target_col} column")
    if len(submission) != len(answers):
        raise InvalidSubmissionError("Submission and answers should have the same number of rows")
    try:
        scores = pd.to_numeric(submission[target_col])
    except (ValueError, TypeError) as err:
        # `pd.to_numeric` raises TypeError as well as ValueError for unconvertible input.
        raise InvalidSubmissionError(
            f"Expected {target_col} column to be numeric, got {submission[target_col].dtype} instead"
        ) from err

    # Range-check the converted values: comparing the raw column against 0/1 would raise an
    # uncaught TypeError when it holds numeric strings.
    if scores.min() < 0 or scores.max() > 1:
        raise InvalidSubmissionError(
            f"Submission {target_col} column should contain probabilities,"
            " and therefore contain values between 0 and 1 inclusive"
        )

    # Carry the numeric values forward on a copy so the caller's frame is left untouched.
    submission = submission.copy()
    submission[target_col] = scores.to_numpy()

    # Sort
    submission = submission.sort_values(id_col)
    answers = answers.sort_values(id_col)

    if (submission[id_col].values != answers[id_col].values).any():
        raise InvalidSubmissionError(f"Submission and answers should have the same {id_col} values")

    roc_auc_inputs = {
        "y_true": answers[target_col].to_numpy(),
        "y_score": submission[target_col].to_numpy(),
    }

    return roc_auc_inputs