client/securedrop_client/api_jobs/sources.py (41 lines of code) (raw):
import logging
from sqlalchemy.orm.session import Session
from securedrop_client import sdk
from securedrop_client.api_jobs.base import ApiJob
from securedrop_client.sdk import API, RequestTimeoutError, ServerConnectionError
logger = logging.getLogger(__name__)
class DeleteSourceJob(ApiJob):
def __init__(self, uuid: str) -> None:
super().__init__()
self.uuid = uuid
def call_api(self, api_client: API, session: Session) -> str:
"""
Override ApiJob.
Delete a source on the server
"""
try:
source_sdk_object = sdk.Source(uuid=self.uuid)
api_client.delete_source(source_sdk_object)
return self.uuid
except (RequestTimeoutError, ServerConnectionError):
raise
except Exception as e:
error_message = f"Failed to delete source {self.uuid} due to {repr(e)}"
raise DeleteSourceJobException(error_message, self.uuid)
class DeleteConversationJob(ApiJob):
def __init__(self, uuid: str) -> None:
super().__init__()
self.uuid = uuid
def call_api(self, api_client: API, session: Session) -> str:
"""
Override ApiJob.
Delete a source on the server
"""
try:
api_client.delete_conversation(uuid=self.uuid)
return self.uuid
except (RequestTimeoutError, ServerConnectionError):
raise
except Exception as e:
error_message = f"Failed to delete conversation for source {self.uuid}: {repr(e)}"
raise DeleteConversationJobException(error_message, self.uuid)
class DeleteConversationJobException(Exception):
def __init__(self, message: str, source_uuid: str):
super().__init__(message)
self.source_uuid = source_uuid
class DeleteSourceJobException(Exception):
def __init__(self, message: str, source_uuid: str):
super().__init__(message)
self.source_uuid = source_uuid