client/securedrop_client/sync.py (95 lines of code) (raw):

import logging

from PyQt5.QtCore import QObject, QThread, QTimer, pyqtBoundSignal, pyqtSignal
from sqlalchemy.orm import scoped_session

from securedrop_client import state
from securedrop_client.api_jobs.base import ApiInaccessibleError
from securedrop_client.api_jobs.sync import MetadataSyncJob
from securedrop_client.crypto import GpgHelper
from securedrop_client.sdk import API

logger = logging.getLogger(__name__)


class ApiSync(QObject):
    """
    ApiSync schedules metadata syncs on a background task: one sync runs when the sync
    thread starts, and another is requested every time the interval timer
    (TIME_BETWEEN_SYNCS_MS) fires.
    """

    sync_started = pyqtSignal()
    sync_success = pyqtSignal()
    sync_failure = pyqtSignal(Exception)

    TIME_BETWEEN_SYNCS_MS = 1000 * 15  # fifteen seconds between syncs

    def __init__(
        self,
        api_client: API | None,
        session_maker: scoped_session,
        gpg: GpgHelper,
        data_dir: str,
        sync_thread: QThread,
        app_state: state.State | None = None,
    ):
        """
        Build the background task, move it to `sync_thread`, and prepare (but do not
        start) the interval timer. Nothing syncs until `start()` is called.
        """
        super().__init__()
        self.api_client = api_client

        # NOTE(review): when `sync_thread` is None the attribute is never set, so the
        # `moveToThread` call below fails with AttributeError. The annotation says a
        # QThread is required; confirm whether this guard is still needed.
        if sync_thread is not None:
            self.sync_thread = sync_thread

        self.api_sync_bg_task = ApiSyncBackgroundTask(
            api_client,
            session_maker,
            gpg,
            data_dir,
            self.sync_started,
            self.on_sync_success,
            self.on_sync_failure,
            app_state,
        )
        self.api_sync_bg_task.moveToThread(self.sync_thread)

        # The first sync is triggered by the thread starting...
        self.sync_thread.started.connect(self.api_sync_bg_task.sync)

        # ...and every later one by the interval timer.
        self.timer = QTimer()
        self.timer.setInterval(self.TIME_BETWEEN_SYNCS_MS)
        self.timer.timeout.connect(self.sync)

    def start(self, api_client: API) -> None:
        """
        Start metadata syncs.
        """
        self.api_client = api_client
        # Always hand the current client to the background task, so that a start() on
        # an already-running thread does not keep syncing with a stale client.
        self.api_sync_bg_task.api_client = self.api_client
        self.timer.start()

        if not self.sync_thread.isRunning():
            logger.debug("Starting sync thread")
            self.sync_thread.start()

    def stop(self) -> None:
        """
        Stop metadata syncs.
        """
        self.api_client = None
        # Stop the interval timer as well; otherwise it keeps requesting syncs after
        # the thread has been told to quit. start() restarts it.
        self.timer.stop()

        if self.sync_thread.isRunning():
            logger.debug("Stopping sync thread")
            self.sync_thread.quit()

    def on_sync_success(self) -> None:
        """
        Relay a successful sync to listeners by emitting `sync_success`.
        """
        self.sync_success.emit()

    def on_sync_failure(self, result: Exception) -> None:
        """
        Relay a failed sync to listeners by emitting `sync_failure` with the exception.
        """
        self.sync_failure.emit(result)

    def sync(self) -> None:
        """
        Start an immediate sync.
        """
        QTimer.singleShot(1, self.api_sync_bg_task.sync)


class ApiSyncBackgroundTask(QObject):
    """
    ApiSyncBackgroundTask provides a sync method that executes a MetadataSyncJob.
    """

    def __init__(  # type: ignore[no-untyped-def]
        self,
        api_client: API | None,
        session_maker: scoped_session,
        gpg: GpgHelper,
        data_dir: str,
        sync_started: pyqtBoundSignal,
        on_sync_success,
        on_sync_failure,
        app_state: state.State | None = None,
    ):
        """
        Store the collaborators and create the single MetadataSyncJob that every call
        to `sync()` reuses, wiring its success/failure signals to the given callbacks.
        """
        super().__init__()

        self.api_client = api_client
        self.session_maker = session_maker
        self.gpg = gpg
        self.data_dir = data_dir
        self.sync_started = sync_started
        self.on_sync_success = on_sync_success
        self.on_sync_failure = on_sync_failure

        self.job = MetadataSyncJob(self.data_dir, app_state)
        self.job.success_signal.connect(self.on_sync_success)
        self.job.failure_signal.connect(self.on_sync_failure)

    def sync(self) -> None:
        """
        Run the MetadataSyncJob once with the current API client and a fresh session.

        Exceptions never propagate out of this slot: ApiInaccessibleError is reported
        through the job's failure signal, anything else is logged at debug level.
        """
        # Bound before the `try` so the `finally` clause is safe even when the session
        # could not be created.
        session = None
        try:
            self.sync_started.emit()
            session = self.session_maker()
            # Reset the attempt counter because the same job object is reused.
            self.job.remaining_attempts = 2
            self.job._do_call_api(self.api_client, session)
        except ApiInaccessibleError as e:
            self.job.failure_signal.emit(e)  # the job's failure signal is not emitted in base
        except Exception as e:
            # the job's failure signal is emitted for everything else in base; an error
            # raised before the job ran is only visible through this log line.
            # Log the type only, to avoid leaking exception details into the logs.
            logger.debug("Metadata sync raised %s", type(e).__name__)
        finally:
            if session is not None:
                session.close()