# client/securedrop_client/queue.py
import itertools
import logging
import threading
from queue import PriorityQueue
from typing import Any
from PyQt5.QtCore import QObject, QThread, pyqtBoundSignal, pyqtSignal, pyqtSlot
from sqlalchemy.orm import scoped_session
from securedrop_client.api_jobs.base import (
DEFAULT_NUM_ATTEMPTS,
ApiInaccessibleError,
ApiJob,
ClearQueueJob,
PauseQueueJob,
QueueJob,
)
from securedrop_client.api_jobs.downloads import (
FileDownloadJob,
MessageDownloadJob,
ReplyDownloadJob,
)
from securedrop_client.api_jobs.seen import SeenJob
from securedrop_client.api_jobs.sources import DeleteConversationJob, DeleteSourceJob
from securedrop_client.api_jobs.updatestar import UpdateStarJob
from securedrop_client.api_jobs.uploads import SendReplyJob
from securedrop_client.sdk import API, RequestTimeoutError, ServerConnectionError
logger = logging.getLogger(__name__)
class RunnablePriorityQueue(PriorityQueue):
    """
    Wrapper class around PriorityQueue that emits a signal when message or reply
    download jobs are enqueued or dequeued.
    """

    def __init__(
        self, *args: Any, queue_updated_signal: pyqtBoundSignal | None = None, **kwargs: Any
    ):
        # Optional signal that receives the current number of message/reply
        # download jobs after every put() or get(). When None, this queue
        # behaves exactly like a plain PriorityQueue.
        self.queue_updated_signal = queue_updated_signal
        super().__init__(*args, **kwargs)

    def get(self, *args: Any, **kwargs: Any) -> tuple[int, QueueJob]:
        """Dequeue a (priority, job) tuple and report the updated download-job count."""
        item = super().get(*args, **kwargs)
        if self.queue_updated_signal:
            self.queue_updated_signal.emit(self._get_num_message_or_reply_download_jobs())
        return item

    def put(self, *args: Any, **kwargs: Any) -> None:
        """Enqueue a (priority, job) tuple and report the updated download-job count."""
        # Note: super().put() returns None, so there is nothing to propagate.
        super().put(*args, **kwargs)
        if self.queue_updated_signal:
            self.queue_updated_signal.emit(self._get_num_message_or_reply_download_jobs())

    def _get_num_message_or_reply_download_jobs(self) -> int:
        """
        Return the number of message/reply download jobs currently queued.

        The underlying heap (self.queue) is shared between producer and consumer
        threads, so it is read while holding the queue's own mutex to get a
        consistent snapshot (previously it was scanned without the lock).
        """
        # PriorityQueue items are a tuple of (priority, job). An exact type
        # check (not isinstance) is used deliberately so that subclasses of
        # the download jobs are not counted.
        with self.mutex:
            return sum(
                1
                for _priority, job in self.queue
                if type(job) in (MessageDownloadJob, ReplyDownloadJob)
            )
class RunnableQueue(QObject):
    """
    RunnableQueue maintains a priority queue and processes jobs in that queue. It continuously
    processes the next job in the queue, which is ordered by highest priority. Priority is based on
    job type. If multiple jobs of the same type are added to the queue then they are retrieved
    in FIFO order.

    If a RequestTimeoutError or ServerConnectionError is encountered while processing a job, the
    job will be added back to the queue, the processing loop will stop, and the paused signal will
    be emitted. New jobs can still be added, but the processing function will need to be called
    again in order to resume. The processing loop is resumed when the resume signal is emitted.

    If an ApiInaccessibleError is encountered while processing a job, api_client will be set to
    None and the processing loop will stop. If the queue is resumed before the queue manager
    stops the queue thread, api_client will still be None and the next job will raise an
    ApiInaccessibleError before it makes an api call, which will repeat this process.

    Any other exception encountered while processing a job is unexpected, so the queue will drop
    the job and continue on to processing the next job. The job itself is responsible for emitting
    the success and failure signals, so when an unexpected error occurs, it should emit the
    failure signal so that the Controller can respond accordingly.
    """

    # These are the priorities for processing jobs. Lower numbers corresponds to a higher priority.
    JOB_PRIORITIES = {
        ClearQueueJob: 0,  # Must preempt all other jobs
        PauseQueueJob: 11,
        FileDownloadJob: 13,  # File downloads processed in separate queue
        DeleteSourceJob: 14,
        DeleteConversationJob: 14,
        SendReplyJob: 15,
        UpdateStarJob: 16,
        MessageDownloadJob: 17,
        ReplyDownloadJob: 17,
        SeenJob: 18,
    }

    # Signal that is emitted when processing is stopped and queued jobs are cleared
    cleared = pyqtSignal()

    # Signal that is emitted when processing is paused
    paused = pyqtSignal()

    # Signal that is emitted to resume processing jobs
    resume = pyqtSignal()

    def __init__(
        self,
        api_client: API | None,
        session_maker: scoped_session,
        queue_updated_signal: pyqtBoundSignal | None = None,
    ) -> None:
        super().__init__()
        self.api_client = api_client
        self.session_maker = session_maker
        # Keep a reference to the signal so that _clear() can rebuild the queue
        # without silently dropping the update notifications.
        self.queue_updated_signal = queue_updated_signal
        self.queue: PriorityQueue[tuple[int, QueueJob]] = RunnablePriorityQueue(
            queue_updated_signal=queue_updated_signal
        )
        # `order_number` ensures jobs with equal priority are retrieved in FIFO order. This is
        # needed because PriorityQueue is implemented using heapq which does not have sort
        # stability. For more info, see : https://bugs.python.org/issue17794
        self.order_number = itertools.count()
        self.current_job: QueueJob | None = None
        # Hold when reading/writing self.current_job or mutating queue state
        self.condition_add_or_remove_job = threading.Condition()
        self.resume.connect(self.process)

    def _check_for_duplicate_jobs(self, job: QueueJob) -> bool:
        """
        Return True (and skip) if `job` is already queued or currently executing.

        Queued jobs are stored on self.queue.queue. The currently executing job is
        stored on self.current_job. We check that the job to be added is not among them.
        """
        in_progress_jobs = [in_progress_job for priority, in_progress_job in self.queue.queue]
        if self.current_job is not None:
            in_progress_jobs.append(self.current_job)
        if job in in_progress_jobs:
            logger.debug(f"Duplicate job {job}, skipping")
            return True
        return False

    def _clear(self) -> None:
        """
        Reinstantiate the queue, rather than trying to clear it via undocumented methods.[1]

        [1]: https://stackoverflow.com/a/38560911
        """
        with self.condition_add_or_remove_job:
            # Rebuild a RunnablePriorityQueue with the original update signal.
            # (Previously a bare PriorityQueue was created here, which silently
            # dropped queue_updated_signal, so no more queue-size updates were
            # ever emitted after a clear.)
            self.queue = RunnablePriorityQueue(queue_updated_signal=self.queue_updated_signal)
        self.cleared.emit()

    def add_job(self, job: QueueJob) -> None:
        """
        Add the job with its priority to the queue after assigning it the next order_number.

        Can block while waiting to acquire condition_add_or_remove_job.
        """
        with self.condition_add_or_remove_job:
            if self._check_for_duplicate_jobs(job):
                return

            logger.debug(f"Added {job} to queue")
            current_order_number = next(self.order_number)
            job.order_number = current_order_number
            priority = self.JOB_PRIORITIES[type(job)]
            self.queue.put_nowait((priority, job))
            self.condition_add_or_remove_job.notify()

    def _re_add_job(self, job: QueueJob) -> None:
        """
        Reset the job's remaining attempts and put it back into the queue in the order in which it
        was submitted by the user (do not assign it the next order_number). Used internally.

        When called condition_add_or_remove_job should be held.
        """
        if self._check_for_duplicate_jobs(job):
            return

        logger.debug(f"Added {job} to queue")
        job.remaining_attempts = DEFAULT_NUM_ATTEMPTS
        priority = self.JOB_PRIORITIES[type(job)]
        self.queue.put_nowait((priority, job))
        self.condition_add_or_remove_job.notify()

    @pyqtSlot()
    def process(self) -> None:
        """
        Process the next job in the queue.

        If the job is a ClearQueueJob, call _clear() and return from the processing loop so that
        the processing thread can quit.

        If the job is a PauseQueueJob, emit the paused signal and return from the processing loop
        so that no more jobs are processed until the queue resumes.

        If the job raises RequestTimeoutError or ServerConnectionError, then:

        (1) Add a PauseQueuejob to the queue
        (2) Add the job back to the queue so that it can be reprocessed once the queue is resumed.

        If the job raises ApiInaccessibleError, then:

        (1) Set the token to None so that the queue manager will stop enqueuing jobs since we are
        no longer able to make api requests.
        (2) Return from the processing loop since a valid token will be needed in order to process
        jobs.

        Note: Generic exceptions are handled in _do_call_api.
        """
        while True:
            with self.condition_add_or_remove_job:
                self.condition_add_or_remove_job.wait_for(lambda: not self.queue.empty())
                priority, self.current_job = self.queue.get(block=False)

            if isinstance(self.current_job, ClearQueueJob):
                with self.condition_add_or_remove_job:
                    self.current_job = None
                self._clear()
                return

            if isinstance(self.current_job, PauseQueueJob):
                self.paused.emit()
                with self.condition_add_or_remove_job:
                    self.current_job = None
                return

            # A session is only created for ApiJobs; initialize to None every
            # iteration so the `finally` clause never closes a stale session
            # (or raises NameError) when the job was not an ApiJob.
            session = None
            try:
                if isinstance(self.current_job, ApiJob):
                    session = self.session_maker()
                    self.current_job._do_call_api(self.api_client, session)
            except ApiInaccessibleError as e:
                logger.debug(f"{type(e).__name__}: {e}")
                self.api_client = None
                with self.condition_add_or_remove_job:
                    self.current_job = None
                return
            except (RequestTimeoutError, ServerConnectionError) as e:
                logger.debug(f"{type(e).__name__}: {e}")
                self.add_job(PauseQueueJob())
                with self.condition_add_or_remove_job:
                    job, self.current_job = self.current_job, None
                    self._re_add_job(job)
            except Exception as e:
                logger.error("Skipping job")
                logger.debug(f"Skipping job: {type(e).__name__}: {e}")
            finally:
                with self.condition_add_or_remove_job:
                    self.current_job = None
                if session is not None:
                    session.close()
class ApiJobQueue(QObject):
    """
    ApiJobQueue is the queue manager of two FIFO priority queues that process jobs of type ApiJob.

    The queue manager starts the queues when a new auth token is provided to ensure jobs are able
    to make their requests. It stops the queues whenever a MetadataSyncJob, which runs in a
    continuous loop outside of the queue manager, encounters an ApiInaccessibleError and forces a
    logout from the Controller.
    """

    # Signal that is emitted after a queue is cleared.
    cleared = pyqtSignal()

    # Signal that is emitted after a queue is paused.
    paused = pyqtSignal()

    # Signal emitted when an item is added or removed from the main queue
    main_queue_updated = pyqtSignal(int)

    def __init__(
        self,
        api_client: API | None,
        session_maker: scoped_session,
        main_thread: QThread,
        download_file_thread: QThread,
    ) -> None:
        super().__init__(None)

        self.main_thread = main_thread
        self.download_file_thread = download_file_thread

        # Two worker queues: one general-purpose, one dedicated to file downloads.
        self.main_queue = RunnableQueue(
            api_client, session_maker, queue_updated_signal=self.main_queue_updated
        )
        self.download_file_queue = RunnableQueue(api_client, session_maker)

        # Each queue runs its processing loop on its own thread.
        self.main_queue.moveToThread(self.main_thread)
        self.main_thread.started.connect(self.main_queue.process)
        self.download_file_queue.moveToThread(self.download_file_thread)
        self.download_file_thread.started.connect(self.download_file_queue.process)

        # Re-broadcast each queue's paused/cleared notifications via our own signals.
        self.main_queue.paused.connect(self.on_main_queue_paused)
        self.main_queue.cleared.connect(self.on_main_queue_cleared)
        self.download_file_queue.paused.connect(self.on_file_download_queue_paused)
        self.download_file_queue.cleared.connect(self.on_file_download_queue_cleared)

    def start(self, api_client: API) -> None:
        """
        Start the queues whenever a new api token is provided.
        """
        self.main_queue.api_client = api_client
        self.download_file_queue.api_client = api_client

        if not self.main_thread.isRunning():
            self.main_thread.start()
            logger.debug("Started main queue")

        if not self.download_file_thread.isRunning():
            self.download_file_thread.start()
            logger.debug("Started file download queue")

    def stop(self) -> None:
        """
        Inject a ClearQueueJob into each queue and quit its processing thread. To keep this
        method non-blocking, we do NOT wait() for the thread to return, which will happen only
        when RunnableQueue.process() reaches the ClearQueueJob and returns from its loop.
        """
        if self.main_thread.isRunning():
            self.main_queue.add_job(ClearQueueJob())
            self.main_thread.quit()
            logger.debug("Asked main queue thread to quit")

        if self.download_file_thread.isRunning():
            self.download_file_queue.add_job(ClearQueueJob())
            self.download_file_thread.quit()
            logger.debug("Asked file-download queue thread to quit")

    @pyqtSlot()
    def on_main_queue_paused(self) -> None:
        """
        Emit the paused signal if the main queue has been paused.
        """
        logger.debug("Paused main queue")
        self.paused.emit()

    @pyqtSlot()
    def on_file_download_queue_paused(self) -> None:
        """
        Emit the paused signal if the file download queue has been paused.
        """
        logger.debug("Paused file download queue")
        self.paused.emit()

    @pyqtSlot()
    def on_main_queue_cleared(self) -> None:
        """
        Emit the "cleared" signal when the main RunnableQueue is cleared.
        """
        logger.debug("Cleared main queue")
        self.cleared.emit()

    @pyqtSlot()
    def on_file_download_queue_cleared(self) -> None:
        """
        Emit the "cleared" signal when the file-download RunnableQueue is cleared.
        """
        logger.debug("Cleared file download queue")
        self.cleared.emit()

    def resume_queues(self) -> None:
        """
        Emit the resume signal to the queues if they are running.
        """
        if self.main_thread.isRunning():
            logger.debug("Resuming main queue")
            self.main_queue.resume.emit()
        if self.download_file_thread.isRunning():
            logger.debug("Resuming download queue")
            self.download_file_queue.resume.emit()

    @pyqtSlot(object)
    def enqueue(self, job: ApiJob) -> None:
        """
        Enqueue the supplied job if the queues are running.
        """
        if not self.main_thread.isRunning() or not self.download_file_thread.isRunning():
            logger.debug("Not adding job before queues have been started.")
            return

        # File downloads go to the dedicated queue; everything else to the main one.
        target = self.download_file_queue if isinstance(job, FileDownloadJob) else self.main_queue
        target.add_job(job)