def _UpdateNumValuesDist()

in tfx_bsl/tfxio/telemetry.py [0:0]


  def _UpdateNumValuesDist(self, record_batch: pa.RecordBatch) -> None:
    """Updates the distributions of the number of values per cell.

    A cell may be of a nested type (e.g. a struct or a nested list), so the
    "number of values" of a cell means the number of leaf values it holds.
    For example, given the following row:

      col1               |    col2
      [[[1, 2], [3]]]    |    [{'a': [1, 2]}, {'b': [3]}]

    col1 contributes one count (3), while col2 contributes two counts, one
    per leaf: col2.a -> 2 and col2.b -> 1.

    How it works: each array is paired with an integer mapping such that
    `mapping[i] == j` means element `i` of the array belongs to row `j` of
    the record batch. The mapping is carried along as arrays are flattened,
    so once a leaf array is reached,
    `np.bincount(mapping, minlength=num_rows)[r]` is the number of leaf
    values that belong to row `r`.

    Args:
      record_batch: the record batch whose columns are inspected.
    """
    total_rows = record_batch.num_rows

    def _IterLeaves(owner_rows, arr):
      """Yields (row mapping, leaf array) pairs found underneath `arr`."""
      arr_type = arr.type
      if _IsListLike(arr_type):
        # Re-index the mapping through the parent indices so that it lines up
        # with the flattened child array.
        parents = np.asarray(array_util.GetFlattenedArrayParentIndices(arr))
        yield from _IterLeaves(owner_rows[parents], arr.flatten())
        return
      if pa.types.is_struct(arr_type):
        # Every child of a struct is visited with the mapping unchanged.
        for field_array in arr.flatten():
          yield from _IterLeaves(owner_rows, field_array)
        return
      yield owner_rows, arr

    for column in record_batch:
      # At the top level, element i of a column belongs to row i.
      identity_mapping = np.arange(total_rows, dtype=np.int64)
      for owner_rows, leaf in _IterLeaves(identity_mapping, column):
        leaf_value_type = _GetValueType(leaf.type)
        per_type_dist = self._num_feature_values_dist_by_type[leaf_value_type]
        per_row_counts = np.bincount(owner_rows, minlength=total_rows)
        for count in per_row_counts.tolist():
          per_type_dist.update(count)
          self._num_feature_values_dist.update(count)