client/securedrop_client/api_jobs/sync.py (84 lines of code) (raw):

import logging
import os
from typing import Any

from sqlalchemy.orm.session import Session

from securedrop_client import state
from securedrop_client.api_jobs.base import ApiJob
from securedrop_client.db import DeletedUser, DraftReply, User
from securedrop_client.sdk import API
from securedrop_client.sdk import User as SDKUser
from securedrop_client.storage import get_remote_data, update_local_storage

logger = logging.getLogger(__name__)


class MetadataSyncJob(ApiJob):
    """
    Update source metadata such that new download jobs can be added to the queue.

    The job synchronises the local user table with the server, fetches sources, submissions
    and replies, stores them locally and, when an application state object was supplied,
    registers every file submission with it.
    """

    DEFAULT_REQUEST_TIMEOUT = 60  # sec
    NUMBER_OF_TIMES_TO_RETRY_AN_API_CALL = 2

    def __init__(self, data_dir: str, app_state: state.State | None = None) -> None:
        """
        Args:
            data_dir: Directory handed to `update_local_storage` on every sync.
            app_state: Optional application state; when given, file submissions seen during
                a sync are added to it.
        """
        super().__init__(remaining_attempts=self.NUMBER_OF_TIMES_TO_RETRY_AN_API_CALL)
        self.data_dir = data_dir
        self._state = app_state

    def call_api(self, api_client: API, session: Session) -> Any:
        """
        Override ApiJob.

        Download new metadata, update the local database, import new keys, and
        then the success signal will let the controller know to add any new download
        jobs.

        Args:
            api_client: API client used for all remote calls. Its `default_request_timeout`
                is overwritten by this method.
            session: Database session used to update users and local storage.

        Raises:
            ValueError: If the `SDEXTENDEDTIMEOUT` environment variable is set to a value
                that `int()` cannot parse.
        """

        # TODO: Once https://github.com/freedomofpress/securedrop-client/issues/648 is resolved,
        # we will want to pass the default request timeout to api calls instead of setting it
        # on the api object directly.
        #
        # This timeout is used for 3 different requests: `get_sources`, `get_all_submissions`, and
        # `get_all_replies`
        api_client.default_request_timeout = int(
            os.environ.get("SDEXTENDEDTIMEOUT", self.DEFAULT_REQUEST_TIMEOUT)
        )
        # Make a non-default timeout visible in the logs.
        if api_client.default_request_timeout != self.DEFAULT_REQUEST_TIMEOUT:
            logger.warning(
                f"{self.__class__.__name__} will use "
                f"default_request_timeout={api_client.default_request_timeout}"
            )

        # Users are synchronised before the rest of the remote data is fetched and stored.
        users = api_client.get_users()
        MetadataSyncJob._update_users(session, users)
        sources, submissions, replies = get_remote_data(api_client)

        update_local_storage(session, sources, submissions, replies, self.data_dir)
        if self._state is not None:
            _update_state(self._state, submissions)

    # Declared static because the function takes no `self`: without the decorator, a call made
    # through an instance would silently bind the job object to `session`.
    @staticmethod
    def _update_users(session: Session, remote_users: list[SDKUser]) -> None:
        """
        Synchronise the local user table with the users reported by the server.

        1. Create local user accounts for each remote user that doesn't already exist
        2. Update existing local users
        3. Re-associate any draft replies sent by a user that is about to be deleted
        4. Delete all remaining local user accounts that no longer exist on the server

        Args:
            session: Database session; it is committed at several points, and once at the end.
            remote_users: Users returned by the server.
        """
        deleted_user_id: int | None = None
        # Local accounts keyed by uuid. Entries are removed as they are matched to a remote
        # user, so whatever is left after the loop no longer exists on the server.
        local_users = {user.uuid: user for user in session.query(User).all()}
        for remote_user in remote_users:
            local_user = local_users.get(remote_user.uuid)
            if not local_user:  # Create local user account
                new_user = User(
                    uuid=remote_user.uuid,
                    username=remote_user.username,
                    firstname=remote_user.first_name,
                    lastname=remote_user.last_name,
                )
                session.add(new_user)
                # If the new user is the "deleted" user account, store its id in case we need to
                # reassociate draft replies later.
                if new_user.deleted:
                    session.commit()  # commit so that the generated `id` is available
                    deleted_user_id = new_user.id
                logger.debug(f"Adding account for user with uuid='{new_user.uuid}'")
            else:  # Update existing local users
                # If the local user is the "deleted" user account, store its id in case we need to
                # reassociate draft replies later.
                if local_user.deleted:
                    deleted_user_id = local_user.id
                # Only assign fields that actually differ from the remote values.
                if local_user.username != remote_user.username:
                    local_user.username = remote_user.username
                if local_user.firstname != remote_user.first_name:
                    local_user.firstname = remote_user.first_name
                if local_user.lastname != remote_user.last_name:
                    local_user.lastname = remote_user.last_name
                del local_users[remote_user.uuid]

        # Delete all remaining local user accounts that no longer exist on the server.
        #
        # In order to support an edge case that can occur on a pre-2.2.0 server that does not create
        # a "deleted" user account, the client will create one locally when there are draft replies
        # that need to be re-associated. Once the "deleted" user account exists on the server, it
        # will replace the local one.
        for uuid, account in local_users.items():
            # Do not delete the local "deleted" user account if there is no "deleted" user account
            # on the server.
            if account.deleted and not deleted_user_id:
                continue

            # Get draft replies sent by the user whose account is about to be deleted.
            draft_replies = session.query(DraftReply).filter_by(journalist_id=account.id).all()

            # Create a local "deleted" user account if there is no "deleted" user account locally or
            # on the server and we are about to delete a user.
            # NOTE(review): this condition only consults `deleted_user_id`; it does not look for an
            # existing local "deleted" account among the remaining `local_users`. Confirm whether
            # a second DeletedUser can be created when one already exists only locally.
            if draft_replies and not account.deleted and not deleted_user_id:
                deleted_user = DeletedUser()
                session.add(deleted_user)
                session.commit()  # commit so that we can retrieve the generated `id`
                deleted_user_id = deleted_user.id
                logger.debug(f"Creating DeletedUser with uuid='{deleted_user.uuid}'")

            # Re-associate draft replies
            for reply in draft_replies:
                reply.journalist_id = deleted_user_id
                logger.debug(f"DraftReply with uuid='{reply.uuid}' re-associated to DeletedUser")

            # Ensure re-associated draft replies are committed to the db before deleting the account
            if draft_replies:
                session.commit()

            session.delete(account)
            logger.debug(f"Deleting account for user with uuid='{uuid}'")

        session.commit()


def _update_state(app_state: state.State, submissions: list) -> None:
    """
    Register every file submission with the application state.

    Args:
        app_state: State object that receives an `add_file` call per file submission.
        submissions: Submissions from the latest sync; those whose `is_file()` is falsy
            are skipped.
    """
    for submission in submissions:
        if submission.is_file():
            app_state.add_file(
                state.ConversationId(submission.source_uuid), state.FileId(submission.uuid)
            )