in tensorflow_data_validation/statistics/generators/basic_stats_generator.py [0:0]
def add_input(self, accumulator: _BasicAcctype,
              examples: pa.RecordBatch) -> _BasicAcctype:
    """Folds one record batch into the accumulator and returns it.

    Args:
      accumulator: Running state, updated in place. It carries the example
        counters and maps each feature path to its partial basic stats.
      examples: The record batch whose arrays are enumerated and accumulated.

    Returns:
      The same `accumulator` object that was passed in, after the update.
    """
    accumulator.num_examples += examples.num_rows

    # The weight feature registered under the empty feature path is the
    # default one; the weighted example count is always derived from it.
    default_weight_feature = self._example_weight_map.get(
        types.FeaturePath([]))
    if default_weight_feature:
        default_weights = arrow_util.get_column(
            examples, default_weight_feature)
        flat_weights = np.asarray(default_weights.flatten())
        accumulator.weighted_num_examples += np.sum(flat_weights)

    feature_types = statistics_pb2.FeatureNameStatistics
    weighted_arrays = arrow_util.enumerate_arrays(
        examples,
        example_weight_map=self._example_weight_map,
        enumerate_leaves_only=False)

    for path, array, array_weights in weighted_arrays:
        # Create the per-feature partial stats the first time a path is seen.
        partial_stats = accumulator.get(path)
        if partial_stats is None:
            has_weights = array_weights is not None
            partial_stats = _PartialBasicStats(
                has_weights, self._make_quantiles_sketch_fn)
            accumulator[path] = partial_stats

        observed_type = stats_util.get_feature_type_from_arrow_type(
            path, array.type)
        partial_stats.common_stats.update(
            path, array, observed_type, self._make_quantiles_sketch_fn,
            array_weights)

        # Users may declare a feature's data type (e.g. listing it in
        # _bytes_features implies strings), but those declarations are not
        # trusted, since TFDV must also detect type mismatches. Stats are
        # gathered by the observed type, and type-specific stats (categorical
        # int, bytes) are only gathered when the observed type agrees with
        # the declaration.
        if observed_type == feature_types.STRING:
            typed_stats = (
                partial_stats.bytes_stats
                if path in self._bytes_features
                else partial_stats.string_stats)
            typed_stats.update(array)
            continue

        # String stats for a numeric feature are only wanted when a top-k
        # stats generator is running, hence the reliance on this library
        # function.
        if top_k_uniques_stats_util.output_categorical_numeric(
                self._categorical_numeric_types, path, observed_type):
            partial_stats.string_stats.update(array)
            continue

        if observed_type in (feature_types.FLOAT, feature_types.INT):
            partial_stats.numeric_stats.update(array, array_weights)

    return accumulator