in tfx_bsl/tfxio/telemetry.py [0:0]
def _UpdateNumValuesDist(self, record_batch: pa.RecordBatch) -> None:
# Updates the distribution of number of values per cell.
# Note that a cell could be of a deeper nested type (e.g.
# Struct or nested ListArray), the number of values actually means
# lengths of leaves.
# For example, given the following row:
# col1 | col2
# [[[1, 2], [3]]] | [{'a': [1, 2]}, {'b': [3]}]]
# the number of values for col1 is 3
# the number of values for col2 will be updated twice because there are
# two leaves (col2.a, col2.b), with values 2, 1 respectively.
# Algorithm: create a mapping `m` (int->int) for array `a` so that if
# m[i] == j, then a[i] belongs to row j in the record batch.
# Then, np.bincount(m, minlength=record_batch.num_rows)[i] is how many
# values in `a` belong to row i. As we flatten the array, the mapping
# needs to be maintained so that it maps a flattened value to a row.
num_rows = record_batch.num_rows
def _RecursionHelper(row_indices, array):
"""Flattens `array` while maintains the `row_indices`."""
array_type = array.type
if _IsListLike(array_type):
parent_indices = np.asarray(
array_util.GetFlattenedArrayParentIndices(array))
_RecursionHelper(row_indices[parent_indices], array.flatten())
elif pa.types.is_struct(array_type):
for child in array.flatten():
_RecursionHelper(row_indices, child)
else:
value_type = _GetValueType(array.type)
dist_by_type = self._num_feature_values_dist_by_type[value_type]
for num_values in np.bincount(row_indices, minlength=num_rows).tolist():
dist_by_type.update(num_values)
self._num_feature_values_dist.update(num_values)
for column in record_batch:
_RecursionHelper(np.arange(num_rows, dtype=np.int64), column)