def GetTensor()

in tfx_bsl/tfxio/tensor_adapter.py [0:0]


  def GetTensor(self, record_batch: pa.RecordBatch,
                produce_eager_tensors: bool) -> Union[np.ndarray, tf.Tensor]:
    """Builds a ragged tensor from one column of `record_batch`.

    The column at `self._column_index` is walked down through struct fields
    (following `self._value_path`) and list-like levels until the leaf values
    are reached. The leaf values are reshaped to `self._values_fixed_shape`
    and then wrapped, innermost dimension first, with the row splits derived
    from `self._ragged_partitions` and finally with the row splits collected
    from the nested list structure.

    Args:
      record_batch: The record batch that holds the column to convert.
      produce_eager_tensors: If True, each ragged level is created with
        `tf.RaggedTensor.from_row_splits`; otherwise with
        `tf.compat.v1.ragged.RaggedTensorValue`.

    Returns:
      The value obtained by applying the selected factory once per ragged
      dimension around the reshaped leaf values.
      NOTE(review): the return annotation mentions `np.ndarray`/`tf.Tensor`,
      while the factories used here build ragged values -- confirm whether the
      annotation should be updated.

    Raises:
      ValueError: If the row partition dtype is not INT32, INT64 or
        UNSPECIFIED; if a visited column has a non-zero offset (sliced record
        batch); if no list-like level is found on the way to the values; if
        the value lengths are incompatible with the fixed shape or with a
        uniform row length partition; if the sum of a `row_length` column does
        not match the number of elements of the previous dimension; or if a
        partition has neither `uniform_row_length` nor `row_length` set.
    """
    partition_dtypes = schema_pb2.TensorRepresentation.RowPartitionDType
    if self._row_partition_dtype == partition_dtypes.INT32:
      offsets_dtype = np.int32
    elif (self._row_partition_dtype == partition_dtypes.INT64 or
          self._row_partition_dtype == partition_dtypes.UNSPECIFIED):
      offsets_dtype = np.int64
    else:
      # Fail early with a clear message instead of hitting an unbound
      # `offsets_dtype` further down.
      raise ValueError("Unsupported row partition dtype: {}".format(
          self._row_partition_dtype))

    if produce_eager_tensors:
      factory = tf.RaggedTensor.from_row_splits
    else:
      factory = tf.compat.v1.ragged.RaggedTensorValue

    # A RaggedTensor is composed by the following dimensions:
    # [B, D_0, D_1, ..., D_N, P_0, P_1, ..., P_M, U_0, U_1, ..., U_P]
    #
    # These dimensions belong to different categories:
    # * B: Batch size dimension
    # * D_n: Dimensions specified by the nested structure from the schema and
    # the column path to the values. n >= 1.
    # * P_m: Dimensions specified by the partitions that do not specify a fixed
    # dimension size. m >= 0.
    # * U_p: Dimensions specified by the inner uniform row length partitions
    # that make the inner dimensions fixed. p>=0.

    # Get row splits of each level in the record batch.
    # Store the row splits for the Dn dimensions that store the representation
    # of the nested structure on the dataset schema.
    outer_row_splits = []

    column_path = self._value_path.suffix(1)
    column = record_batch.column(self._column_index)
    column_type = column.type
    # Keep track of an accessor for the parent struct, so we can access other
    # fields required to get future dimensions row splits. Until a struct is
    # entered, sibling fields are looked up on the record batch itself.
    parent_field_accessor = lambda field: record_batch.column(  # pylint:disable=g-long-lambda
        record_batch.schema.get_field_index(field))

    while True:
      # TODO(b/156514075): add support for handling slices.
      if column.offset != 0:
        raise ValueError(
            "This record batch is sliced. We currently do not handle converting"
            " slices to RaggedTensors.")
      if pa.types.is_struct(column_type):
        # Descend into the struct field named by the next step of the path and
        # remember the struct so sibling fields (e.g. row lengths) can be read.
        parent_column = column
        parent_field_accessor = parent_column.field
        column = column.field(column_path.initial_step())
        column_path = column_path.suffix(1)
        column_type = column.type
      elif _IsListLike(column_type):
        # Each list-like level contributes one set of row splits.
        outer_row_splits.append(np.asarray(column.offsets, dtype=offsets_dtype))
        column = column.flatten()
        column_type = column.type
      else:
        # Reached the leaf values.
        break

    # Row splits are only collected for list-like levels; without at least one
    # there is no outer dimension to pop below, so report it explicitly rather
    # than letting `pop()` raise an IndexError.
    if not outer_row_splits:
      raise ValueError(
          "The column at path {} has no list-like level, so it cannot be "
          "converted to a RaggedTensor.".format(self._value_path))

    # Now that we have stored the row splits for the Dn dimensions, lets
    # start the construction of the RaggedTensor from the inner dimensions to
    # the outermost.

    # Take the values and set the shape for the inner most dimensions (Up)
    values = column
    if self._convert_to_binary_fn is not None:
      values = self._convert_to_binary_fn(values)
    values = np.asarray(values)
    values = np.reshape(values, self._values_fixed_shape)

    ragged_tensor = values

    # Build the RaggedTensor from the values and the specified partitions.

    # Now iterate from inner most partitions to outermost.
    # But first we need pop the last row split from the outer dimensions (D_n)
    # and scale it given the number of elements in the inner fixed dimensions.
    # `_FloorDivide` signals a failed division with RuntimeError, which is
    # translated into a ValueError for the caller.
    try:
      outer_last_row_split = _FloorDivide(outer_row_splits.pop(),
                                          self._inferred_dimensions_elements)
    except RuntimeError as e:
      raise ValueError(
          ("The values features lenghts cannot support "
           "the claimed fixed shape {}").format(self._inner_fixed_shape)) from e

    # Keep track of the previous dimension to help building row splits when an
    # uniform row length partition is found.
    prev_dimension = values.shape[0]
    for partition in reversed(self._ragged_partitions):
      if partition.HasField("uniform_row_length"):
        # If a uniform row length partition is found, we need to scale down the
        # last outer dimension row split.
        try:
          outer_last_row_split = _FloorDivide(outer_last_row_split,
                                              partition.uniform_row_length)
        except RuntimeError as e:
          raise ValueError(("The values features lengths cannnot support the "
                            "specified uniform row length of size {}").format(
                                partition.uniform_row_length)) from e

        # Evenly spaced splits: 0, L, 2L, ... up to `prev_dimension`.
        row_splits = np.arange(
            0,
            prev_dimension + 1,
            partition.uniform_row_length,
            dtype=offsets_dtype)

        ragged_tensor = factory(ragged_tensor, row_splits=row_splits)
        try:
          prev_dimension = _FloorDivide(prev_dimension,
                                        partition.uniform_row_length)
        except RuntimeError as e:
          raise ValueError(
              ("The previous ragged partitions contained {} elements, "
               "which are not valid with the specified uniform row length: {}"
              ).format(prev_dimension, partition.uniform_row_length)) from e

      elif partition.HasField("row_length"):
        row_length_array = parent_field_accessor(partition.row_length)

        # When the outer most dimension specified by the partitions (P_0) comes
        # from another array other than values, we need to update the last
        # dimension row splits defined by the nested structure (D_n) given the
        # offsets of the array.
        outer_last_row_split = np.asarray(
            row_length_array.offsets, dtype=offsets_dtype)

        # Build row splits: a leading 0 followed by the running sum of the
        # row lengths.
        row_length = np.asarray(row_length_array.flatten())
        row_splits = np.zeros(len(row_length) + 1, dtype=offsets_dtype)
        np.cumsum(row_length, out=row_splits[1:])

        # The row lengths must account for every element of the dimension
        # built so far.
        if prev_dimension != row_splits[-1]:
          raise ValueError(
              ("The sum of row lengts provided in '{}' do not match "
               "with previous dimension found {}.").format(
                   partition.row_length, prev_dimension))

        ragged_tensor = factory(ragged_tensor, row_splits=row_splits)
        prev_dimension = len(row_length)

      else:
        raise ValueError("Empty partition found.")

    # Add back the last row split from the outer dimensions (D_n).
    outer_row_splits.append(outer_last_row_split)

    # Apply the outer ragged dimensions to the resulting tensor.
    # Now that the RaggedTensor is built up to the P_0 dimensions, we need to
    # specify the row splits for the D_n dimensions.
    for row_split in reversed(outer_row_splits):
      ragged_tensor = factory(ragged_tensor, row_splits=row_split)

    return ragged_tensor