jbi/queue.py

"""This `queue` module stores Bugzilla webhook messages that we failed to sync
to Jira.

As Bugzilla sends us webhook messages, we want to eagerly accept them and
return a `200` response so that we don't prevent it from sending new messages.
But if we fail to sync a bug, we want to keep the message so we can retry it
later. We also want to store any messages that might be successfully synced,
but were preceded by a message that wasn't synced.

Classes:
    - QueueItem: An entry in the dead letter queue, containing information
      about the payload, timestamp, and any associated errors encountered when
      attempting to sync the bug.
    - PythonException: Information about any exception that occurred when
      syncing a bug, stored along with the item.
    - DeadLetterQueue: Class representing the dead letter queue system,
      providing methods for adding, retrieving, and managing queue items.
      Supports pluggable backends.
    - QueueBackend: Abstract base class defining the interface for a
      DeadLetterQueue backend.
    - FileBackend: Implementation of a QueueBackend that stores messages in
      files.
    - InvalidQueueDSNError: Exception raised when an invalid queue DSN is
      provided.
    - QueueItemRetrievalError: Exception raised when the queue is unable to
      retrieve a failed item and parse it as a QueueItem.
"""

import logging
import re
import tempfile
import traceback
from abc import ABC, abstractmethod
from datetime import datetime
from functools import cached_property, lru_cache
from json import JSONDecodeError
from pathlib import Path
from typing import AsyncIterator, Optional
from urllib.parse import ParseResult, urlparse

import dockerflow.checks
from pydantic import BaseModel, FileUrl, ValidationError, computed_field

from jbi.bugzilla import models as bugzilla_models
from jbi.environment import get_settings

logger = logging.getLogger(__name__)

ITEM_ID_PATTERN = re.compile(
    r"(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\+\d{2}:\d{2})-(?P<bug_id>\d+)-(?P<action>\w*)-(?P<status>error|postponed)"
)


def extract_bug_id_from_item_id(item_id: str) -> str:
    if match := re.search(ITEM_ID_PATTERN, item_id):
        return match.group("bug_id")
    raise ValueError(
        f"item_id {item_id} did not match expected format: {ITEM_ID_PATTERN.pattern}"
    )


class QueueItemRetrievalError(Exception):
    def __init__(self, message=None, path=None):
        self.message = message or "Error reading or parsing queue item"
        self.path = path

    def __str__(self):
        return f"QueueItemRetrievalError: {self.message} - path: {self.path}."


class InvalidQueueDSNError(Exception):
    pass


class PythonException(BaseModel, frozen=True):
    type: str
    description: str
    details: str

    @classmethod
    def from_exc(cls, exc: Exception):
        return PythonException(
            type=exc.__class__.__name__,
            description=str(exc),
            details="".join(traceback.format_exception(exc)),
        )


class QueueItem(BaseModel, frozen=True):
    """Dead Letter Queue entry."""

    payload: bugzilla_models.WebhookRequest
    error: Optional[PythonException] = None
    rid: Optional[str] = None

    @computed_field  # type: ignore
    @cached_property
    def version(self) -> str:
        # Prevents circular imports.
        from jbi import app

        return app.VERSION

    @property
    def timestamp(self) -> datetime:
        return self.payload.event.time

    @computed_field  # type: ignore
    @property
    def identifier(self) -> str:
        return f"{self.payload.event.time}-{self.payload.bug.id}-{self.payload.event.action}-{'error' if self.error else 'postponed'}"


@lru_cache(maxsize=1)
def get_dl_queue():
    settings = get_settings()
    return DeadLetterQueue(settings.dl_queue_dsn)


class QueueBackend(ABC):
    """An interface for dead letter queues."""

    @abstractmethod
    def ping(self) -> bool:
        """Report if the queue backend is available and ready to be written to"""
        pass

    @abstractmethod
    async def clear(self) -> None:
        """Remove all bugs and their items from the queue"""
        pass

    @abstractmethod
    async def put(self, item: QueueItem) -> None:
        """Insert item into queued items for a bug, maintaining sorted order
        by payload event time ascending
        """
        pass

    @abstractmethod
    async def remove(self, bug_id: int, identifier: str) -> None:
        """Remove an item from the target bug's queue. If the item is the last
        one for the bug, remove the bug from the queue entirely.
        """
        pass

    @abstractmethod
    def get(self, bug_id: int) -> AsyncIterator[QueueItem]:
        """Retrieve all of the queue items for a specific bug, sorted in
        ascending order by the timestamp of the payload event.
        """
        pass

    @abstractmethod
    async def exists(self, item_id: str) -> bool:
        """Report whether an item with id `item_id` exists in the queue"""
        pass

    @abstractmethod
    async def get_all(self) -> dict[int, AsyncIterator[QueueItem]]:
        """Retrieve all items in the queue, grouped by bug

        Returns:
            dict[int, AsyncIterator[QueueItem]]: a dict of {bug_id: iterator
            of events}, each iterator sorted in ascending order by the
            timestamp of the payload event.
        """
        pass

    @abstractmethod
    async def size(self, bug_id: Optional[int] = None) -> int:
        """Report the number of items in the queue, optionally filtered by bug id"""
        pass


class FileBackend(QueueBackend):
    def __init__(self, location):
        self.location = Path(location)
        self.location.mkdir(parents=True, exist_ok=True)

    def __repr__(self) -> str:
        return f"FileBackend({self.location})"

    def ping(self):
        try:
            with tempfile.TemporaryDirectory(dir=self.location) as temp_dir:
                with tempfile.TemporaryFile(dir=temp_dir) as f:
                    f.write(b"")
            return True
        except Exception:
            logger.exception("Could not write to file backed queue")
            return False

    async def clear(self):
        for root, dirs, files in self.location.walk(top_down=False):
            for name in files:
                (root / name).unlink()
            for name in dirs:
                (root / name).rmdir()

    async def put(self, item: QueueItem):
        folder = self.location / f"{item.payload.bug.id}"
        folder.mkdir(exist_ok=True)
        path = folder / (item.identifier + ".json")
        path.write_text(item.model_dump_json())
        logger.debug(
            "Wrote item %s for bug %s to path %s",
            item.identifier,
            item.payload.bug.id,
            path,
        )
        logger.debug("%d items in dead letter queue", await self.size())

    async def remove(self, bug_id: int, identifier: str):
        bug_dir = self.location / f"{bug_id}"
        item_path = bug_dir / (identifier + ".json")
        try:
            logger.debug("Removing %s from queue for bug %s", identifier, bug_id)
            item_path.unlink()
        except FileNotFoundError as exc:
            logger.warning(
                "Could not delete missing item at path %s", str(item_path), exc_info=exc
            )
        if not any(bug_dir.iterdir()):
            bug_dir.rmdir()
            logger.debug("Removed directory for bug %s", bug_id)

    async def exists(self, item_id: str) -> bool:
        try:
            bug_id = extract_bug_id_from_item_id(item_id)
        except ValueError as e:
            logger.warning(
                "provided item_id %s did not match expected format", item_id, exc_info=e
            )
            return False
        item_path = (self.location / bug_id / item_id).with_suffix(".json")
        # even though pathlib.Path.exists() returns a bool, mypy doesn't seem to get it
        return bool(item_path.exists())

    async def get(self, bug_id: int) -> AsyncIterator[QueueItem]:
        folder = self.location / str(bug_id)
        if not folder.is_dir():
            return
            yield
        for path in sorted(folder.iterdir()):
            try:
                yield QueueItem.parse_file(path)
            except (JSONDecodeError, ValidationError) as e:
                raise QueueItemRetrievalError(
                    "Unable to load item from queue", path=path
                ) from e

    async def get_all(self) -> dict[int, AsyncIterator[QueueItem]]:
        all_items: dict[int, AsyncIterator[QueueItem]] = {}
        for filesystem_object in self.location.iterdir():
            if filesystem_object.is_dir() and re.match(
                r"\d", filesystem_object.name
            ):  # filtering out temp files from checks
                all_items[int(filesystem_object.name)] = self.get(
                    int(filesystem_object.name)
                )
        return all_items

    async def size(self, bug_id=None) -> int:
        location = self.location / str(bug_id) if bug_id else self.location
        return sum(1 for _ in location.rglob("*.json"))
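

# Illustrative on-disk layout produced by FileBackend (paths and values below
# are hypothetical): one directory per bug id, each containing one JSON file
# per QueueItem, named after the item's identifier.
#
#   /tmp/dl-queue/
#   └── 1234567/
#       ├── 2024-01-01 10:00:00+00:00-1234567-create-error.json
#       └── 2024-01-01 10:05:00+00:00-1234567-modify-postponed.json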


class DeadLetterQueue:
    backend: QueueBackend

    def __init__(self, dsn: FileUrl | str | ParseResult):
        dsn = urlparse(url=dsn) if isinstance(dsn, str) else dsn
        if dsn.scheme != "file":
            raise InvalidQueueDSNError(f"{dsn.scheme} is not supported")
        self.backend = FileBackend(dsn.path)

    def check_writable(self) -> list[dockerflow.checks.CheckMessage]:
        """Heartbeat check to assert we can write items to queue"""
        results = []
        ping_result = self.backend.ping()
        if ping_result is False:
            results.append(
                dockerflow.checks.Error(
                    f"queue with {str(self.backend)} unavailable",
                    hint="with FileBackend, check that folder is writable",
                    id="queue.backend.ping",
                )
            )
        return results

    async def check_readable(self) -> list[dockerflow.checks.CheckMessage]:
        results = []
        try:
            bugs = await self.retrieve()
            for bug_id, items in bugs.items():
                try:
                    bug_items = (await self.retrieve()).values()
                    [[i async for i in items] for items in bug_items]
                except QueueItemRetrievalError as exc:
                    results.append(
                        dockerflow.checks.Error(
                            f"failed to parse file {str(exc.path)}",
                            hint="check that parked event files are not corrupt",
                            id="queue.backend.read",
                        )
                    )
        except Exception as exc:
            logger.exception(exc)
            results.append(
                dockerflow.checks.Error(
                    f"queue with {str(self.backend)} cannot be retrieved",
                    hint=f"invalid data: {exc}",
                    id="queue.backend.retrieve",
                )
            )
        return results

    async def postpone(self, payload: bugzilla_models.WebhookRequest, rid: str) -> None:
        """
        Postpone the specified request for later.
        """
        item = QueueItem(payload=payload, rid=rid)
        await self.backend.put(item)

    async def track_failed(
        self, payload: bugzilla_models.WebhookRequest, exc: Exception, rid: str
    ) -> QueueItem:
        """
        Store the specified payload and exception information into the queue.
        """
        item = QueueItem(
            payload=payload,
            error=PythonException.from_exc(exc),
            rid=rid,
        )
        await self.backend.put(item)
        return item

    async def is_blocked(self, payload: bugzilla_models.WebhookRequest) -> bool:
        """
        Return `True` if the specified `payload` is blocked and should be
        queued instead of being processed.
        """
        existing = await self.backend.size(payload.bug.id)
        return existing > 0

    async def retrieve(self) -> dict[int, AsyncIterator[QueueItem]]:
        """
        Returns the whole queue -- a dict of bug_id and a generator for the
        items for that bug
        """
        return await self.backend.get_all()

    async def size(self, bug_id=None):
        return await self.backend.size(bug_id=bug_id)

    async def done(self, item: QueueItem) -> None:
        """
        Mark item as done, remove from queue.
        """
        return await self.backend.remove(item.payload.bug.id, item.identifier)

    async def exists(self, item_id) -> bool:
        """
        Report whether an item with id `item_id` exists in the queue
        """
        return await self.backend.exists(item_id)

    async def delete(self, item_id) -> None:
        """
        Remove an item from the queue by item_id
        """
        bug_id = extract_bug_id_from_item_id(item_id)
        await self.backend.remove(bug_id=int(bug_id), identifier=item_id)