jbi/retry.py:

import asyncio
import logging
import sys
from datetime import UTC, datetime, timedelta
from os import getenv
from time import sleep

from dockerflow.logging import JsonLogFormatter, request_id_context

import jbi.runner as runner
from jbi.configuration import get_actions
from jbi.errors import IgnoreInvalidRequestError
from jbi.queue import get_dl_queue

CONSTANT_RETRY = getenv("DL_QUEUE_CONSTANT_RETRY", "false") == "true"
RETRY_TIMEOUT_DAYS = getenv("DL_QUEUE_RETRY_TIMEOUT_DAYS", 7)
CONSTANT_RETRY_SLEEP = getenv("DL_QUEUE_CONSTANT_RETRY_SLEEP", 5)

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
lsh = logging.StreamHandler(sys.stdout)
lsh.setFormatter(JsonLogFormatter(logger_name=__name__))
logger.addHandler(lsh)

ACTIONS = get_actions()


async def retry_failed(item_executor=runner.execute_action, queue=get_dl_queue()):
    min_event_timestamp = datetime.now(UTC) - timedelta(days=int(RETRY_TIMEOUT_DAYS))

    # load all bugs from DLQ
    bugs = await queue.retrieve()

    # metrics to track
    metrics = {
        "bug_count": len(bugs),
        "events_processed": 0,
        "events_skipped": 0,
        "events_failed": 0,
        "bugs_failed": 0,
    }

    for bug_id, items in bugs.items():
        try:
            async for item in items:
                # skip and delete item if we have exceeded RETRY_TIMEOUT_DAYS
                if item.timestamp < min_event_timestamp:
                    logger.warning("removing expired event %s", item.identifier)
                    await queue.done(item)
                    metrics["events_skipped"] += 1
                    continue

                # Put original request id in logging context for better tracking.
                if item.rid is not None:
                    request_id_context.set(item.rid)

                logger.info(
                    "retry event %s",
                    item.identifier,
                    extra={
                        "item": item.model_dump(),
                    },
                )
                try:
                    item_executor(item.payload, ACTIONS)
                    await queue.done(item)
                    metrics["events_processed"] += 1
                except IgnoreInvalidRequestError:
                    logger.warning("removing invalid event %s", item.identifier)
                    await queue.done(item)
                    metrics["events_processed"] += 1
                except Exception:
                    metrics["events_failed"] += 1
                    logger.exception(
                        "failed to reprocess event %s.",
                        item.identifier,
                        extra={
                            "item": item.model_dump(),
                            "bug": {"id": bug_id},
                        },
                    )

                    # check for other events that will be skipped
                    pending_events = await queue.size(bug_id)
                    if pending_events > 1:  # if this isn't the only event for the bug
                        logger.info(
                            "skipping %d event(s) for bug %d, previous event %s failed",
                            pending_events - 1,
                            bug_id,
                            item.identifier,
                        )
                        metrics["events_skipped"] += pending_events - 1
                    break
        except Exception:
            metrics["bugs_failed"] += 1
            logger.exception(
                "failed to parse events for bug %d.",
                bug_id,
                extra={"bug": {"id": bug_id}},
            )

    return metrics


async def main():
    while True:
        metrics = await retry_failed()
        logger.info("event queue processing complete", extra=metrics)
        if not CONSTANT_RETRY:
            return
        sleep(int(CONSTANT_RETRY_SLEEP))


if __name__ == "__main__":
    asyncio.run(main())
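
Because `retry_failed` takes its queue and executor as parameters, it can be exercised without a real dead-letter queue. Below is a minimal test-harness sketch, assuming the `jbi` package is importable and its action configuration loads; `FakeItem` and `FakeQueue` are hypothetical stand-ins that mirror only the interface the function uses above (`retrieve`, `done`, `size`, and the item attributes `timestamp`, `identifier`, `rid`, `payload`, `model_dump`), not real jbi classes.

import asyncio
from datetime import UTC, datetime

from jbi.retry import retry_failed


class FakeItem:
    """Hypothetical stand-in for a queued event (assumed shape)."""

    def __init__(self, identifier, payload):
        self.timestamp = datetime.now(UTC)  # fresh, so it is not expired
        self.identifier = identifier
        self.rid = None  # no original request id to restore
        self.payload = payload

    def model_dump(self):
        return {"identifier": self.identifier}


class FakeQueue:
    """Hypothetical in-memory queue mirroring the calls retry_failed makes."""

    def __init__(self, bugs):
        self.bugs = bugs  # dict of bug_id -> list[FakeItem]
        self.done_items = []

    async def retrieve(self):
        # retry_failed expects a mapping of bug_id -> async iterator of items
        async def events(items):
            for item in items:
                yield item

        return {bug_id: events(items) for bug_id, items in self.bugs.items()}

    async def done(self, item):
        self.done_items.append(item)

    async def size(self, bug_id):
        return len(self.bugs[bug_id])


def noop_executor(payload, actions):
    pass  # pretend every event reprocesses successfully


async def check():
    queue = FakeQueue({42: [FakeItem("bug-42-event-1", payload={})]})
    metrics = await retry_failed(item_executor=noop_executor, queue=queue)
    assert metrics == {
        "bug_count": 1,
        "events_processed": 1,
        "events_skipped": 0,
        "events_failed": 0,
        "bugs_failed": 0,
    }, metrics


asyncio.run(check())

Swapping `noop_executor` for one that raises exercises the failure path instead: the event lands in `events_failed`, any remaining events for that bug are counted as skipped, and the loop moves on to the next bug.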