def _fairness_indicators_metrics_at_thresholds()

in tensorflow_model_analysis/addons/fairness/metrics/fairness_indicators.py [0:0]


def _fairness_indicators_metrics_at_thresholds(
    thresholds: List[float],
    name: str = FAIRNESS_INDICATORS_METRICS_NAME,
    eval_config: Optional[config_pb2.EvalConfig] = None,
    model_name: str = '',
    output_name: str = '',
    aggregation_type: Optional[metric_types.AggregationType] = None,
    sub_key: Optional[metric_types.SubKey] = None,
    class_weights: Optional[Dict[int, float]] = None,
    example_weighted: bool = False) -> metric_types.MetricComputations:
  """Builds the computations that derive fairness rates at each threshold.

  A MetricKey is created for every (threshold, sub-metric) pair, where the
  sub-metrics come from FAIRNESS_INDICATORS_SUB_METRICS. The binary confusion
  matrix computations are requested for the same thresholds, and a derived
  computation is appended that turns their tp/tn/fp/fn entries into rates.

  Args:
    thresholds: Thresholds to report metrics at. The i-th threshold is paired
      with the i-th entry of the confusion matrices' tp/tn/fp/fn.
    name: Prefix used for every generated metric key name.
    eval_config: Forwarded to the binary confusion matrix computations.
    model_name: Set on the metric keys and forwarded to the confusion matrix
      computations.
    output_name: Set on the metric keys and forwarded to the confusion matrix
      computations.
    aggregation_type: Forwarded to the binary confusion matrix computations.
    sub_key: Set on the metric keys and forwarded to the confusion matrix
      computations.
    class_weights: Forwarded to the binary confusion matrix computations.
    example_weighted: Set on the metric keys and forwarded to the confusion
      matrix computations.

  Returns:
    The computations returned by binary_confusion_matrices with the derived
    fairness computation appended at the end.
  """
  # Number of decimals used when rendering a threshold into a key name.
  precision = calculate_digits(thresholds)

  all_keys = []
  keys_by_threshold = collections.defaultdict(dict)
  for threshold in thresholds:
    for sub_metric in FAIRNESS_INDICATORS_SUB_METRICS:
      # Names look like "fairness_indicators_metrics/positive_rate@0.5".
      metric_key = metric_types.MetricKey(
          name='%s/%s@%.*f' % (name, sub_metric, precision, threshold),
          model_name=model_name,
          output_name=output_name,
          sub_key=sub_key,
          example_weighted=example_weighted)
      all_keys.append(metric_key)
      keys_by_threshold[threshold][sub_metric] = metric_key

  # The derived computation below reads the confusion matrices, so their
  # computations have to be part of the returned list.
  computations = binary_confusion_matrices.binary_confusion_matrices(
      eval_config=eval_config,
      model_name=model_name,
      output_name=output_name,
      sub_key=sub_key,
      aggregation_type=aggregation_type,
      class_weights=class_weights,
      example_weighted=example_weighted,
      thresholds=thresholds)
  confusion_matrices_key = computations[-1].keys[-1]

  def result(
      metrics: Dict[metric_types.MetricKey, Any]
  ) -> Dict[metric_types.MetricKey, Any]:
    """Maps each fairness metric key to its rate for every threshold."""
    matrices = metrics[confusion_matrices_key]
    nan = float('nan')

    def ratio(numerator, denominator):
      # A falsy (zero) denominator is replaced by NaN so the rate comes out
      # as NaN instead of failing on a division by zero.
      return numerator / (denominator or nan)

    rates = {}
    for idx, threshold in enumerate(thresholds):
      tp = matrices.tp[idx]
      tn = matrices.tn[idx]
      fp = matrices.fp[idx]
      fn = matrices.fn[idx]

      positives = tp + fn
      negatives = tn + fp
      total = positives + negatives

      # Listed in the order the entries are written to the output dict.
      values_by_sub_metric = (
          ('false_positive_rate', ratio(fp, negatives)),
          ('false_negative_rate', ratio(fn, positives)),
          ('true_positive_rate', ratio(tp, positives)),
          ('true_negative_rate', ratio(tn, negatives)),
          ('positive_rate', ratio(tp + fp, total)),
          ('negative_rate', ratio(tn + fn, total)),
          ('false_discovery_rate', ratio(fp, fp + tp)),
          ('false_omission_rate', ratio(fn, fn + tn)),
      )

      threshold_keys = keys_by_threshold[threshold]
      for sub_metric, value in values_by_sub_metric:
        rates[threshold_keys[sub_metric]] = value

    return rates

  computations.append(
      metric_types.DerivedMetricComputation(keys=all_keys, result=result))
  return computations