def _calculate_mi()

in tensorflow_data_validation/statistics/generators/sklearn_mutual_information.py


  def _calculate_mi(self, df: pd.DataFrame, labels: np.ndarray,
                    discrete_feature_mask: List[bool],
                    seed: int) -> Dict[types.FeaturePath, Dict[Text, float]]:
    """Calls the sk-learn implementation of MI and stores results in dict.

    Args:
      df: A pd.DataFrame containing feature values where each column corresponds
        to a feature and each row corresponds to an example.
      labels: An np.ndarray where the ith element is the label for the ith
        example.
      discrete_feature_mask: A boolean list where the ith element is true iff
        the ith feature column in the input df is a categorical feature.
      seed: An int value to seed the RNG used in MI computation.

    Returns:
      Dict[FeaturePath, Dict[str, float]] keyed by feature path, where each
      value is a dict whose keys are _MUTUAL_INFORMATION_KEY,
      _ADJUSTED_MUTUAL_INFORMATION_KEY,
      _NORMALIZED_ADJUSTED_MUTUAL_INFORMATION_KEY and whose values are the MI,
      AMI, and normalized AMI for that feature.
    """
    result = {}

    # Calculate MI for each feature.
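    # _sklearn_calculate_mi_wrapper is assumed to dispatch to scikit-learn's
    # mutual_info_classif / mutual_info_regression (see the sketch after this
    # method) and to return one MI estimate per column of df, or None when MI
    # cannot be computed for this feature/label combination.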
    mi_per_feature = _sklearn_calculate_mi_wrapper(
        df.values,
        labels,
        discrete_features=discrete_feature_mask,
        copy=True,
        seed=seed,
        is_label_categorical=self._label_feature_is_categorical)

    if mi_per_feature is None:
      # MI could not be calculated.
      return result

    # There are multiple ways to normalize AMI. We choose to calculate it as:
    # Normalized AMI(X, Y) = AMI(X, Y) / (Max{H(X), H(Y)} - shuffle_mi(X, Y))
    # Where H(X) is the entropy of X.
    #
    # We can derive entropy from MI(X, X) as follows:
    # MI(X, X) = H(X) - H(X|X) = H(X)
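    #
    # For illustration (made-up numbers): with MI(X, Y) = 0.9,
    # shuffle_mi(X, Y) = 0.1, H(X) = 1.2 and H(Y) = 1.5, we get
    # AMI = 0.9 - 0.1 = 0.8 and
    # Normalized AMI = 0.8 / (max(1.2, 1.5) - 0.1) = 0.8 / 1.4 ~= 0.57.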

    # Calculate H(feature), for each feature.
    entropy_per_feature = []
    for col in df.columns:
      col_is_categorical = col in self._categorical_features
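      # The MI of a column with itself is its entropy (see the identity above).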
      entropy = _sklearn_calculate_mi_wrapper(
          np.array([[x] for x in df[col].values]),
          df[col].values,
          discrete_features=col_is_categorical,
          copy=True,
          seed=seed,
          is_label_categorical=col_is_categorical)
      # The entropy might not exist for a feature, because here each feature is
      # treated as its own label: the features can be a mix of categorical and
      # numerical types, so MI is computed case by case and may be undefined
      # for some of them.
      # Setting a missing entropy to 0 does not affect the normalized AMI
      # result, since we only use the maximum entropy below.
      entropy_per_feature.append(entropy[0] if entropy else 0)

    # Calculate H(label)
    if self._label_feature_is_categorical:
      # Encode categorical labels as numerical.
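      # np.unique with return_inverse=True assigns each distinct label an
      # integer code, so the label column can be fed to sklearn as a numeric
      # feature matrix.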
      _, integerized_label = np.unique(labels, return_inverse=True)
      labels_as_feature = np.array([[x] for x in integerized_label])
    else:
      labels_as_feature = np.array([[x] for x in labels])
    label_entropy = _sklearn_calculate_mi_wrapper(
        labels_as_feature,
        labels,
        discrete_features=self._label_feature_is_categorical,
        copy=True,
        seed=seed,
        is_label_categorical=self._label_feature_is_categorical)
    # label_entropy is guaranteed to exist: if it did not, mi_per_feature would
    # have been None and we would have returned early above.
    assert len(label_entropy) == 1
    label_entropy = label_entropy[0]

    # Shuffle the labels and calculate the MI. This allows us to adjust
    # the MI for any memorization in the model.
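    # The MI between the features and a randomly shuffled label estimates the
    # spurious MI expected by chance alone; subtracting it below yields the
    # adjusted MI (AMI).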
    np.random.shuffle(labels)
    shuffled_mi_per_feature = _sklearn_calculate_mi_wrapper(
        df.values,
        labels,
        discrete_features=discrete_feature_mask,
        copy=False,
        seed=seed,
        is_label_categorical=self._label_feature_is_categorical)

    for i, (mi, shuffle_mi, entropy) in enumerate(
        zip(mi_per_feature, shuffled_mi_per_feature, entropy_per_feature)):
      max_entropy = max(label_entropy, entropy)
      ami = mi - shuffle_mi

      # Bound normalized AMI to be in [0, 1].
      # shuffle_mi <= max_entropy always holds.
      if max_entropy == shuffle_mi:
        # In the case of equality, MI(X, Y) <= max_entropy == shuffle_mi,
        # so AMI = MI(X, Y) - shuffle_mi <= 0 and the normalized AMI is set
        # to 0.
        normalized_ami = 0
      else:
        normalized_ami = min(1, max(0, ami / (max_entropy - shuffle_mi)))

      result[df.columns[i]] = {
          _MUTUAL_INFORMATION_KEY: mi.clip(min=0),
          _ADJUSTED_MUTUAL_INFORMATION_KEY: ami,
          _NORMALIZED_ADJUSTED_MUTUAL_INFORMATION_KEY: normalized_ami
      }
    return result
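
For reference, here is a minimal sketch of what the _sklearn_calculate_mi_wrapper
helper relied on above might look like, assuming it simply dispatches to
scikit-learn's MI estimators. The name and error handling are illustrative
assumptions; the real helper in this module may cover additional edge cases,
which is why callers must handle a None result.

from typing import List, Optional, Union

import numpy as np
from sklearn.feature_selection import mutual_info_classif, mutual_info_regression


def _sklearn_calculate_mi_wrapper_sketch(
    feature_array: np.ndarray,
    labels: np.ndarray,
    discrete_features: Union[bool, List[bool]],
    copy: bool,
    seed: int,
    is_label_categorical: bool) -> Optional[np.ndarray]:
  """Returns one MI estimate per column of feature_array, or None on failure."""
  try:
    if is_label_categorical:
      # Categorical label: use the classification-style MI estimator.
      return mutual_info_classif(
          feature_array, labels,
          discrete_features=discrete_features,
          copy=copy,
          random_state=seed)
    # Numerical label: use the regression-style MI estimator.
    return mutual_info_regression(
        feature_array, labels,
        discrete_features=discrete_features,
        copy=copy,
        random_state=seed)
  except ValueError:
    # MI could not be estimated for this feature/label combination
    # (illustrative: the real helper's failure modes are not shown here).
    return None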