def __init__()

in tensorflow_io/python/experimental/numpy_dataset_ops.py


    def __init__(self, filename, spec=None, internal=True):
        """NumpyFileIODataset."""
        with tf.name_scope("NumpyFileIODataset"):
            assert internal

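            # Eager mode: read the array names, shapes, and dtypes directly from
            # the file, so no user-supplied spec is needed.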
            if tf.executing_eagerly():
                arrays, shapes, dtypes = core_ops.io_numpy_info(filename=filename)
                arrays = tf.unstack(arrays)
                shapes = tf.unstack(shapes)
                dtypes = tf.unstack(dtypes)
                dtypes = [tf.as_dtype(dtype.numpy()) for dtype in dtypes]

                entries = list(zip(shapes, dtypes, arrays))
                entries = [
                    tf.TensorSpec(shape, dtype, array)
                    for (shape, dtype, array) in entries
                ]

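                # Detect the default names written by np.savez with positional
                # arguments ("arr_0", "arr_1", ...).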
                indices = None
                if all([e.numpy().decode().startswith("arr_") for e in arrays]):
                    try:
                        indices = [int(e.numpy()[4:]) for e in arrays]
                    except ValueError:
                        pass
                if indices is not None:
                    values = list(indices)
                    values.sort()
                    if not all([k == v for k, v in enumerate(values)]):
                        indices = None

                # If the indices are contiguous (default "arr_0".."arr_{n-1}" names),
                # construct a tuple; otherwise a dict keyed by array name.
                if indices is not None:
                    entries = dict(zip(indices, entries))
                    entries = tuple(entries[index] for index in sorted(indices))
                else:
                    indices = [index.numpy().decode() for index in tf.unstack(arrays)]
                    entries = dict(zip(indices, entries))

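                # Every array must share the same leading (row) dimension so they
                # can be sliced in lockstep.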
                flatten = tf.nest.flatten(entries)
                shapes = [entry.shape for entry in flatten]
                assert all([shape[0] == shapes[0][0] for shape in shapes])
            else:
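                # Graph mode: the file cannot be inspected here, so the element
                # structure comes from the user-supplied spec (a tuple or a dict
                # of dtypes/TensorSpecs).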
                assert spec is not None
                if isinstance(spec, tuple):
                    entries = tuple(
                        tf.TensorSpec(
                            None,
                            (v if isinstance(v, tf.dtypes.DType) else v.dtype),
                            f"arr_{i}",
                        )
                        for i, v in enumerate(spec)
                    )
                else:
                    entries = {
                        k: tf.TensorSpec(
                            None, (v if isinstance(v, tf.dtypes.DType) else v.dtype), k
                        )
                        for k, v in spec.items()
                    }
                flatten = tf.nest.flatten(entries)

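                # Look up the stored shape of each named array from the file.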
                def shape_f(entry):
                    shape, _ = core_ops.io_numpy_spec(
                        filename=filename, array=entry.name
                    )
                    return shape

                shapes = [shape_f(entry) for entry in flatten]

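            # Per-array read parameters consumed by io_numpy_read below:
            # (address, filename, array name, shape, dtype).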
            def p(entry, shape):
                return 0, filename, entry.name, shape, entry.dtype

            params = [p(entry, shape) for entry, shape in zip(flatten, shapes)]

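            # Read rows [start, stop) from every array and pack the results back
            # into the tuple/dict structure of `entries`.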
            def f(start, stop):
                return tf.nest.pack_sequence_as(
                    entries,
                    [
                        core_ops.io_numpy_read(
                            address=address,
                            filename=filename,
                            array=array,
                            shape=shape,
                            start=start,
                            stop=stop,
                            dtype=dtype,
                        )
                        for address, filename, array, shape, dtype in params
                    ],
                )

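            # Read the file in chunks of `step` rows, then unbatch so the dataset
            # yields one element per row.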
            step = 1024
            total = tf.cast(shapes[0][0], tf.int64)
            indices_start = tf.data.Dataset.range(0, total, step)
            indices_stop = indices_start.skip(1).concatenate(
                tf.data.Dataset.from_tensor_slices([total])
            )
            dataset = tf.data.Dataset.zip((indices_start, indices_stop))
            dataset = dataset.map(f)
            dataset = dataset.unbatch()

            self._dataset = dataset
            super().__init__(
                self._dataset._variant_tensor
            )  # pylint: disable=protected-access
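
For context, a minimal usage sketch. It assumes this dataset is exposed through
the tfio.experimental.IODataset.from_numpy_file factory; that factory path and
the sample file below are assumptions, not taken from this file.

    import numpy as np
    import tensorflow_io as tfio

    # Write a .npz file whose arrays share the same leading dimension (100 rows).
    np.savez(
        "/tmp/example.npz",
        features=np.random.rand(100, 4),
        labels=np.arange(100, dtype=np.int64),
    )

    # Because the arrays are named (not the default "arr_*"), each element of the
    # dataset is a dict keyed by array name, holding one row from every array.
    dataset = tfio.experimental.IODataset.from_numpy_file("/tmp/example.npz")
    for element in dataset.take(2):
        print(element["features"].numpy(), element["labels"].numpy())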