in tensorflow_fold/blocks/plan.py [0:0]
def init_loom(self, **loom_kwargs):
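  # No dequeuers: there is no queue-based input pipeline to build, so
  # defer to the base class.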
  if not self.num_dequeuers:
    if self.queue_capacity:
      raise ValueError('cannot specify queue_capacity without also '
                       'specifying num_dequeuers')
    return super(TrainPlan, self).init_loom(**loom_kwargs)
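  # Queue-based distributed training from here on: sanity-check the
  # cluster configuration before building any queues.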
  if self.ps_tasks < 1:
    raise ValueError('must have at least one PS task; %s' % self.ps_tasks)
  min_worker_replicas = self._num_queues() + self.num_dequeuers
  if self.worker_replicas < min_worker_replicas:
    raise ValueError(
        'worker_replicas must be at least num_queues + num_dequeuers; '
        '%s vs. %s + %s = %s' % (self.worker_replicas, self._num_queues(),
                                 self.num_dequeuers, min_worker_replicas))
  if self.compiler is None: raise ValueError('compiler is required')
  if not self.batch_size: raise ValueError('batch_size is required')
  # The queues need to live on PS tasks so they can be shared across
  # workers. All workers create the same num_queues (this is
  # min(ps_tasks, num_dequeuers)) distinct queues, which are assigned
  # sequentially to the first num_queues PS tasks. At every training
  # step, each enqueuing worker selects a queue with approx. minimal
  # size to enqueue to (to balance data across queues). Similarly, each
  # dequeuing worker selects a queue with approx. maximal size to
  # dequeue from.
  queues = []
  for queue_id in xrange(self._num_queues()):
    with tf.device(tf.DeviceSpec(job='ps', task=queue_id)):
      queues.append(self._create_queue(queue_id))
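  # When summaries are enabled, export each queue's current size as a
  # metric so it can be monitored.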
  if self.compute_summaries:
    for q in queues:
      self.metrics['queue_sizes/%s' % q.name] = q.size()
  # First num_dequeuers tasks dequeue, the remainder enqueue.
  if self.task < self.num_dequeuers:
    self._setup_dequeuing(queues, **loom_kwargs)
    return False, True  # dequeuers need stats, but not examples
  self._setup_enqueuing(queues, **loom_kwargs)
  return True, False  # enqueuers need examples, but not stats
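The comment in the method above describes the balancing policy: enqueuing workers target the emptiest queue and dequeuing workers the fullest. Below is a minimal sketch of that selection using plain TF 1.x ops, assuming string-serialized examples and a hard-coded capacity; it is illustrative only and not the _create_queue / _setup_enqueuing machinery from plan.py.

# Illustrative sketch only (TF 1.x); not the plan.py implementation.
import tensorflow as tf

# Stand-in queues; plan.py builds its own via _create_queue() on PS tasks.
queues = [tf.FIFOQueue(capacity=100, dtypes=[tf.string], name='queue_%d' % i)
          for i in range(3)]
example = tf.placeholder(tf.string, shape=[])  # a serialized input example

sizes = tf.stack([q.size() for q in queues])          # current queue sizes
target = tf.cast(tf.argmin(sizes, axis=0), tf.int32)  # emptiest queue wins


def _enqueue_to(q):
  """Returns a branch fn whose side effect is enqueuing into queue q."""
  def branch():
    with tf.control_dependencies([q.enqueue([example])]):
      return tf.constant(True)
  return branch

# Dispatch the enqueue to the selected queue; a dequeuing worker would do
# the mirror image with tf.argmax(sizes) and q.dequeue_many(batch_size).
enqueue_op = tf.case(
    {tf.equal(target, i): _enqueue_to(q) for i, q in enumerate(queues)},
    exclusive=True)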