in lingvo/base_runner.py [0:0]
def _RunLoop(self, job_name, loop_func, loop_args=(), cleanup_func=None):
  """Runs `loop_func`, retrying on expected errors.

  Classifies any exception raised by `loop_func` as retryable or fatal:
  retryable errors either re-raise (so the caller's loop restarts the job)
  or, in daemon mode, exit with status 0 so an external scheduler reschedules
  the job; fatal errors report an infeasible trial (when applicable) and
  terminate the process via `os._exit(1)`, except inside a Vizier study where
  the method returns so the worker can move on to the next trial.

  Args:
    job_name: string job name.
    loop_func: callable to run and retry on expected errors.
    loop_args: list or tuple of arguments to be passed to the loop_func.
    cleanup_func: callable to run before retry.
  """
  try:
    tf.logging.info('%s started.', job_name)
    loop_func(*loop_args)
    tf.logging.info('%s done.', job_name)
    if self._daemon:
      # In daemon mode, an external scheduler will retry the job on 0 status.
      # So exit with a non-zero status to prevent retry.
      self._SetStatusMessage(
          '%s completed successfully. Exiting with FAILURE to prevent retry.'
          % job_name)
      time.sleep(300)  # Wait a bit for other threads to complete.
      os._exit(4)  # pylint: disable=protected-access
    return
  except Exception as e:  # pylint:disable=broad-except
    # The error text is inspected by several branches below; stringify once.
    err_str = str(e)
    fatal_error_msgs = [
        'Compilation failure',
        'Run-time shape mismatch for TPUExecute argument'
    ]
    if any(x in err_str for x in fatal_error_msgs):
      # Fatal error if failing to compile graph on TPU.
      retry = False
    elif isinstance(e, tf.errors.AbortedError):
      # AbortedError: is thrown when processes restarts.
      retry = True
      if self._InVizierStudy():
        # With Vizier studies, we want to avoid retrying under some error
        # conditions, these are captured here.
        # Do not retry (via raise/retry) if AbortedError with RecvTensor
        # message. This can happen if there are memory issues.
        if ('The same RecvTensor (WorkerServiceImpl) request was received '
            'twice' in err_str):
          retry = False
          tf.logging.info('%s done (infeasible error).', job_name)
    elif isinstance(e, tf.errors.OutOfRangeError):
      # OutOfRangeError: Test/dev datasets are exhausted.
      # Only retryable for eval jobs, which legitimately re-open the dataset.
      retry = self._cluster.do_eval
    elif isinstance(e, tf.errors.InvalidArgumentError):
      # InvalidArgumentError: variables were not initialized. Comes from
      # ResourceVariableOp.
      retry = True
      # Do not retry within Vizier study when NaNs cause InvalidArgumentError.
      if self._InVizierStudy():
        if 'Tensor had NaN values' in err_str:
          retry = False
          tf.logging.info('%s done (infeasible result due to NaN values).',
                          job_name)
    elif isinstance(
        e, py_utils.transient_tf_errors +
        (tf.errors.DataLossError, tf.errors.CancelledError)):
      # Retry on these errors.
      # FailedPreconditionError: variables are not initialized.
      # DataLossError: Race condition between evaler and trainer when saving
      # or removing checkpoints.
      # CancelledError: Node was closed (on TPU).
      retry = True
    else:
      retry = False

    if FLAGS.pdb_on_exception and not retry:
      # Drop into an interactive post-mortem debugger for otherwise-fatal
      # errors.
      pdb_wrapper.post_mortem()
      # NOTE(review): retry is forced True after the post-mortem, so a fatal
      # error becomes retryable when pdb_on_exception is set — confirm this
      # debugger-workflow behavior is intended.
      retry = True

    if retry:
      # Retry indefinitely (error should be transient).
      self._SetStatusMessage(
          '%s exception: %s\n' % (job_name, e), retrying=True)
      # Full traceback goes to vlog(1) only; the status message above carries
      # the summary.
      for msg in traceback.format_exc().split('\n'):
        tf.logging.vlog(1, msg)
      if cleanup_func:
        cleanup_func()
      if self._daemon:
        # In daemon mode, retry will be handled by an external scheduler by
        # returning a 0 status.
        tf.logging.error('Execution stopped due to fatal error. '
                         'Returning 0 to be scheduled for retry.')
        tf.logging.flush()
        time.sleep(10)
        os._exit(0)  # pylint: disable=protected-access
      # Not in daemon mode: re-raise so the surrounding retry loop handles it.
      raise
    else:
      # Allow the job to complete on errors that are unlikely to be transient,
      # e.g. caused by a mis-configured model.
      if self._should_report_metrics:
        self._trial.ReportDone(
            infeasible=True, infeasible_reason='Fatal error encountered.')
      tf.logging.error('%s done (fatal error): %s', job_name, type(e))
      self._SetStatusMessage('%s exception: %s\n' % (job_name, e))
      # Prints the error message line by line to avoid message cropping.
      for msg in traceback.format_exc().split('\n'):
        tf.logging.error(msg)
      # Check if we are potentially running within an experiment. If so,
      # the worker should continue to the next trial instead of terminating
      # the process.
      if self._InVizierStudy():
        return
      # tf.logging.fatal prints out stack traces. Typically, that's not
      # useful at all here. Here we want to exit the program
      # definitively. Because LOG(QFATAL) is not easily available via
      # python so far, we need a way to exit the program directly.
      # Because sys.exit(1) must be called from the main thread, and does
      # not cancel non-daemon threads anyway, we use os._exit instead.
      tf.logging.flush()
      time.sleep(10)
      os._exit(1)  # pylint: disable=protected-access