in tensorflow_data_validation/statistics/generators/top_k_uniques_stats_generator.py [0:0]
def expand(self, pcoll: beam.pvalue.PCollection) -> beam.pvalue.PCollection:

  def _sum_pairwise(
      iter_of_pairs: Iterable[Tuple[Union[int, float], Union[int, float]]]
  ) -> Tuple[Union[int, float], Union[int, float]]:
    """Computes the sum of counts and weights."""
    # Constructing an np array from a list is much faster than from a generic
    # iterable because the length is known beforehand.
    if isinstance(iter_of_pairs, list):
      arr = np.array(
          iter_of_pairs, dtype=[('c', np.int64), ('w', np.float64)])
    else:
      arr = np.fromiter(
          iter_of_pairs, dtype=[('c', np.int64), ('w', np.float64)])
    return int(arr['c'].sum()), float(arr['w'].sum())
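  # Illustrative behavior: _sum_pairwise([(1, 0.5), (2, 1.5)]) returns
  # (3, 2.0), i.e. the elementwise sum of (count, weight) pairs.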
  # True iff at least one feature has an associated example weight feature.
  has_any_weight = bool(self._example_weight_map.all_weight_features())
  class CombineCountsAndWeights(beam.PTransform):

    def expand(self, pcoll):
      if has_any_weight:
        return pcoll | beam.CombinePerKey(_sum_pairwise)
      else:
        # For the non-weighted case, use a sum combine fn over integers so
        # that Beam can use its Cython combiner.
        return (pcoll
                | 'RemoveWeights' >> beam.MapTuple(lambda k, v: (k, v[0]))
                | beam.CombinePerKey(sum))
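  # Per key, CombineCountsAndWeights emits an int total count in the
  # unweighted case, or a (total_count, total_weight) pair when weighted.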
  top_k_tuples_combined = (
      pcoll
      | 'ToTopKTuples' >> beam.FlatMap(
          _to_topk_tuples,
          bytes_features=self._bytes_features,
          categorical_numeric_types=self._categorical_numeric_types,
          example_weight_map=self._example_weight_map)
      | 'CombineCountsAndWeights' >> CombineCountsAndWeights()
      | 'Rearrange' >> beam.MapTuple(lambda k, v: ((k[0], k[1]), (v, k[2]))))
  # (slice_key, feature_path_steps), (count_and_maybe_weight, value)
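  # Illustrative element for a weighted run (values are hypothetical):
  #   (('slice0', ('animals',)), ((3, 4.5), 'cat'))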
  top_k = top_k_tuples_combined
  if has_any_weight:
    top_k |= 'Unweighted_DropWeightsAndRearrange' >> beam.MapTuple(
        lambda k, v: (k, (v[0][0], v[1])))
    # (slice_key, feature_path_steps), (count, value)
  top_k = (
      top_k
      | 'Unweighted_TopK' >> beam.combiners.Top().PerKey(
          max(self._num_top_values, self._num_rank_histogram_buckets))
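      # max(...) keeps enough elements per key to populate both the top-k
      # values list and the rank histogram downstream.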
      | 'Unweighted_ToFeatureValueCount' >> beam.MapTuple(
          # pylint: disable=g-long-lambda
          lambda k, v: (k, [
              top_k_uniques_stats_util.FeatureValueCount(t[1], t[0])
              for t in v
          ])
          # pylint: enable=g-long-lambda
      )
      | 'Unweighted_ToProto' >> beam.MapTuple(
          # pylint: disable=g-long-lambda
          lambda k, v:
          (k[0],
           top_k_uniques_stats_util.
           make_dataset_feature_stats_proto_topk_single(
               feature_path_tuple=k[1],
               value_count_list=v,
               is_weighted_stats=False,
               num_top_values=self._num_top_values,
               frequency_threshold=self._frequency_threshold,
               num_rank_histogram_buckets=self._num_rank_histogram_buckets))
          # pylint: enable=g-long-lambda
      ))
  # (slice_key, DatasetFeatureStatistics)
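  # CombineCountsAndWeights produced exactly one element per distinct
  # (slice, feature, value) triple, so counting keys per
  # (slice_key, feature_path_steps) yields the number of unique values.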
  uniques = (
      top_k_tuples_combined
      | 'Uniques_Keys' >> beam.Keys()
      | 'Uniques_CountPerFeatureName' >> beam.combiners.Count().PerElement()
      | 'Uniques_ConvertToSingleFeatureStats' >> beam.MapTuple(
          # pylint: disable=g-long-lambda
          lambda k, v:
          (k[0],
           top_k_uniques_stats_util.
           make_dataset_feature_stats_proto_unique_single(
               feature_path_tuple=k[1],
               num_uniques=v))
          # pylint: enable=g-long-lambda
      ))
  # (slice_key, DatasetFeatureStatistics)
  result_protos = [top_k, uniques]

  if has_any_weight:
    weighted_top_k = (
        top_k_tuples_combined
        | 'Weighted_DropCountsAndRearrange' >>
        beam.MapTuple(lambda k, v: (k, (v[0][1], v[1])))
        # (slice_key, feature_path_steps), (weight, value)
        | 'Weighted_TopK' >> beam.combiners.Top().PerKey(
            max(self._num_top_values, self._num_rank_histogram_buckets))
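        # Elements are (weight, value) pairs, so Top ranks values by their
        # total weight rather than by raw count.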
        | 'Weighted_ToFeatureValueCount' >> beam.MapTuple(
            # pylint: disable=g-long-lambda
            lambda k, v: (k, [
                top_k_uniques_stats_util.FeatureValueCount(t[1], t[0])
                for t in v
            ])
            # pylint: enable=g-long-lambda
        )
        | 'Weighted_ToProto' >> beam.MapTuple(
            # pylint: disable=g-long-lambda
            lambda k, v:
            (k[0],
             top_k_uniques_stats_util.
             make_dataset_feature_stats_proto_topk_single(
                 feature_path_tuple=k[1],
                 value_count_list=v,
                 is_weighted_stats=True,
                 num_top_values=self._num_top_values,
                 frequency_threshold=self._weighted_frequency_threshold,
                 num_rank_histogram_buckets=self._num_rank_histogram_buckets))
            # pylint: enable=g-long-lambda
        ))
    # (slice_key, DatasetFeatureStatistics)
    result_protos.append(weighted_top_k)
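  # Flatten the unweighted top-k, uniques, and (when present) weighted top-k
  # outputs into one PCollection of (slice_key, DatasetFeatureStatistics).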
  return (result_protos
          | 'FlattenTopKUniquesFeatureStatsProtos' >> beam.Flatten())