src/smclarify/bias/metrics/common.py
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
import logging
from enum import Enum
from typing import List, Optional, Tuple, Callable, Any, Union
import pandas as pd
import numpy as np
from smclarify.bias.metrics.constants import INFINITY
from smclarify.bias.metrics.constants import UNIQUENESS_THRESHOLD
logger = logging.getLogger(__name__)
class DataType(Enum):
"""
Type of facet data series distribution
"""
CATEGORICAL = 0
CONTINUOUS = 1
def divide(a, b):
if b == 0 and a == 0:
return 0.0
if b == 0:
if a < 0:
return -INFINITY
return INFINITY
return a / b
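# Illustrative behavior of divide() (a hedged sketch with hypothetical inputs, not executed at import time):
#
#   divide(3, 4)     # -> 0.75
#   divide(0, 0)     # -> 0.0 (an empty group is treated as "no data", not as an error)
#   divide(5, 0)     # -> INFINITY; divide(-5, 0) -> -INFINITY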
def require(condition: bool, message: str) -> None:
if not condition:
raise ValueError(message)
def metric_description(metric: Callable[..., float]) -> str:
    """
    Fetch the short metric description from the metric's docstring.

    :param metric: metric callable function
    :return: first line of the metric's docstring
    """
    if not metric.__doc__:
        logger.error(f"Description not found for the registered metric: {metric}")
        return ""
    return metric.__doc__.lstrip().split("\n")[0]  # type: ignore
def binary_confusion_matrix(
feature: pd.Series, positive_label_index: pd.Series, positive_predicted_label_index: pd.Series
) -> List[float]:
    """
    Compute the confusion matrix [TP, FP, FN, TN] as fractions of the total number of rows.
    """
assert len(feature) == len(positive_label_index) == len(positive_predicted_label_index)
TP, TN, FP, FN = calc_confusion_matrix_quadrants(feature, positive_label_index, positive_predicted_label_index)
n = len(feature)
return [divide(TP, n), divide(FP, n), divide(FN, n), divide(TN, n)]
def calc_confusion_matrix_quadrants(
feature: pd.Series, positive_label_index: pd.Series, positive_predicted_label_index: pd.Series
) -> Tuple[int, int, int, int]:
    """
    Count the confusion matrix quadrants (TP, TN, FP, FN) for the given boolean label and prediction indexes.
    """
TP = len(feature[positive_label_index & positive_predicted_label_index])
TN = len(feature[~positive_label_index & (~positive_predicted_label_index)])
FP = len(feature[(~positive_label_index) & positive_predicted_label_index])
FN = len(feature[(positive_label_index) & (~positive_predicted_label_index)])
return TP, TN, FP, FN
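# A small sketch of the two confusion-matrix helpers on toy data (hypothetical values, not part of the module):
#
#   feature   = pd.Series(["a", "b", "c", "d"])
#   actual    = pd.Series([True, True, False, False])    # positive_label_index
#   predicted = pd.Series([True, False, True, False])    # positive_predicted_label_index
#
#   calc_confusion_matrix_quadrants(feature, actual, predicted)   # -> (TP, TN, FP, FN) = (1, 1, 1, 1)
#   binary_confusion_matrix(feature, actual, predicted)           # -> [0.25, 0.25, 0.25, 0.25]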
def DPL(feature: pd.Series, sensitive_facet_index: pd.Series, positive_label_index: pd.Series) -> float:
    """
    Difference in Positive Proportions in Labels (DPL)

    :param feature: input feature
    :param sensitive_facet_index: boolean column indicating sensitive group
    :param positive_label_index: boolean column indicating positive labels
    :return: difference in the proportion of positive labels between the advantaged and disadvantaged groups
    """
    require(sensitive_facet_index.dtype == bool, "sensitive_facet_index must be of type bool")
    require(positive_label_index.dtype == bool, "positive_label_index must be of type bool")
na = len(feature[~sensitive_facet_index])
nd = len(feature[sensitive_facet_index])
na_pos = len(feature[~sensitive_facet_index & positive_label_index])
nd_pos = len(feature[sensitive_facet_index & positive_label_index])
    if na == 0:
        raise ValueError("DPL: Negated facet set is empty.")
    if nd == 0:
        raise ValueError("DPL: Facet set is empty.")
qa = na_pos / na
qd = nd_pos / nd
dpl = qa - qd
return dpl
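# Toy sketch of DPL (hypothetical data): the advantaged group has a positive-label rate of 3/4
# and the sensitive group 1/2, so DPL = 0.75 - 0.5 = 0.25.
#
#   feature = pd.Series(["x"] * 6)
#   facet   = pd.Series([False, False, False, False, True, True])   # sensitive_facet_index
#   label   = pd.Series([True, True, True, False, True, False])     # positive_label_index
#   DPL(feature, facet, label)   # -> 0.25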
def CDD(
feature: pd.Series, sensitive_facet_index: pd.Series, label_index: pd.Series, group_variable: pd.Series
) -> float:
"""
:param feature: input feature
:param sensitive_facet_index: boolean column indicating sensitive group
:param label_index: boolean column indicating positive labels or predicted labels
:param group_variable: categorical column indicating subgroups each point belongs to
:return: the weighted average of demographic disparity on all subgroups
"""
if group_variable is None or group_variable.empty:
raise ValueError("Group variable is empty or not provided")
require(sensitive_facet_index.dtype == bool, "sensitive_facet_index must be of type bool")
require(label_index.dtype == bool, "label_index must be of type bool")
unique_groups = np.unique(group_variable)
    # Global demographic disparity (DD)
denomA = len(feature[label_index])
if denomA == 0:
raise ValueError("No positive labels in set")
denomD = len(feature[~label_index])
if denomD == 0:
raise ValueError("No negative labels in set")
# Conditional demographic disparity (CDD)
# FIXME: appending to numpy arrays is inefficient
CDD: np.typing.NDArray = np.array([])
counts: np.typing.NDArray = np.array([])
for subgroup_variable in unique_groups:
counts = np.append(counts, len(group_variable[group_variable == subgroup_variable]))
numA = len(feature[label_index & sensitive_facet_index & (group_variable == subgroup_variable)])
denomA = len(feature[label_index & (group_variable == subgroup_variable)])
A = numA / denomA if denomA != 0 else 0
numD = len(feature[(~label_index) & sensitive_facet_index & (group_variable == subgroup_variable)])
denomD = len(feature[(~label_index) & (group_variable == subgroup_variable)])
D = numD / denomD if denomD != 0 else 0
CDD = np.append(CDD, D - A)
wtd_mean_CDD = divide(np.sum(counts * CDD), np.sum(counts))
return wtd_mean_CDD
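# Toy sketch of CDD (hypothetical data): demographic disparity is computed per subgroup of
# `group_variable` and then averaged, weighted by subgroup size.
#
#   feature = pd.Series(range(8))
#   facet   = pd.Series([True, True, False, False, True, True, False, False])
#   label   = pd.Series([True, False, True, False, True, False, True, False])
#   groups  = pd.Series(["g1", "g1", "g1", "g1", "g2", "g2", "g2", "g2"])
#   CDD(feature, facet, label, groups)   # -> 0.0 (no disparity within either subgroup)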
def series_datatype(series: pd.Series, values: Optional[List[Any]] = None) -> DataType:
"""
Determine given data series is categorical or continuous using set of rules.
WARNING: The deduced data type can be different from real data type of the data series. Please
use the function `ensure_series_data_type` instead if you'd like ensure the series data type.
:param series: data for facet/label/predicted_label columns
:param values: list of facet or label values provided by user
:return: Enum {CATEGORICAL|CONTINUOUS}
"""
# if datatype is boolean or categorical we return data as categorical
data_type = DataType.CATEGORICAL
data_uniqueness_fraction = divide(series.nunique(), series.count())
    # Assumption: the user currently provides a single threshold value
    # Todo: fix me if multiple thresholds for facet or label are supported
if series.dtype.name == "category" or (isinstance(values, list) and len(values) > 1):
logger.info(
f"Column {series.name} with data uniqueness fraction {data_uniqueness_fraction} is classifed as a "
f"{data_type.name} column"
)
return data_type
if series.dtype.name in ["str", "string", "object"]:
        # try casting the dtype to int; if the cast fails, the data remains categorical
casted_data = series.astype("int64", copy=True, errors="ignore")
if np.issubdtype(casted_data.dtype, np.integer) and data_uniqueness_fraction >= UNIQUENESS_THRESHOLD:
data_type = DataType.CONTINUOUS # type: ignore
elif np.issubdtype(series.dtype, np.floating):
data_type = DataType.CONTINUOUS
elif np.issubdtype(series.dtype, np.integer):
        # Current rule: if more than 5% of the values are unique, the data is continuous
        # Todo: needs to be enhanced; this rule doesn't always determine the data type correctly
if data_uniqueness_fraction >= UNIQUENESS_THRESHOLD:
data_type = DataType.CONTINUOUS
logger.info(
f"Column {series.name} with data uniqueness fraction {data_uniqueness_fraction} is classifed as a "
f"{data_type.name} column"
)
return data_type
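# Illustrative classifications (hedged sketch; the integer rule depends on UNIQUENESS_THRESHOLD, assumed ~5%):
#
#   series_datatype(pd.Series(["a", "b", "a"]))    # -> DataType.CATEGORICAL (non-numeric strings)
#   series_datatype(pd.Series([0.1, 0.5, 0.9]))    # -> DataType.CONTINUOUS  (floating point)
#   series_datatype(pd.Series(range(100)))         # -> DataType.CONTINUOUS  (integers, uniqueness fraction 1.0)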
def ensure_series_data_type(series: pd.Series, values: Optional[List[Any]] = None) -> Tuple[DataType, pd.Series]:
"""
    Determine the type of the given data series using a set of rules, and then perform the type conversion
    needed to ensure the series data type.
:param series: data for facet/label/predicted_label columns
:param values: list of facet or label values provided by user
:return: A tuple of DataType and the converted data series
"""
data_type = series_datatype(series, values)
if data_type == DataType.CATEGORICAL:
return data_type, series.astype("category")
elif data_type == DataType.CONTINUOUS:
if values:
if not (isinstance(values[0], int) or isinstance(values[0], float)):
try:
values[0] = float(values[0])
except ValueError:
raise ValueError(
"Facet/label value provided must be a single numeric threshold for continuous data"
)
return data_type, pd.to_numeric(series)
raise ValueError("Data series is invalid or can't be classified as neither categorical nor continous.")
def convert_positive_label_values(series: pd.Series, positive_label_values: List[Union[str, int, float]]) -> List:
"""
Determines the type of the given data series and then do necessary type conversion to ensure the positive_lable_values
are of the same type as those in series.
Example problem when it helps:
The problem is that the `label_values_or_threshold` and the actual `label` values are not the same -
i.e. do not have the same type. This leads to customer facing errors when they pass numerical values
to `label_values_or_threshold` (for instance `[1, 2, 3]`) but having string values in the label column
of the dataset (for instance, `['1', '2', '3', '4', '5']`).
:param series: data for facet/label/predicted_label columns
:param positive_label_values: list of label values provided by user
:return: list of label values provided after the conversion (if any)
"""
def _convert(items: List, _type: Callable) -> List:
try:
return [_type(item) for item in items]
except ValueError as e:
# int('1.0') raises a ValueError
if "invalid literal for int() with base 10" in str(e):
return [float(item) for item in items]
            raise Exception(
                "'label' has no positive elements. Double-check that 'label' and 'positive_label_values' "
                "have correct data types or values."
            )
if isinstance(positive_label_values[0], type(series[0])):
return positive_label_values
# if the types are different, convert positive_label_values
converted_values: List[Any]
    if isinstance(series[0], bool) and isinstance(positive_label_values[0], str) and positive_label_values[0].isalpha():
        # when values = ['True', 'False'] and series = [False, True, ...]
        converted_values = [True if label.lower() == "true" else False for label in positive_label_values]
    else:
        # otherwise, e.g. values = [1, 1.0, 0, 0.0] and series = [False, True, ...]: convert the values to the
        # type of the series elements, e.g. _convert(positive_label_values, bool)
        converted_values = _convert(positive_label_values, type(series[0]))
logger.warning(
f"Data type of the elements in `positive_label_values` and in `label` must match. "
f"Converted positive_label_values from {positive_label_values} to {converted_values}"
)
return converted_values
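# Hedged sketch of convert_positive_label_values() with hypothetical inputs: integer values are
# converted to strings to match the dtype of the label series, and a warning is logged.
#
#   labels = pd.Series(["1", "2", "3", "1"])
#   convert_positive_label_values(labels, [1, 2])   # -> ["1", "2"]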
# Todo: Fix the function to avoid redundant calls for DCA and DCR
def DCO(
feature: pd.Series,
sensitive_facet_index: pd.Series,
positive_label_index: pd.Series,
positive_predicted_label_index: pd.Series,
) -> Tuple[float, float]:
"""
Difference in Conditional Outcomes (DCO)
:param feature: input feature
:param sensitive_facet_index: boolean column indicating sensitive group
:param positive_label_index: boolean column indicating positive labels
:param positive_predicted_label_index: boolean column indicating positive predicted labels
    :return: a tuple of the Difference in Conditional Acceptance (DCA) and the Difference in Conditional
        Rejection (DCR) between the advantaged and disadvantaged groups
"""
require(sensitive_facet_index.dtype == bool, "sensitive_facet_index must be of type bool")
require(positive_label_index.dtype == bool, "positive_label_index must be of type bool")
require(positive_predicted_label_index.dtype == bool, "positive_predicted_label_index must be of type bool")
if len(feature[sensitive_facet_index]) == 0:
raise ValueError("DCO: Facet set is empty")
if len(feature[~sensitive_facet_index]) == 0:
raise ValueError("DCO: Negated Facet set is empty")
na0 = len(feature[~positive_label_index & ~sensitive_facet_index])
na0hat = len(feature[~positive_predicted_label_index & ~sensitive_facet_index])
nd0 = len(feature[~positive_label_index & sensitive_facet_index])
nd0hat = len(feature[~positive_predicted_label_index & sensitive_facet_index])
na1 = len(feature[positive_label_index & ~sensitive_facet_index])
na1hat = len(feature[positive_predicted_label_index & ~sensitive_facet_index])
nd1 = len(feature[positive_label_index & sensitive_facet_index])
nd1hat = len(feature[positive_predicted_label_index & sensitive_facet_index])
rr_a = divide(na0, na0hat)
rr_d = divide(nd0, nd0hat)
ca = divide(na1, na1hat)
cd = divide(nd1, nd1hat)
dca = ca - cd
dcr = rr_d - rr_a
if ca == cd and ca == INFINITY:
dca = 0
if rr_a == rr_d and rr_a == INFINITY:
dcr = 0
return dca, dcr
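# Toy sketch of DCO (hypothetical data). Conditional acceptance is c = n(1) / n_hat(1) and conditional
# rejection is r = n(0) / n_hat(0), each computed per group:
#
#   feature   = pd.Series(["x"] * 6)
#   facet     = pd.Series([False, False, False, True, True, True])
#   actual    = pd.Series([True, False, False, True, False, False])
#   predicted = pd.Series([True, True, False, True, False, False])
#   DCO(feature, facet, actual, predicted)   # -> (dca, dcr) = (0.5 - 1.0, 1.0 - 2.0) = (-0.5, -1.0)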
# Todo: Fix the function to avoid redundant calls for DAR and DRR
def DLR(
feature: pd.Series,
sensitive_facet_index: pd.Series,
positive_label_index: pd.Series,
positive_predicted_label_index: pd.Series,
) -> Tuple[float, float]:
"""
Difference in Label Rates (DLR)
    For cases where both the numerator and the denominator are 0, we use 0 as the result.
:param feature: input feature
:param sensitive_facet_index: boolean column indicating sensitive group
:param positive_label_index: boolean column indicating positive labels
:param positive_predicted_label_index: boolean column indicating positive predicted labels
    :return: a tuple of the Difference in Acceptance Rates (DAR) and the Difference in Rejection Rates (DRR)
        between the advantaged and disadvantaged groups
"""
require(sensitive_facet_index.dtype == bool, "sensitive_facet_index must be of type bool")
require(positive_label_index.dtype == bool, "positive_label_index must be of type bool")
require(positive_predicted_label_index.dtype == bool, "positive_predicted_label_index must be of type bool")
if len(feature[sensitive_facet_index]) == 0:
raise ValueError("DLR: Facet set is empty")
if len(feature[~sensitive_facet_index]) == 0:
raise ValueError("DLR: Negated Facet set is empty")
TP_a = len(feature[positive_label_index & positive_predicted_label_index & (~sensitive_facet_index)])
na1hat = len(feature[positive_predicted_label_index & (~sensitive_facet_index)])
TP_d = len(feature[positive_label_index & positive_predicted_label_index & sensitive_facet_index])
nd1hat = len(feature[positive_predicted_label_index & sensitive_facet_index])
TN_a = len(feature[(~positive_label_index) & (~positive_predicted_label_index) & (~sensitive_facet_index)])
na0hat = len(feature[(~positive_predicted_label_index) & (~sensitive_facet_index)])
TN_d = len(feature[(~positive_label_index) & (~positive_predicted_label_index) & sensitive_facet_index])
nd0hat = len(feature[(~positive_predicted_label_index) & sensitive_facet_index])
ar_a = divide(TP_a, na1hat)
ar_d = divide(TP_d, nd1hat)
rr_a = divide(TN_a, na0hat)
rr_d = divide(TN_d, nd0hat)
dar = ar_a - ar_d
drr = rr_d - rr_a
if ar_a == ar_d and ar_a == INFINITY:
dar = 0
if rr_a == rr_d and rr_a == INFINITY:
drr = 0
return dar, drr
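# Toy sketch of DLR, reusing the hypothetical data from the DCO sketch above: the acceptance rate is
# TP / n_hat(1) and the rejection rate is TN / n_hat(0), each computed per group.
#
#   DLR(feature, facet, actual, predicted)   # -> (dar, drr) = (0.5 - 1.0, 1.0 - 1.0) = (-0.5, 0.0)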
def DLA(
feature: pd.Series,
sensitive_facet_index: pd.Series,
positive_label_index: pd.Series,
positive_predicted_label_index: pd.Series,
) -> Tuple[float, float]:
r"""
Difference in Label Accuracy (DLA)
:param feature: input feature
:param sensitive_facet_index: boolean column indicating sensitive group
:param positive_label_index: boolean column indicating positive labels
:param positive_predicted_label_index: boolean column indicating positive predicted labels
    :return: a tuple of the recall difference and the specificity difference between the advantaged and
        disadvantaged groups
"""
require(sensitive_facet_index.dtype == bool, "sensitive_facet_index must be of type bool")
require(positive_label_index.dtype == bool, "positive_label_index must be of type bool")
require(positive_predicted_label_index.dtype == bool, "positive_predicted_label_index must be of type bool")
if len(feature[sensitive_facet_index]) == 0:
raise ValueError("DLA: Facet set is empty")
if len(feature[~sensitive_facet_index]) == 0:
raise ValueError("DLA: Negated Facet set is empty")
TP_a = len(feature[positive_label_index & positive_predicted_label_index & (~sensitive_facet_index)])
FN_a = len(feature[positive_label_index & (~positive_predicted_label_index) & (~sensitive_facet_index)])
TN_a = len(feature[(~positive_label_index) & (~positive_predicted_label_index) & (~sensitive_facet_index)])
FP_a = len(feature[(~positive_label_index) & positive_predicted_label_index & (~sensitive_facet_index)])
rec_a = divide(TP_a, TP_a + FN_a)
sp_a = divide(TN_a, TN_a + FP_a)
TP_d = len(feature[positive_label_index & positive_predicted_label_index & sensitive_facet_index])
FN_d = len(feature[positive_label_index & (~positive_predicted_label_index) & sensitive_facet_index])
TN_d = len(feature[(~positive_label_index) & (~positive_predicted_label_index) & sensitive_facet_index])
FP_d = len(feature[(~positive_label_index) & positive_predicted_label_index & sensitive_facet_index])
rec_d = divide(TP_d, TP_d + FN_d)
sp_d = divide(TN_d, TN_d + FP_d)
rd = rec_a - rec_d
sd = sp_d - sp_a
if rec_a == rec_d and rec_a == INFINITY:
rd = 0
if sp_a == sp_d and sp_a == INFINITY:
sd = 0
return rd, sd
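# Toy sketch of DLA, again reusing the hypothetical DCO data: recall and specificity are computed
# per group, and their differences are returned.
#
#   DLA(feature, facet, actual, predicted)   # -> (rd, sd) = (1.0 - 1.0, 1.0 - 0.5) = (0.0, 0.5)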
def GE(positive_label_index: pd.Series, positive_predicted_label_index: pd.Series, alpha: float) -> float:
r"""
Generalized Entropy Index (GE) with parameter alpha.
:param positive_label_index: boolean column indicating positive labels
:param positive_predicted_label_index: boolean column indicating positive predicted labels
:param alpha: parameter of the GE index.
    :return: Generalized Entropy index of the benefits, where benefit = predicted label - true label + 1
"""
if alpha == 0 or alpha == 1:
raise NotImplementedError("Not implemented for alpha 0 or 1.")
N = positive_label_index.shape[0]
require(positive_label_index.dtype == bool, "positive_label_index must be of type bool")
require(positive_predicted_label_index.dtype == bool, "positive_predicted_label_index must be of type bool")
require(N > 0, "dataset must be non-empty")
positive_predicted_label_index = positive_predicted_label_index.astype(int)
positive_label_index = positive_label_index.astype(int)
benefit = positive_predicted_label_index - positive_label_index + 1
mean_benefit = benefit.mean()
    # Benefit is a non-negative quantity, so the mean benefit is 0 only when all
    # the benefits are 0, that is, when all the predictions are false negatives.
require(
mean_benefit != 0,
"All predicted labels are false negatives. There should be at least one prediction that is not false negative.",
)
benefit_mean_ratio = benefit / mean_benefit
return (benefit_mean_ratio**alpha - 1).sum() / (N * alpha * (alpha - 1))
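# Toy sketch of GE with alpha = 2 (hypothetical data). The benefits are [1, 2, 0, 1] with mean 1.0, so the
# index equals half the squared coefficient of variation of the benefits:
#
#   actual    = pd.Series([True, False, True, False])
#   predicted = pd.Series([True, True, False, False])
#   GE(actual, predicted, alpha=2)   # -> ((1 + 4 + 0 + 1) - 4) / (4 * 2 * 1) = 0.25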