in tensorflow_estimator/python/estimator/inputs/queues/feeding_functions.py [0:0]
def _enqueue_data(data,
capacity,
shuffle=False,
min_after_dequeue=None,
num_threads=1,
seed=None,
name="enqueue_input",
enqueue_size=1,
num_epochs=None,
pad_value=None):
"""Creates a queue filled from a numpy array or pandas `DataFrame`.
Returns a queue filled with the rows of the given (`OrderedDict` of) array
or `DataFrame`. In the case of a pandas `DataFrame`, the first enqueued
`Tensor` corresponds to the index of the `DataFrame`. For (`OrderedDict` of)
numpy arrays, the first enqueued `Tensor` contains the row number.
Args:
data: a numpy `ndarray`, `OrderedDict` of numpy arrays, or a generator
yielding `dict`s of numpy arrays or pandas `DataFrame` that will be read
into the queue.
capacity: the capacity of the queue.
shuffle: whether or not to shuffle the rows of the array.
min_after_dequeue: minimum number of elements that can remain in the queue
after a dequeue operation. Only used when `shuffle` is true. If not set,
defaults to `capacity` / 4.
num_threads: number of threads used for reading and enqueueing.
seed: used to seed shuffling and reader starting points.
name: a scope name identifying the data.
enqueue_size: the number of rows to enqueue per step.
num_epochs: limit enqueuing to a specified number of epochs, if provided.
pad_value: default value for dynamic padding of data samples, if provided.
Returns:
A queue filled with the rows of the given (`OrderedDict` of) array or
`DataFrame`.
Raises:
TypeError: `data` is not a Pandas `DataFrame`, an `OrderedDict` of numpy
arrays, a numpy `ndarray`, or a generator producing these.
NotImplementedError: padding and shuffling data at the same time.
NotImplementedError: padding usage with non generator data type.
"""
with ops.name_scope(name):
if isinstance(data, np.ndarray):
types = [tf.dtypes.int64, tf.dtypes.as_dtype(data.dtype)]
queue_shapes = [(), data.shape[1:]]
get_feed_fn = _ArrayFeedFn
elif isinstance(data, collections.OrderedDict):
types = [tf.dtypes.int64
] + [tf.dtypes.as_dtype(col.dtype) for col in data.values()]
queue_shapes = [()] + [col.shape[1:] for col in data.values()]
get_feed_fn = _OrderedDictNumpyFeedFn
elif isinstance(data, tp.FunctionType):
x_first_el = six.next(data())
x_first_keys = sorted(x_first_el.keys())
x_first_values = [x_first_el[key] for key in x_first_keys]
types = [tf.dtypes.as_dtype(col.dtype) for col in x_first_values]
queue_shapes = [col.shape for col in x_first_values]
get_feed_fn = _GeneratorFeedFn
elif HAS_PANDAS and isinstance(data, pd.DataFrame):
types = [
tf.dtypes.as_dtype(dt)
for dt in [data.index.dtype] + list(data.dtypes)
]
queue_shapes = [() for _ in types]
get_feed_fn = _PandasFeedFn
else:
raise TypeError(
"data must be either a numpy array or pandas DataFrame if pandas is "
"installed; got {}".format(type(data).__name__))
pad_data = pad_value is not None
if pad_data and get_feed_fn is not _GeneratorFeedFn:
raise NotImplementedError(
"padding is only available with generator usage")
if shuffle and pad_data:
raise NotImplementedError(
"padding and shuffling data at the same time is not implemented")
# TODO(jamieas): TensorBoard warnings for all warnings below once available.
if num_threads > 1 and num_epochs is not None:
tf.compat.v1.logging.warn(
"enqueue_data was called with num_epochs and num_threads > 1. "
"num_epochs is applied per thread, so this will produce more "
"epochs than you probably intend. "
"If you want to limit epochs, use one thread.")
if shuffle and num_threads > 1 and num_epochs is not None:
tf.compat.v1.logging.warn(
"enqueue_data was called with shuffle=True, num_threads > 1, and "
"num_epochs. This will create multiple threads, all reading the "
"array/dataframe in order adding to the same shuffling queue; the "
"results will likely not be sufficiently shuffled.")
if not shuffle and num_threads > 1:
tf.compat.v1.logging.warn(
"enqueue_data was called with shuffle=False and num_threads > 1. "
"This will create multiple threads, all reading the "
"array/dataframe in order. If you want examples read in order, use"
" one thread; if you want multiple threads, enable shuffling.")
if shuffle:
min_after_dequeue = int(
capacity / 4 if min_after_dequeue is None else min_after_dequeue)
queue = tf.queue.RandomShuffleQueue(
capacity,
min_after_dequeue,
dtypes=types,
shapes=queue_shapes,
seed=seed)
elif pad_data:
min_after_dequeue = 0 # just for the summary text
queue_shapes = list(
map(lambda x: tuple(list(x[:-1]) + [None])
if len(x) > 0 else x, queue_shapes))
queue = tf.queue.PaddingFIFOQueue(
capacity, dtypes=types, shapes=queue_shapes)
else:
min_after_dequeue = 0 # just for the summary text
queue = tf.queue.FIFOQueue(capacity, dtypes=types, shapes=queue_shapes)
enqueue_ops = []
feed_fns = []
for i in range(num_threads):
# Note the placeholders have no shapes, so they will accept any
# enqueue_size. enqueue_many below will break them up.
placeholders = [tf.compat.v1.placeholder(t) for t in types]
enqueue_ops.append(queue.enqueue_many(placeholders))
seed_i = None if seed is None else (i + 1) * seed
if not pad_data:
feed_fns.append(
get_feed_fn(
placeholders,
data,
enqueue_size,
random_start=shuffle,
seed=seed_i,
num_epochs=num_epochs))
else:
feed_fns.append(
get_feed_fn(
placeholders,
data,
enqueue_size,
random_start=shuffle,
seed=seed_i,
num_epochs=num_epochs,
pad_value=pad_value))
runner = fqr._FeedingQueueRunner( # pylint: disable=protected-access
queue=queue,
enqueue_ops=enqueue_ops,
feed_fns=feed_fns)
tf.compat.v1.train.queue_runner.add_queue_runner(runner)
full = (
tf.cast(
tf.math.maximum(0,
queue.size() - min_after_dequeue),
tf.dtypes.float32) * (1. / (capacity - min_after_dequeue)))
# Note that name contains a '/' at the end so we intentionally do not place
# a '/' after %s below.
summary_name = (
"queue/%sfraction_over_%d_of_%d_full" %
(queue.name, min_after_dequeue, capacity - min_after_dequeue))
tf.compat.v1.summary.scalar(summary_name, full)
return queue