def _keep_retrying_while_zmq_again()

in petastorm/workers_pool/process_pool.py [0:0]


def _keep_retrying_while_zmq_again(timeout, func, allowed_failures=3):
    """Will keep executing func() as long as zmq.Again is being thrown.

    Usage example:

    >>> _keep_retrying_while_zmq_again(
    >>>   _KEEP_TRYING_WHILE_ZMQ_AGAIN_IS_RAIZED_TIMEOUT_S,
    >>>   lambda: self._ventilator_send.send_pyobj(
    >>>      (args, kargs),
    >>>      flags=zmq.constants.NOBLOCK))

    :param timeout: A :class:`RuntimeError` is raised if could not execute ``func()`` without getting a
        :class:`zmq.Again` within this timeout. The timeout is defined in seconds.
    :param func: The function will be executed (as ``func()``)
    :return: None
    """
    now = time()
    failures = 0
    while time() < now + timeout:
        try:
            return func()
        except zmq.Again:
            logger.debug('zmq.Again exception caught. Will try again')
            sleep(0.1)
            continue
        except ZMQBaseError as e:
            # There are race conditions while setting up the zmq socket so you can get unexpected errors
            # for the first bit of time. We therefore allow for a few unknown failures while the sockets
            # are warming up. Before propogating them as a true problem.
            sleep(0.1)
            failures += 1
            logger.debug('Unexpected ZMQ error \'%s\' received. Failures %d/%d', str(e), failures, allowed_failures)
            if failures > allowed_failures:
                raise
    raise RuntimeError('Timeout ({} [sec]) has elapsed while keep getting \'zmq.Again\''.format(timeout))