in tfx_bsl/tfxio/tensor_adapter.py [0:0]
def GetTensor(self, record_batch: pa.RecordBatch,
              produce_eager_tensors: bool) -> Union[np.ndarray, tf.Tensor]:
  """Builds a RaggedTensor (or RaggedTensorValue) from `record_batch`.

  Walks the (possibly struct-nested) list column identified by
  `self._column_index` / `self._value_path`, collecting row splits for the
  nested list levels, then layers the ragged partitions declared in
  `self._ragged_partitions` on top of the flattened values.

  Args:
    record_batch: The Arrow RecordBatch holding the column(s) backing this
      tensor representation. Must not be sliced (offset 0).
    produce_eager_tensors: If True, produce an eager `tf.RaggedTensor`;
      otherwise produce a `tf.compat.v1.ragged.RaggedTensorValue`.

  Returns:
    A `tf.RaggedTensor` or `tf.compat.v1.ragged.RaggedTensorValue`
    assembled from the column's values and row splits.

  Raises:
    ValueError: If the record batch is sliced, if the values' lengths are
      incompatible with the claimed fixed shape or uniform/explicit row
      lengths, or if a ragged partition is empty.
  """
  if (self._row_partition_dtype ==
      schema_pb2.TensorRepresentation.RowPartitionDType.INT32):
    offsets_dtype = np.int32
  elif (self._row_partition_dtype ==
        schema_pb2.TensorRepresentation.RowPartitionDType.INT64 or
        self._row_partition_dtype ==
        schema_pb2.TensorRepresentation.RowPartitionDType.UNSPECIFIED):
    offsets_dtype = np.int64
  if produce_eager_tensors:
    factory = tf.RaggedTensor.from_row_splits
  else:
    factory = tf.compat.v1.ragged.RaggedTensorValue
  # A RaggedTensor is composed of the following dimensions:
  # [B, D_0, D_1, ..., D_N, P_0, P_1, ..., P_M, U_0, U_1, ..., U_P]
  #
  # These dimensions belong to different categories:
  # * B: Batch size dimension
  # * D_n: Dimensions specified by the nested structure from the schema and
  #   the column path to the values. n >= 1.
  # * P_m: Dimensions specified by the partitions that do not specify a fixed
  #   dimension size. m >= 0.
  # * U_p: Dimensions specified by the inner uniform row length partitions
  #   that make the inner dimensions fixed. p >= 0.
  # Get row splits of each level in the record batch.
  # Store the row splits for the D_n dimensions that store the representation
  # of the nested structure on the dataset schema.
  outer_row_splits = []
  column_path = self._value_path.suffix(1)
  column = record_batch.column(self._column_index)
  column_type = column.type
  # Keep track of an accessor for the parent struct, so we can access other
  # fields required to get future dimensions row splits.
  parent_field_accessor = lambda field: record_batch.column(  # pylint:disable=g-long-lambda
      record_batch.schema.get_field_index(field))
  while True:
    # TODO(b/156514075): add support for handling slices.
    if column.offset != 0:
      raise ValueError(
          "This record batch is sliced. We currently do not handle converting"
          " slices to RaggedTensors.")
    if pa.types.is_struct(column_type):
      # Descend into the struct field named by the next path step; remember
      # the struct so row-length fields can be fetched from it later.
      parent_column = column
      parent_field_accessor = parent_column.field
      column = column.field(column_path.initial_step())
      column_path = column_path.suffix(1)
      column_type = column.type
    elif _IsListLike(column_type):
      # Each list level contributes one D_n row-splits array.
      outer_row_splits.append(np.asarray(column.offsets, dtype=offsets_dtype))
      column = column.flatten()
      column_type = column.type
    else:
      break
  # Now that we have stored the row splits for the D_n dimensions, let's
  # start the construction of the RaggedTensor from the inner dimensions to
  # the outermost.
  # Take the values and set the shape for the innermost dimensions (U_p).
  values = column
  if self._convert_to_binary_fn is not None:
    values = self._convert_to_binary_fn(values)
  values = np.asarray(values)
  values = np.reshape(values, self._values_fixed_shape)
  ragged_tensor = values
  # Build the RaggedTensor from the values and the specified partitions.
  # Now iterate from innermost partitions to outermost.
  # But first we need to pop the last row split from the outer dimensions
  # (D_n) and scale it given the number of elements in the inner fixed
  # dimensions.
  try:
    outer_last_row_split = _FloorDivide(outer_row_splits.pop(),
                                        self._inferred_dimensions_elements)
  except RuntimeError as e:
    raise ValueError(
        ("The values features lengths cannot support "
         "the claimed fixed shape {}").format(self._inner_fixed_shape)) from e
  # Keep track of the previous dimension to help building row splits when a
  # uniform row length partition is found.
  prev_dimension = values.shape[0]
  for partition in reversed(self._ragged_partitions):
    if partition.HasField("uniform_row_length"):
      # If a uniform row length partition is found, we need to scale down the
      # last outer dimension row split.
      try:
        outer_last_row_split = _FloorDivide(outer_last_row_split,
                                            partition.uniform_row_length)
      except RuntimeError as e:
        raise ValueError(("The values features lengths cannot support the "
                          "specified uniform row length of size {}").format(
                              partition.uniform_row_length)) from e
      row_splits = np.arange(
          0,
          prev_dimension + 1,
          partition.uniform_row_length,
          dtype=offsets_dtype)
      ragged_tensor = factory(ragged_tensor, row_splits=row_splits)
      try:
        prev_dimension = _FloorDivide(prev_dimension,
                                      partition.uniform_row_length)
      except RuntimeError as e:
        raise ValueError(
            ("The previous ragged partitions contained {} elements, "
             "which are not valid with the specified uniform row length: {}"
            ).format(prev_dimension, partition.uniform_row_length)) from e
    elif partition.HasField("row_length"):
      row_length_array = parent_field_accessor(partition.row_length)
      # When the outermost dimension specified by the partitions (P_0) comes
      # from another array other than values, we need to update the last
      # dimension row splits defined by the nested structure (D_n) given the
      # offsets of the array.
      outer_last_row_split = np.asarray(
          row_length_array.offsets, dtype=offsets_dtype)
      # Build row splits as the cumulative sum of the explicit row lengths.
      row_length = np.asarray(row_length_array.flatten())
      row_splits = np.zeros(len(row_length) + 1, dtype=offsets_dtype)
      np.cumsum(row_length, out=row_splits[1:])
      if prev_dimension != row_splits[-1]:
        raise ValueError(
            ("The sum of row lengths provided in '{}' does not match "
             "with previous dimension found {}.").format(
                 partition.row_length, prev_dimension))
      ragged_tensor = factory(ragged_tensor, row_splits=row_splits)
      prev_dimension = len(row_length)
    else:
      raise ValueError("Empty partition found.")
  # Add back the last row split from the outer dimensions (D_n).
  outer_row_splits.append(outer_last_row_split)
  # Apply the outer ragged dimensions to the resulting tensor.
  # Now that the RaggedTensor is built up to the P_0 dimensions, we need to
  # specify the row splits for the D_n dimensions.
  for row_split in reversed(outer_row_splits):
    ragged_tensor = factory(ragged_tensor, row_splits=row_split)
  return ragged_tensor