in ultravox/utils/monkey_patches.py [0:0]
def patch_with_retry(cls: Type[Any], method_name: str, max_attempts: int = 10) -> None:
"""
Generic function to patch any method with retry capability.
Args:
cls: The class containing the method to patch
method_name: The name of the method to patch
max_attempts: Maximum number of retry attempts (default: 10)
"""
original_method = getattr(cls, method_name)
@tenacity.retry(
stop=tenacity.stop_after_attempt(max_attempts),
wait=tenacity.wait_exponential(multiplier=1, min=4, max=10),
retry=tenacity.retry_if_exception_type(Exception),
before_sleep=tenacity.before_sleep_log(logger, logging.INFO),
reraise=True,
)
@wraps(original_method)
def method_with_retry(self, *args, **kwargs):
return original_method(self, *args, **kwargs)
# Apply the patch
setattr(cls, method_name, method_with_retry)
logger.info(
f"Applied retry patch to {cls.__name__}.{method_name} with max_attempts={max_attempts}"
)