client/securedrop_client/queue.py (222 lines of code) (raw):

import itertools
import logging
import threading
from queue import PriorityQueue
from typing import Any

from PyQt5.QtCore import QObject, QThread, pyqtBoundSignal, pyqtSignal, pyqtSlot
from sqlalchemy.orm import scoped_session

from securedrop_client.api_jobs.base import (
    DEFAULT_NUM_ATTEMPTS,
    ApiInaccessibleError,
    ApiJob,
    ClearQueueJob,
    PauseQueueJob,
    QueueJob,
)
from securedrop_client.api_jobs.downloads import (
    FileDownloadJob,
    MessageDownloadJob,
    ReplyDownloadJob,
)
from securedrop_client.api_jobs.seen import SeenJob
from securedrop_client.api_jobs.sources import DeleteConversationJob, DeleteSourceJob
from securedrop_client.api_jobs.updatestar import UpdateStarJob
from securedrop_client.api_jobs.uploads import SendReplyJob
from securedrop_client.sdk import API, RequestTimeoutError, ServerConnectionError

logger = logging.getLogger(__name__)


class RunnablePriorityQueue(PriorityQueue):
    """
    Wrapper class around PriorityQueue that emits a signal when message or reply
    download jobs are enqueued or dequeued.
    """

    def __init__(
        self, *args: Any, queue_updated_signal: pyqtBoundSignal | None = None, **kwargs: Any
    ):
        # Optional signal used to broadcast the number of pending message/reply
        # download jobs whenever the queue contents change.
        self.queue_updated_signal = queue_updated_signal
        super().__init__(*args, **kwargs)

    def get(self, *args: Any, **kwargs: Any) -> tuple[int, QueueJob]:
        """Dequeue an item, then notify listeners of the new download-job count."""
        item = super().get(*args, **kwargs)
        if self.queue_updated_signal:
            self.queue_updated_signal.emit(self._get_num_message_or_reply_download_jobs())
        return item

    def put(self, *args: Any, **kwargs: Any) -> None:
        """Enqueue an item, then notify listeners of the new download-job count."""
        # Queue.put() always returns None, so there is no value to propagate.
        super().put(*args, **kwargs)
        if self.queue_updated_signal:
            self.queue_updated_signal.emit(self._get_num_message_or_reply_download_jobs())

    def _get_num_message_or_reply_download_jobs(self) -> int:
        """Return the number of queued MessageDownloadJob/ReplyDownloadJob items."""
        message_and_reply_download_jobs = list(
            # PriorityQueue items are a tuple of (priority, job)
            filter(lambda job: type(job[1]) in (MessageDownloadJob, ReplyDownloadJob), self.queue)
        )
        return len(message_and_reply_download_jobs)


class RunnableQueue(QObject):
    """
    RunnableQueue maintains a priority queue and processes jobs in that queue. It continuously
    processes the next job in the queue, which is ordered by highest priority. Priority is based on
    job type. If multiple jobs of the same type are added to the queue then they are retrieved
    in FIFO order.

    If a RequestTimeoutError or ServerConnectionError is encountered while processing a job, the
    job will be added back to the queue, the processing loop will stop, and the paused signal will
    be emitted. New jobs can still be added, but the processing function will need to be called
    again in order to resume. The processing loop is resumed when the resume signal is emitted.

    If an ApiInaccessibleError is encountered while processing a job, api_client will be set to
    None and the processing loop will stop. If the queue is resumed before the queue manager
    stops the queue thread, api_client will still be None and the next job will raise an
    ApiInaccessibleError before it makes an api call, which will repeat this process.

    Any other exception encountered while processing a job is unexpected, so the queue will drop
    the job and continue on to processing the next job. The job itself is responsible for emitting
    the success and failure signals, so when an unexpected error occurs, it should emit the failure
    signal so that the Controller can respond accordingly.
    """

    # These are the priorities for processing jobs. Lower numbers corresponds to a higher priority.
    JOB_PRIORITIES = {
        ClearQueueJob: 0,  # Must preempt all other jobs
        PauseQueueJob: 11,
        FileDownloadJob: 13,  # File downloads processed in separate queue
        DeleteSourceJob: 14,
        DeleteConversationJob: 14,
        SendReplyJob: 15,
        UpdateStarJob: 16,
        MessageDownloadJob: 17,
        ReplyDownloadJob: 17,
        SeenJob: 18,
    }

    # Signal that is emitted when processing is stopped and queued jobs are cleared
    cleared = pyqtSignal()

    # Signal that is emitted when processing is paused
    paused = pyqtSignal()

    # Signal that is emitted to resume processing jobs
    resume = pyqtSignal()

    def __init__(
        self,
        api_client: API | None,
        session_maker: scoped_session,
        queue_updated_signal: pyqtBoundSignal | None = None,
    ) -> None:
        super().__init__()
        self.api_client = api_client
        self.session_maker = session_maker
        self.queue = RunnablePriorityQueue(
            queue_updated_signal=queue_updated_signal
        )  # type: PriorityQueue[tuple[int, QueueJob]]
        # `order_number` ensures jobs with equal priority are retrieved in FIFO order. This is
        # needed because PriorityQueue is implemented using heapq which does not have sort
        # stability. For more info, see : https://bugs.python.org/issue17794
        self.order_number = itertools.count()
        self.current_job: QueueJob | None = None
        # Hold when reading/writing self.current_job or mutating queue state
        self.condition_add_or_remove_job = threading.Condition()
        self.resume.connect(self.process)

    def _check_for_duplicate_jobs(self, job: QueueJob) -> bool:
        """
        Queued jobs are stored on self.queue.queue. The currently executing job is stored
        on self.current_job. We check that the job to be added is not among them.
        """
        in_progress_jobs = [in_progress_job for priority, in_progress_job in self.queue.queue]
        if self.current_job is not None:
            in_progress_jobs.append(self.current_job)
        if job in in_progress_jobs:
            logger.debug(f"Duplicate job {job}, skipping")
            return True
        return False

    def _clear(self) -> None:
        """
        Reinstantiate the priority queue, rather than trying to clear it via undocumented
        methods.[1]

        FIX: the replacement must be a RunnablePriorityQueue carrying the same
        queue_updated_signal as the queue it replaces; a bare PriorityQueue would
        silently stop emitting queue-size updates after a clear.

        [1]: https://stackoverflow.com/a/38560911
        """
        with self.condition_add_or_remove_job:
            self.queue = RunnablePriorityQueue(
                queue_updated_signal=self.queue.queue_updated_signal
            )
            self.cleared.emit()

    def add_job(self, job: QueueJob) -> None:
        """
        Add the job with its priority to the queue after assigning it the next order_number.

        Can block while waiting to acquire condition_add_or_remove_job.
        """
        with self.condition_add_or_remove_job:
            if self._check_for_duplicate_jobs(job):
                return

            logger.debug(f"Added {job} to queue")
            current_order_number = next(self.order_number)
            job.order_number = current_order_number
            priority = self.JOB_PRIORITIES[type(job)]
            self.queue.put_nowait((priority, job))
            self.condition_add_or_remove_job.notify()

    def _re_add_job(self, job: QueueJob) -> None:
        """
        Reset the job's remaining attempts and put it back into the queue in the order in which it
        was submitted by the user (do not assign it the next order_number).

        Used internally. When called condition_add_or_remove_job should be held.
        """
        if self._check_for_duplicate_jobs(job):
            return

        logger.debug(f"Added {job} to queue")
        job.remaining_attempts = DEFAULT_NUM_ATTEMPTS
        priority = self.JOB_PRIORITIES[type(job)]
        self.queue.put_nowait((priority, job))
        self.condition_add_or_remove_job.notify()

    @pyqtSlot()
    def process(self) -> None:
        """
        Process the next job in the queue.

        If the job is a ClearQueueJob, call _clear() and return from the processing loop so that
        the processing thread can quit.

        If the job is a PauseQueueJob, emit the paused signal and return from the processing loop
        so that no more jobs are processed until the queue resumes.

        If the job raises RequestTimeoutError or ServerConnectionError, then:
        (1) Add a PauseQueuejob to the queue
        (2) Add the job back to the queue so that it can be reprocessed once the queue is resumed.

        If the job raises ApiInaccessibleError, then:
        (1) Set the token to None so that the queue manager will stop enqueuing jobs since we are
        no longer able to make api requests.
        (2) Return from the processing loop since a valid token will be needed in order to process
        jobs.

        Note: Generic exceptions are handled in _do_call_api.
        """
        while True:
            with self.condition_add_or_remove_job:
                self.condition_add_or_remove_job.wait_for(lambda: not self.queue.empty())
                priority, self.current_job = self.queue.get(block=False)

            if isinstance(self.current_job, ClearQueueJob):
                with self.condition_add_or_remove_job:
                    self.current_job = None
                self._clear()
                return

            if isinstance(self.current_job, PauseQueueJob):
                self.paused.emit()
                with self.condition_add_or_remove_job:
                    self.current_job = None
                return

            # FIX: a session only exists when the job is an ApiJob; track that
            # explicitly so the finally-clause does not raise NameError (or close
            # a stale session from a previous iteration) for non-ApiJob jobs.
            session = None
            try:
                if isinstance(self.current_job, ApiJob):
                    session = self.session_maker()
                    self.current_job._do_call_api(self.api_client, session)
            except ApiInaccessibleError as e:
                logger.debug(f"{type(e).__name__}: {e}")
                self.api_client = None
                with self.condition_add_or_remove_job:
                    self.current_job = None
                return
            except (RequestTimeoutError, ServerConnectionError) as e:
                logger.debug(f"{type(e).__name__}: {e}")
                self.add_job(PauseQueueJob())
                with self.condition_add_or_remove_job:
                    job, self.current_job = self.current_job, None
                    self._re_add_job(job)
            except Exception as e:
                logger.error("Skipping job")
                logger.debug(f"Skipping job: {type(e).__name__}: {e}")
            finally:
                with self.condition_add_or_remove_job:
                    self.current_job = None
                if session is not None:
                    session.close()


class ApiJobQueue(QObject):
    """
    ApiJobQueue is the queue manager of two FIFO priority queues that process jobs of type ApiJob.

    The queue manager starts the queues when a new auth token is provided to ensure jobs are able
    to make their requests. It stops the queues whenever a MetadataSyncJob, which runs in a
    continuous loop outside of the queue manager, encounters an ApiInaccessibleError and forces a
    logout from the Controller.
    """

    # Signal that is emitted after a queue is cleared.
    cleared = pyqtSignal()

    # Signal that is emitted after a queue is paused.
    paused = pyqtSignal()

    # Signal emitted when an item is added or removed from the main queue
    main_queue_updated = pyqtSignal(int)

    def __init__(
        self,
        api_client: API | None,
        session_maker: scoped_session,
        main_thread: QThread,
        download_file_thread: QThread,
    ) -> None:
        super().__init__(None)
        self.main_thread = main_thread
        self.download_file_thread = download_file_thread

        self.main_queue = RunnableQueue(
            api_client, session_maker, queue_updated_signal=self.main_queue_updated
        )
        self.download_file_queue = RunnableQueue(api_client, session_maker)

        self.main_queue.moveToThread(self.main_thread)
        self.download_file_queue.moveToThread(self.download_file_thread)

        self.main_thread.started.connect(self.main_queue.process)
        self.download_file_thread.started.connect(self.download_file_queue.process)

        self.main_queue.paused.connect(self.on_main_queue_paused)
        self.download_file_queue.paused.connect(self.on_file_download_queue_paused)

        self.main_queue.cleared.connect(self.on_main_queue_cleared)
        self.download_file_queue.cleared.connect(self.on_file_download_queue_cleared)

    def start(self, api_client: API) -> None:
        """
        Start the queues whenever a new api token is provided.
        """
        self.main_queue.api_client = api_client
        self.download_file_queue.api_client = api_client

        if not self.main_thread.isRunning():
            self.main_thread.start()
            logger.debug("Started main queue")

        if not self.download_file_thread.isRunning():
            self.download_file_thread.start()
            logger.debug("Started file download queue")

    def stop(self) -> None:
        """
        Inject a ClearQueueJob into each queue and quit its processing thread.

        To keep this method non-blocking, we do NOT wait() for the thread to return, which will
        happen only when RunnableQueue.process() reaches the ClearQueueJob and returns from its
        loop.
        """
        if self.main_thread.isRunning():
            self.main_queue.add_job(ClearQueueJob())
            self.main_thread.quit()
            logger.debug("Asked main queue thread to quit")

        if self.download_file_thread.isRunning():
            self.download_file_queue.add_job(ClearQueueJob())
            self.download_file_thread.quit()
            logger.debug("Asked file-download queue thread to quit")

    @pyqtSlot()
    def on_main_queue_paused(self) -> None:
        """
        Emit the paused signal if the main queue has been paused.
        """
        logger.debug("Paused main queue")
        self.paused.emit()

    @pyqtSlot()
    def on_file_download_queue_paused(self) -> None:
        """
        Emit the paused signal if the file download queue has been paused.
        """
        logger.debug("Paused file download queue")
        self.paused.emit()

    @pyqtSlot()
    def on_main_queue_cleared(self) -> None:
        """
        Emit the "cleared" signal when the main RunnableQueue is cleared.
        """
        logger.debug("Cleared main queue")
        self.cleared.emit()

    @pyqtSlot()
    def on_file_download_queue_cleared(self) -> None:
        """
        Emit the "cleared" signal when the file-download RunnableQueue is cleared.
        """
        logger.debug("Cleared file download queue")
        self.cleared.emit()

    def resume_queues(self) -> None:
        """
        Emit the resume signal to the queues if they are running.
        """
        if self.main_thread.isRunning():
            logger.debug("Resuming main queue")
            self.main_queue.resume.emit()
        if self.download_file_thread.isRunning():
            logger.debug("Resuming download queue")
            self.download_file_queue.resume.emit()

    @pyqtSlot(object)
    def enqueue(self, job: ApiJob) -> None:
        """
        Enqueue the supplied job if the queues are running.
        """
        if not self.main_thread.isRunning() or not self.download_file_thread.isRunning():
            logger.debug("Not adding job before queues have been started.")
            return

        if isinstance(job, FileDownloadJob):
            self.download_file_queue.add_job(job)
        else:
            self.main_queue.add_job(job)