in tensorflow_io/python/experimental/numpy_dataset_ops.py [0:0]
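# `tf` is TensorFlow and `core_ops` the tensorflow-io native op bindings,
# imported at module level elsewhere in this file (presumably
# `import tensorflow as tf` and `from tensorflow_io.python.ops import core_ops`).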
def __init__(self, filename, spec=None, internal=True):
    """NumpyFileIODataset."""
    with tf.name_scope("NumpyFileIODataset"):
        assert internal
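
        # In eager mode the file can be inspected directly to discover its
        # arrays; in graph mode a `spec` describing them must be supplied.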
        if tf.executing_eagerly():
            arrays, shapes, dtypes = core_ops.io_numpy_info(filename=filename)
            arrays = tf.unstack(arrays)
            shapes = tf.unstack(shapes)
            dtypes = tf.unstack(dtypes)
            dtypes = [tf.as_dtype(dtype.numpy()) for dtype in dtypes]
            entries = list(zip(shapes, dtypes, arrays))
            entries = [
                tf.TensorSpec(shape, dtype, array)
                for (shape, dtype, array) in entries
            ]
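
            # np.savez stores positional (unnamed) arrays under the names
            # "arr_0", "arr_1", ...; detect that pattern so such files can be
            # exposed as a tuple rather than a dict.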
            indices = None
            if all([e.numpy().decode().startswith("arr_") for e in arrays]):
                try:
                    indices = [int(e.numpy()[4:]) for e in arrays]
                except ValueError:
                    pass
            if indices is not None:
                values = list(indices)
                values.sort()
                if not all([k == v for k, v in enumerate(values)]):
                    indices = None
            # If the indices form a contiguous 0..n-1 range, construct a tuple;
            # otherwise fall back to a dict keyed by array name.
            if indices is not None:
                entries = dict(zip(indices, entries))
                entries = tuple(entries[index] for index in sorted(indices))
            else:
                indices = [index.numpy().decode() for index in tf.unstack(arrays)]
                entries = dict(zip(indices, entries))
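
            # Every array in the file must share the same leading dimension so
            # that all of them can be read window-by-window in lockstep below.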
            flatten = tf.nest.flatten(entries)
            shapes = [entry.shape for entry in flatten]
            assert all([shape[0] == shapes[0][0] for shape in shapes])
        else:
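            # Graph mode: the file cannot be inspected here, so the entries are
            # built from the caller-provided `spec` (a tuple yields positional
            # "arr_i" names, a dict yields named arrays).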
            assert spec is not None
            if isinstance(spec, tuple):
                entries = tuple(
                    tf.TensorSpec(
                        None,
                        (v if isinstance(v, tf.dtypes.DType) else v.dtype),
                        f"arr_{i}",
                    )
                    for i, v in enumerate(spec)
                )
            else:
                entries = {
                    k: tf.TensorSpec(
                        None, (v if isinstance(v, tf.dtypes.DType) else v.dtype), k
                    )
                    for k, v in spec.items()
                }
            flatten = tf.nest.flatten(entries)
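
            # Shapes are not known from `spec`, so query each array's shape
            # from the file itself.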
            def shape_f(entry):
                shape, _ = core_ops.io_numpy_spec(
                    filename=filename, array=entry.name
                )
                return shape

            shapes = [shape_f(entry) for entry in flatten]
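
        # Collect the per-array read parameters (address, filename, array name,
        # shape, dtype) consumed by io_numpy_read; the address is fixed at 0 here.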
        def p(entry, shape):
            return 0, filename, entry.name, shape, entry.dtype

        params = [p(entry, shape) for entry, shape in zip(flatten, shapes)]
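
        # f(start, stop) reads the [start, stop) rows of every array and packs
        # the results back into the original tuple/dict structure.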
        def f(start, stop):
            return tf.nest.pack_sequence_as(
                entries,
                [
                    core_ops.io_numpy_read(
                        address=address,
                        filename=filename,
                        array=array,
                        shape=shape,
                        start=start,
                        stop=stop,
                        dtype=dtype,
                    )
                    for address, filename, array, shape, dtype in params
                ],
            )
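
        # Read the file in windows of 1024 rows: pair each window start with
        # its stop index, map the read function over the windows, then unbatch
        # into individual elements.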
        step = 1024
        total = tf.cast(shapes[0][0], tf.int64)
        indices_start = tf.data.Dataset.range(0, total, step)
        indices_stop = indices_start.skip(1).concatenate(
            tf.data.Dataset.from_tensor_slices([total])
        )
        dataset = tf.data.Dataset.zip((indices_start, indices_stop))
        dataset = dataset.map(f)
        dataset = dataset.unbatch()
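
        # Keep a reference to the assembled dataset and initialize the tf.data
        # base class from its variant tensor.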
        self._dataset = dataset
        super().__init__(
            self._dataset._variant_tensor
        )  # pylint: disable=protected-access
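
For reference, a minimal usage sketch, assuming a local "data.npz" written with np.savez and direct construction of NumpyFileIODataset as defined above (the public tfio entry point may wrap this differently):

    import numpy as np

    np.savez("data.npz", np.arange(10), np.arange(10) * 2)

    # Eager mode: arrays, shapes, and dtypes are discovered from the file, so
    # no spec is needed; the "arr_0"/"arr_1" naming makes each element a tuple.
    dataset = NumpyFileIODataset("data.npz")
    for x, y in dataset:
        print(x.numpy(), y.numpy())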