def _RunLoop()

in lingvo/base_runner.py [0:0]


  def _RunLoop(self, job_name, loop_func, loop_args=(), cleanup_func=None):
    """Runs `loop_func`, retrying on expected errors.

    Args:
      job_name: string job name.
      loop_func: callable to run and retry on expected errors.
      loop_args: list or tuple of arguments to be passed to `loop_func`.
      cleanup_func: callable to run before retry.
    """
    try:
      tf.logging.info('%s started.', job_name)
      loop_func(*loop_args)
      tf.logging.info('%s done.', job_name)

      if self._daemon:
        # In daemon mode, an external scheduler retries the job on exit status
        # 0, so exit with a non-zero status to prevent a retry.
        self._SetStatusMessage(
            '%s completed successfully. Exiting with FAILURE to prevent retry.'
            % job_name)
        time.sleep(300)  # Wait a bit for other threads to complete.
        os._exit(4)  # pylint: disable=protected-access

      return
    except Exception as e:  # pylint:disable=broad-except
      fatal_error_msgs = [
          'Compilation failure',
          'Run-time shape mismatch for TPUExecute argument'
      ]
      if any(x in str(e) for x in fatal_error_msgs):
        # Fatal error if failing to compile graph on TPU.
        retry = False
      elif isinstance(e, tf.errors.AbortedError):
        # AbortedError is thrown when a process restarts.
        retry = True
        if self._InVizierStudy():
          # With Vizier studies, we want to avoid retrying under some error
          # conditions; these are captured here.
          # Do not retry (i.e. do not re-raise) if the AbortedError contains a
          # RecvTensor message. This can happen if there are memory issues.
          if ('The same RecvTensor (WorkerServiceImpl) request was received '
              'twice' in str(e)):
            retry = False
            tf.logging.info('%s done (infeasible error).', job_name)
      elif isinstance(e, tf.errors.OutOfRangeError):
        #   OutOfRangeError: Test/dev datasets are exhausted.
        retry = self._cluster.do_eval
      elif isinstance(e, tf.errors.InvalidArgumentError):
        #   InvalidArgumentError: variables were not initialized. Comes from
        #       ResourceVariableOp.
        retry = True
        # Do not retry within Vizier study when NaNs cause InvalidArgumentError.
        if self._InVizierStudy():
          if 'Tensor had NaN values' in str(e):
            retry = False
            tf.logging.info('%s done (infeasible result due to NaN values).',
                            job_name)
      elif isinstance(
          e, py_utils.transient_tf_errors +
          (tf.errors.DataLossError, tf.errors.CancelledError)):
        # Retry on these errors.
        #   FailedPreconditionError: variables are not initialized.
        #   DataLossError: Race condition between evaler and trainer when saving
        #       or removing checkpoints.
        #   CancelledError: Node was closed (on TPU).
        retry = True
      else:
        retry = False

      if FLAGS.pdb_on_exception and not retry:
        pdb_wrapper.post_mortem()
        retry = True

      if retry:
        # Retry indefinitely (error should be transient).
        self._SetStatusMessage(
            '%s exception: %s\n' % (job_name, e), retrying=True)

        for msg in traceback.format_exc().split('\n'):
          tf.logging.vlog(1, msg)

        if cleanup_func:
          cleanup_func()

        if self._daemon:
          # In daemon mode, the retry is handled by an external scheduler; we
          # return a 0 status so the scheduler restarts the job.
          tf.logging.error('Execution stopped due to fatal error. '
                           'Returning 0 to be scheduled for retry.')
          tf.logging.flush()
          time.sleep(10)
          os._exit(0)  # pylint: disable=protected-access

        raise
      else:
        # Allow the job to complete on errors that are unlikely to be transient,
        # e.g. caused by a mis-configured model.
        if self._should_report_metrics:
          self._trial.ReportDone(
              infeasible=True, infeasible_reason='Fatal error encountered.')
        tf.logging.error('%s done (fatal error): %s', job_name, type(e))

        self._SetStatusMessage('%s exception: %s\n' % (job_name, e))

        # Prints the error message line by line to avoid message cropping.
        msgv = traceback.format_exc().split('\n')
        for msg in msgv:
          tf.logging.error(msg)

        # Check if we are potentially running within an experiment. If so,
        # the worker should continue to the next trial instead of terminating
        # the process.
        if self._InVizierStudy():
          return

        # tf.logging.fatal prints a stack trace, which is typically not useful
        # here; we just want to exit the program definitively. LOG(QFATAL) is
        # not easily available from Python, so we need another way to exit the
        # program directly. Because sys.exit(1) must be called from the main
        # thread, and does not cancel non-daemon threads anyway, we use
        # os._exit instead.
        tf.logging.flush()
        time.sleep(10)
        os._exit(1)  # pylint: disable=protected-access
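
For context, the same control flow is shown below in a minimal, standalone
form. This is an illustrative sketch only, not lingvo code: it replaces the
TensorFlow error classes with standard Python exceptions (ConnectionError and
TimeoutError are stand-ins for the transient-error set) and omits the daemon,
Vizier, and pdb-on-exception paths.

import logging
import os
import traceback


def run_loop(job_name, loop_func, loop_args=(), cleanup_func=None):
  """Simplified analogue of _RunLoop, for illustration only."""
  try:
    logging.info('%s started.', job_name)
    loop_func(*loop_args)
    logging.info('%s done.', job_name)
  except Exception as e:  # pylint: disable=broad-except
    # Decide whether the error is worth retrying. The real method inspects
    # TensorFlow error types and messages; placeholder types are used here.
    retry = isinstance(e, (ConnectionError, TimeoutError))
    logging.error('%s exception: %s\n%s', job_name, e, traceback.format_exc())
    if retry:
      if cleanup_func:
        cleanup_func()
      raise  # Re-raise so the caller (or an external scheduler) retries.
    # Non-transient error: exit the whole process, including other threads.
    os._exit(1)  # pylint: disable=protected-access

As in the real method, the re-raise path is what lets a surrounding retry
mechanism restart the job, while os._exit terminates the non-daemon threads
that sys.exit(1) would leave running.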