in tensorflow_data_validation/statistics/generators/basic_stats_generator.py [0:0]
def merge_accumulators(
        self, accumulators: Iterable[_BasicAcctype]) -> _BasicAcctype:
    """Combines partial accumulators into a single accumulator.

    Example counts (plain and weighted) are summed across all inputs. For
    every feature path, the first partial stats object encountered is stored
    in the result as-is (the same object, not a copy); each later one for
    the same path is folded into that stored object.

    Args:
      accumulators: The partial accumulators to merge.

    Returns:
      A newly created accumulator holding the merged counts and the
      per-feature stats.

    Raises:
      TypeError: If two partial stats for the same feature path report
        different types and neither of those types is None.
    """
    merged = _BasicAcctype()
    for partial in accumulators:
        merged.num_examples += partial.num_examples
        merged.weighted_num_examples += partial.weighted_num_examples

        for feature_path, incoming in partial.items():
            incoming_type = incoming.common_stats.type
            stored = merged.get(feature_path)

            if stored is None:
                # First time this feature is seen: adopt the incoming stats
                # object directly; there is nothing to merge it with yet.
                merged[feature_path] = incoming
                continue

            # Two partial stats are type-compatible when their types are
            # equal or at least one of them is None. Anything else is an
            # error.
            stored_type = stored.common_stats.type
            both_typed = (stored_type is not None and
                          incoming_type is not None)
            if both_typed and stored_type != incoming_type:
                raise TypeError('Cannot determine the type of feature %s. '
                                'Found values of types %s and %s.' %
                                (feature_path, stored_type, incoming_type))

            stored.common_stats.merge_with(feature_path,
                                           incoming.common_stats)

            if incoming_type is None:
                # The incoming type is unknown, so no type-specific stats
                # are merged.
                continue

            # Bytes features take precedence over every other category.
            if feature_path in self._bytes_features:
                stored.bytes_stats += incoming.bytes_stats
                continue

            # Categorical-numeric features share the string-stats branch
            # with STRING features.
            treat_as_string = (
                top_k_uniques_stats_util.output_categorical_numeric(
                    self._categorical_numeric_types, feature_path,
                    incoming_type) or
                incoming_type == statistics_pb2.FeatureNameStatistics.STRING)
            if treat_as_string:
                stored.string_stats += incoming.string_stats
            elif incoming_type in (statistics_pb2.FeatureNameStatistics.INT,
                                   statistics_pb2.FeatureNameStatistics.FLOAT):
                stored.numeric_stats += incoming.numeric_stats
    return merged