def expand()

in tensorflow_data_validation/statistics/generators/top_k_uniques_stats_generator.py
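
Builds the top-k and uniques branches of the statistics pipeline: input examples are flattened into per-(slice, feature, value) tuples, counts (and weights, when an example weight map is configured) are combined per tuple, and the results are emitted as (slice_key, DatasetFeatureStatistics) pairs for unweighted top-k, unique-value counts, and, when weighted, weighted top-k.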


  def expand(self, pcoll: beam.pvalue.PCollection) -> beam.pvalue.PCollection:

    def _sum_pairwise(
        iter_of_pairs: Iterable[Tuple[Union[int, float], Union[int, float]]]
    ) -> Tuple[Union[int, float], Union[int, float]]:
      """Computes sum of counts and weights."""
      # Constructing an np.array from a list is much faster than using
      # np.fromiter, since the length is known beforehand.
      if isinstance(iter_of_pairs, list):
        arr = np.array(
            iter_of_pairs, dtype=[('c', np.int64), ('w', np.float64)])
      else:
        arr = np.fromiter(
            iter_of_pairs, dtype=[('c', np.int64), ('w', np.float64)])
      return int(arr['c'].sum()), float(arr['w'].sum())
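
    # Example (illustrative, not from the original source):
    #   _sum_pairwise([(2, 0.5), (3, 1.5)]) -> (5, 2.0)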

    has_any_weight = bool(self._example_weight_map.all_weight_features())

    class CombineCountsAndWeights(beam.PTransform):

      def expand(self, pcoll):
        if has_any_weight:
          return pcoll | beam.CombinePerKey(_sum_pairwise)
        else:
          # For non-weighted case, use sum combine fn over integers to allow
          # Beam to use Cython combiner.
          return (pcoll
                  | 'RemoveWeights' >> beam.MapTuple(lambda k, v: (k, v[0]))
                  | beam.CombinePerKey(sum))
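
    # At this point elements are keyed by the 3-tuple emitted by
    # _to_topk_tuples, (slice_key, feature_path_steps, value), with a
    # combined value of (count, weight) when weighted, else a bare count.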

    top_k_tuples_combined = (
        pcoll
        | 'ToTopKTuples' >> beam.FlatMap(
            _to_topk_tuples,
            bytes_features=self._bytes_features,
            categorical_numeric_types=self._categorical_numeric_types,
            example_weight_map=self._example_weight_map)
        | 'CombineCountsAndWeights' >> CombineCountsAndWeights()
        | 'Rearrange' >> beam.MapTuple(lambda k, v: ((k[0], k[1]), (v, k[2]))))
    # (slice_key, feature_path_steps), (count_and_maybe_weight, value)

    top_k = top_k_tuples_combined
    if has_any_weight:
      top_k |= 'Unweighted_DropWeightsAndRearrange' >> beam.MapTuple(
          lambda k, v: (k, (v[0][0], v[1])))
      # (slice_key, feature_path_steps), (count, value)

    top_k = (
        top_k
        | 'Unweighted_TopK' >> beam.combiners.Top.PerKey(
            max(self._num_top_values, self._num_rank_histogram_buckets))
        | 'Unweighted_ToFeatureValueCount' >> beam.MapTuple(
            # pylint: disable=g-long-lambda
            lambda k, v: (k, [
                top_k_uniques_stats_util.FeatureValueCount(t[1], t[0])
                for t in v
            ])
            # pylint: enable=g-long-lambda
        )
        | 'Unweighted_ToProto' >> beam.MapTuple(
            # pylint: disable=g-long-lambda
            lambda k, v:
            (k[0],
             top_k_uniques_stats_util.
             make_dataset_feature_stats_proto_topk_single(
                 feature_path_tuple=k[1],
                 value_count_list=v,
                 is_weighted_stats=False,
                 num_top_values=self._num_top_values,
                 frequency_threshold=self._frequency_threshold,
                 num_rank_histogram_buckets=self._num_rank_histogram_buckets))
            # pylint: enable=g-long-lambda
        ))
    # (slice_key, DatasetFeatureStatistics)
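    # Note: Top.PerKey ranks the (count, value) tuples with Python's default
    # tuple ordering, so the largest counts come first and ties fall back to
    # comparing the values themselves.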

    uniques = (
        top_k_tuples_combined
        | 'Uniques_Keys' >> beam.Keys()
        | 'Uniques_CountPerFeatureName' >> beam.combiners.Count.PerElement()
        | 'Uniques_ConvertToSingleFeatureStats' >> beam.MapTuple(
            # pylint: disable=g-long-lambda
            lambda k, v:
            (k[0],
             top_k_uniques_stats_util.
             make_dataset_feature_stats_proto_unique_single(
                 feature_path_tuple=k[1],
                 num_uniques=v))
            # pylint: enable=g-long-lambda
        ))
    # (slice_key, DatasetFeatureStatistics)
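    # Counting key occurrences works because CombineCountsAndWeights already
    # collapsed duplicates: each (slice_key, feature_path_steps) key appears
    # exactly once per distinct feature value.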

    result_protos = [top_k, uniques]

    if has_any_weight:
      weighted_top_k = (
          top_k_tuples_combined
          | 'Weighted_DropCountsAndRearrange' >>
          beam.MapTuple(lambda k, v: (k, (v[0][1], v[1])))
          # (slice_key, feature), (weight, value)
          | 'Weighted_TopK' >> beam.combiners.Top.PerKey(
              max(self._num_top_values, self._num_rank_histogram_buckets))
          | 'Weighted_ToFeatureValueCount' >> beam.MapTuple(
              # pylint: disable=g-long-lambda
              lambda k, v: (k, [
                  top_k_uniques_stats_util.FeatureValueCount(t[1], t[0])
                  for t in v
              ])
              # pylint: enable=g-long-lambda
          )
          | 'Weighted_ToProto' >> beam.MapTuple(
              # pylint: disable=g-long-lambda
              lambda k, v:
              (k[0],
               top_k_uniques_stats_util.
               make_dataset_feature_stats_proto_topk_single(
                   feature_path_tuple=k[1],
                   value_count_list=v,
                   is_weighted_stats=True,
                   num_top_values=self._num_top_values,
                   frequency_threshold=self._weighted_frequency_threshold,
                   num_rank_histogram_buckets=self._num_rank_histogram_buckets))
              # pylint: enable=g-long-lambda
          ))
      # (slice_key, DatasetFeatureStatistics)
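      # Mirrors the unweighted branch, except elements are ranked by their
      # combined weight (v[0][1]) rather than by their count.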

      result_protos.append(weighted_top_k)

    return (result_protos
            | 'FlattenTopKUniquesFeatureStatsProtos' >> beam.Flatten())
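
Below is a minimal, self-contained sketch of the combine pattern used in
CombineCountsAndWeights (the pipeline, element values, and the helper name are
illustrative assumptions, not part of the TFDV source). A callable that sums
(count, weight) pairs is handed to beam.CombinePerKey; Beam may apply it both
to raw values and to partial results, which is safe here because pairs are
closed under the operation.

import apache_beam as beam
import numpy as np


def sum_pairs(iter_of_pairs):
  # Same one-pass structured-array trick as _sum_pairwise above; the
  # list/np.array fast path is omitted for brevity.
  arr = np.fromiter(
      iter_of_pairs, dtype=[('c', np.int64), ('w', np.float64)])
  return int(arr['c'].sum()), float(arr['w'].sum())


with beam.Pipeline() as p:
  _ = (
      p
      | beam.Create([('a', (1, 0.5)), ('a', (2, 1.5)), ('b', (4, 1.0))])
      | beam.CombinePerKey(sum_pairs)
      | beam.Map(print))  # -> ('a', (3, 2.0)), ('b', (4, 1.0))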