client/securedrop_client/api_jobs/base.py (66 lines of code) (raw):
import logging
from typing import Any, TypeVar
from PyQt5.QtCore import QObject, pyqtSignal
from sqlalchemy.orm.session import Session
from securedrop_client.sdk import API, AuthError, RequestTimeoutError, ServerConnectionError
logger = logging.getLogger(__name__)
DEFAULT_NUM_ATTEMPTS = 5
QueueJobType = TypeVar("QueueJobType", bound="QueueJob")
class ApiInaccessibleError(Exception):
def __init__(self, message: str | None = None) -> None:
if not message:
message = (
"API is inaccessible either because there is no client or because the "
"client is not properly authenticated."
)
super().__init__(message)
class QueueJob(QObject):
def __init__(self, remaining_attempts: int = DEFAULT_NUM_ATTEMPTS) -> None:
super().__init__()
self.order_number: int | None = None
self.remaining_attempts = remaining_attempts
def __lt__(self, other: QueueJobType) -> bool:
"""
Python's PriorityQueue requires that QueueJobs are sortable as it
retrieves the next job using sorted(list(entries))[0].
For QueueJobs that have equal priority, we need to use the order_number key
to break ties to ensure that objects are retrieved in FIFO order.
"""
if self.order_number is None or other.order_number is None:
raise ValueError("cannot compare jobs without order_number!")
return self.order_number < other.order_number
class ClearQueueJob(QueueJob):
pass
class PauseQueueJob(QueueJob):
def __init__(self) -> None:
super().__init__()
class ApiJob(QueueJob):
"""
Signal that is emitted after an job finishes successfully.
"""
success_signal = pyqtSignal("PyQt_PyObject")
"""
Signal that is emitted if there is a failure during the job.
"""
failure_signal = pyqtSignal(Exception)
def __init__(self, remaining_attempts: int = DEFAULT_NUM_ATTEMPTS) -> None:
super().__init__(remaining_attempts)
def _do_call_api(self, api_client: API | None, session: Session) -> None:
if not api_client:
raise ApiInaccessibleError()
while self.remaining_attempts >= 1:
try:
self.remaining_attempts -= 1
result = self.call_api(api_client, session)
except (AuthError, ApiInaccessibleError) as e:
raise ApiInaccessibleError() from e
except (RequestTimeoutError, ServerConnectionError) as e:
if self.remaining_attempts == 0:
self.failure_signal.emit(e)
raise
# Timeout errors may mean the user should try changing Tor circuits
elif isinstance(e, RequestTimeoutError):
logger.info("Encountered RequestTimeoutError, retrying API call")
except Exception as e:
self.failure_signal.emit(e)
raise
else:
self.success_signal.emit(result)
break
def call_api(self, api_client: API, session: Session) -> Any:
"""
Method for making the actual API call and handling the result.
This MUST resturn a value if the API call and other tasks were successful and MUST raise
an exception if and only if the tasks failed. Presence of a raise exception indicates a
failure.
"""
raise NotImplementedError
class SingleObjectApiJob(ApiJob):
def __init__(self, uuid: str, remaining_attempts: int = DEFAULT_NUM_ATTEMPTS) -> None:
super().__init__(remaining_attempts)
"""
UUID of the item (source, reply, submission, etc.) that this item
corresponds to. We track this to prevent the addition of duplicate jobs.
"""
self.uuid = uuid
def __repr__(self) -> str:
return f"{self.__class__.__name__}('{self.uuid}', {self.remaining_attempts})"
def __eq__(self, other: object) -> bool:
# https://github.com/python/mypy/issues/2783
return self.uuid == getattr(other, "uuid", None) and type(self) is type(other)