in tensorflow_data_validation/statistics/generators/sklearn_mutual_information.py [0:0]
def _calculate_mi(self, df: pd.DataFrame, labels: np.ndarray,
discrete_feature_mask: List[bool],
seed: int) -> Dict[types.FeaturePath, Dict[Text, float]]:
"""Calls the sk-learn implementation of MI and stores results in dict.
Args:
df: A pd.DataFrame containing feature values where each column corresponds
to a feature and each row corresponds to an example.
labels: A List where the ith index represents the label for the ith
example.
discrete_feature_mask: A boolean list where the ith element is true iff
the ith feature column in the input df is a categorical feature.
seed: An int value to seed the RNG used in MI computation.
Returns:
Dict[FeatureName, Dict[str,float]] where the keys of the dicts are the
feature name and values are a dict where the keys are
_MUTUAL_INFORMATION_KEY, _ADJUSTED_MUTUAL_INFORMATION_KEY,
_NORMALIZED_ADJUSTED_MUTUAL_INFORMATION_KEY and the values are the MI,
AMI, and normalized AMI for that feature.
"""
result = {}
# Calculate MI for each feature.
mi_per_feature = _sklearn_calculate_mi_wrapper(
df.values,
labels,
discrete_features=discrete_feature_mask,
copy=True,
seed=seed,
is_label_categorical=self._label_feature_is_categorical)
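# mi_per_feature holds one MI value per column of df (in df.columns order),
# or None if MI could not be computed for this feature/label combination.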
if mi_per_feature is None:
# MI could not be calculated.
return result
# There are multiple ways to normalize AMI. We choose to calculate it as:
# Normalized AMI(X, Y) = AMI(X, Y) / (Max{H(X), H(Y)} - shuffle_mi(X, Y))
# Where H(X) is the entropy of X.
#
# We can derive entropy from MI(X, X) as follows:
# MI(X, X) = H(X) - H(X|X) = H(X)
# Calculate H(feature) for each feature.
entropy_per_feature = []
for col in df.columns:
col_is_categorical = col in self._categorical_features
entropy = _sklearn_calculate_mi_wrapper(
np.array([[x] for x in df[col].values]),
df[col].values,
discrete_features=col_is_categorical,
copy=True,
seed=seed,
is_label_categorical=col_is_categorical)
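# `entropy` is a length-1 array holding MI(col, col) == H(col), or None if
# MI could not be computed for this column.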
# The entropy might not exist for a feature because each feature is treated
# as its own label here. The features can be a mix of categorical and
# numerical features, so MI is computed case by case and may be undefined
# for some of them.
# Setting a missing entropy to 0 does not affect the normalized AMI result,
# since only the maximum entropy is used.
entropy_per_feature.append(entropy[0] if entropy else 0)
# Calculate H(label)
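# The label values are reshaped into a single-column 2-D array because the
# underlying sklearn MI functions expect a (num_examples, num_features)
# matrix of feature values.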
if self._label_feature_is_categorical:
# Encode categorical labels as numerical.
_, integerized_label = np.unique(labels, return_inverse=True)
labels_as_feature = np.array([[x] for x in integerized_label])
else:
labels_as_feature = np.array([[x] for x in labels])
label_entropy = _sklearn_calculate_mi_wrapper(
labels_as_feature,
labels,
discrete_features=self._label_feature_is_categorical,
copy=True,
seed=seed,
is_label_categorical=self._label_feature_is_categorical)
# label_entropy is guaranteed to exist here: if it could not be computed,
# mi_per_feature would also have been None and we would have returned early.
assert len(label_entropy) == 1
label_entropy = label_entropy[0]
# Shuffle the labels and calculate MI against the shuffled labels. This gives
# a per-feature baseline of spurious MI, which is subtracted below to produce
# the adjusted MI (AMI).
np.random.shuffle(labels)
shuffled_mi_per_feature = _sklearn_calculate_mi_wrapper(
df.values,
labels,
discrete_features=discrete_feature_mask,
copy=False,
seed=seed,
is_label_categorical=self._label_feature_is_categorical)
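# shuffled_mi_per_feature is the per-feature baseline MI under shuffled
# labels; it is subtracted from mi_per_feature below to form the AMI.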
for i, (mi, shuffle_mi, entropy) in enumerate(
zip(mi_per_feature, shuffled_mi_per_feature, entropy_per_feature)):
max_entropy = max(label_entropy, entropy)
ami = mi - shuffle_mi
# Bound normalized AMI to be in [0, 1].
# shuffle_mi <= max_entropy always holds.
if max_entropy == shuffle_mi:
# In the case of equality, MI(X, Y) <= max_entropy == shuffle_mi,
# so AMI = MI(X, Y) - shuffle_mi <= 0 and we cap the normalized AMI at 0.
normalized_ami = 0
else:
normalized_ami = min(1, max(0, ami / (max_entropy - shuffle_mi)))
result[df.columns[i]] = {
_MUTUAL_INFORMATION_KEY: mi.clip(min=0),
_ADJUSTED_MUTUAL_INFORMATION_KEY: ami,
_NORMALIZED_ADJUSTED_MUTUAL_INFORMATION_KEY: normalized_ami
}
return result
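# A minimal, self-contained sketch (not part of TFDV) of how the normalized
# AMI above could be reproduced for a single numeric feature and numeric
# label using scikit-learn directly. The helper name and its defaults are
# illustrative assumptions, not TFDV or sklearn APIs.
import numpy as np
from sklearn.feature_selection import mutual_info_regression
def normalized_ami_sketch(feature: np.ndarray, label: np.ndarray,
seed: int = 0) -> float:
"""Normalized AMI(X, Y) = AMI(X, Y) / (max{H(X), H(Y)} - shuffle_mi(X, Y))."""
x = feature.reshape(-1, 1)
mi = mutual_info_regression(x, label, random_state=seed)[0]
# H(X) = MI(X, X) and H(Y) = MI(Y, Y), per the derivation above.
h_x = mutual_info_regression(x, feature, random_state=seed)[0]
h_y = mutual_info_regression(label.reshape(-1, 1), label,
random_state=seed)[0]
# Baseline MI against shuffled labels.
shuffled = label.copy()
np.random.shuffle(shuffled)
shuffle_mi = mutual_info_regression(x, shuffled, random_state=seed)[0]
ami = mi - shuffle_mi
max_entropy = max(h_x, h_y)
if max_entropy == shuffle_mi:
return 0.0
return min(1.0, max(0.0, ami / (max_entropy - shuffle_mi)))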