in jetstream/util.py [0:0]
def retry_get(session: Session, url: str, max_retries: int, user_agent: str | None = None) -> Any:
for _i in range(max_retries):
try:
if user_agent:
session.headers.update({"user-agent": user_agent})
blob = session.get(url).json()
break
except Exception as e:
print(e)
logger.info(f"Error fetching from {url}. Retrying...")
time.sleep(1)
else:
exception = RetryLimitExceededException(f"Too many retries for {url}")
logger.exception(exception.__str__(), exc_info=exception)
raise exception
return blob