easy_rec/python/utils/estimator_utils.py [91:143]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  def begin(self):
    """Count the number of workers and masters, and setup barrier queue."""
    tf.logging.info('number workers(including master) = %d' % self._num_worker)
    with tf.device(
        tf.DeviceSpec(job='ps', task=0, device_type='CPU', device_index=0)):
      self._queue = tf.FIFOQueue(
          capacity=self._num_worker,
          dtypes=[tf.float32],
          shapes=[()],
          name='exit_counter',
          shared_name='exit_counter')
      self._signal_que = tf.FIFOQueue(
          capacity=self._num_worker,
          dtypes=[tf.string],
          shapes=[()],
          name='exit_counter_signal',
          shared_name='exit_counter_signal')
    self._enque = self._queue.enqueue(1.0)
    self._que_size = self._queue.size()
    self._deque = self._queue.dequeue()
    if self._is_chief:
      self._flag_file = os.path.join(self._model_dir,
                                     'atexit_sync_' + str(int(time.time())))
      self._send = self._signal_que.enqueue([self._flag_file])
    else:
      self._recv = self._signal_que.dequeue()
      self._flag_file = None

  def after_create_session(self, session, coord):
    """Clean up the queue after create session.

    Sometimes ps is not exit, the last run enqueued elements will remain in the queue
    """
    if self._is_chief:
      # clear the queue
      que_size = session.run(self._que_size)
      while que_size > 0:
        session.run(self._deque)
        que_size = session.run(self._que_size)
      logging.info('exit counter cleared: %d' % que_size)

  def end(self, session):
    """Ensure when all workers and master enqueue an element, then exit."""
    session.run(self._enque)
    que_size = session.run(self._que_size)
    while que_size < self._num_worker:
      que_size = session.run(self._que_size)
      time.sleep(5)
      tf.logging.info(
          'waiting for other worker to exit, finished %d, total %d' %
          (que_size, self._num_worker))
    # prepare on_exit synchronize base on self._flag_file
    if self._is_chief:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



easy_rec/python/utils/estimator_utils.py [187:239]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  def begin(self):
    """Count the number of workers and masters, and setup barrier queue."""
    tf.logging.info('number workers(including master) = %d' % self._num_worker)
    with tf.device(
        tf.DeviceSpec(job='ps', task=0, device_type='CPU', device_index=0)):
      self._queue = tf.FIFOQueue(
          capacity=self._num_worker,
          dtypes=[tf.float32],
          shapes=[()],
          name='exit_counter',
          shared_name='exit_counter')
      self._signal_que = tf.FIFOQueue(
          capacity=self._num_worker,
          dtypes=[tf.string],
          shapes=[()],
          name='exit_counter_signal',
          shared_name='exit_counter_signal')
    self._enque = self._queue.enqueue(1.0)
    self._que_size = self._queue.size()
    self._deque = self._queue.dequeue()
    if self._is_chief:
      self._flag_file = os.path.join(self._model_dir,
                                     'atexit_sync_' + str(int(time.time())))
      self._send = self._signal_que.enqueue([self._flag_file])
    else:
      self._recv = self._signal_que.dequeue()
      self._flag_file = None

  def after_create_session(self, session, coord):
    """Clean up the queue after create session.

    Sometimes ps is not exit, the last run enqueued elements will remain in the queue
    """
    if self._is_chief:
      # clear the queue
      que_size = session.run(self._que_size)
      while que_size > 0:
        session.run(self._deque)
        que_size = session.run(self._que_size)
      logging.info('exit counter cleared: %d' % que_size)

  def end(self, session):
    """Ensure when all workers and master enqueue an element, then exit."""
    session.run(self._enque)
    que_size = session.run(self._que_size)
    while que_size < self._num_worker:
      que_size = session.run(self._que_size)
      time.sleep(5)
      tf.logging.info(
          'waiting for other worker to exit, finished %d, total %d' %
          (que_size, self._num_worker))
    # prepare on_exit synchronize base on self._flag_file
    if self._is_chief:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



