in petastorm/workers_pool/process_pool.py [0:0]
def _keep_retrying_while_zmq_again(timeout, func, allowed_failures=3):
"""Will keep executing func() as long as zmq.Again is being thrown.
Usage example:
>>> _keep_retrying_while_zmq_again(
>>> _KEEP_TRYING_WHILE_ZMQ_AGAIN_IS_RAIZED_TIMEOUT_S,
>>> lambda: self._ventilator_send.send_pyobj(
>>> (args, kargs),
>>> flags=zmq.constants.NOBLOCK))
:param timeout: A :class:`RuntimeError` is raised if could not execute ``func()`` without getting a
:class:`zmq.Again` within this timeout. The timeout is defined in seconds.
:param func: The function will be executed (as ``func()``)
:return: None
"""
now = time()
failures = 0
while time() < now + timeout:
try:
return func()
except zmq.Again:
logger.debug('zmq.Again exception caught. Will try again')
sleep(0.1)
continue
except ZMQBaseError as e:
# There are race conditions while setting up the zmq socket so you can get unexpected errors
# for the first bit of time. We therefore allow for a few unknown failures while the sockets
# are warming up. Before propogating them as a true problem.
sleep(0.1)
failures += 1
logger.debug('Unexpected ZMQ error \'%s\' received. Failures %d/%d', str(e), failures, allowed_failures)
if failures > allowed_failures:
raise
raise RuntimeError('Timeout ({} [sec]) has elapsed while keep getting \'zmq.Again\''.format(timeout))