client/securedrop_client/sdk/__init__.py (643 lines of code) (raw):

import configparser import http import json import logging import os import subprocess import tempfile from dataclasses import dataclass from datetime import datetime from pathlib import Path from typing import Any from securedrop_client import utils from securedrop_client.config import Config from .sdlocalobjects import ( AuthError, BaseError, Reply, ReplyError, Source, Submission, User, WrongUUIDError, ) from .timestamps import parse as parse_datetime logger = logging.getLogger(__name__) DEFAULT_PROXY_VM_NAME = "sd-proxy" DEFAULT_REQUEST_TIMEOUT = 20 # 20 seconds DEFAULT_DOWNLOAD_TIMEOUT = 60 * 60 # 60 minutes class RequestTimeoutError(Exception): """ Error raised if a request times out. """ def __init__(self) -> None: super().__init__("The request timed out.") def __reduce__(self) -> tuple[type, tuple]: return (self.__class__, ()) class ServerConnectionError(Exception): """ Error raised if we cannot connect to the server. """ def __init__(self) -> None: super().__init__("Cannot connect to the server.") def __reduce__(self) -> tuple[type, tuple]: return (self.__class__, ()) @dataclass(frozen=True) class StreamedResponse: """Container for streamed data along with the ETag checksum sent by the server.""" contents: bytes sha256sum: str @dataclass(frozen=True) class JSONResponse: """Deserialization of the proxy's `OutgoingResponse`.""" data: dict status: int headers: dict[str, str] class API: """ This class handles all network calls to the SecureDrop API server. :param address: Server URL (http://localhost:8081/) :param username: Journalist username :param passphrase: Journalist passphrase :param totp: Current TOTP value :param proxy: Whether the API class should use the RPC proxy :param default_request_timeout: Default timeout for a request (non-download) in seconds :param default_download_timeout: Default timeout for a request (download only) in seconds :returns: An object of API class. """ def __init__( self, address: str, username: str, passphrase: str, totp: str, proxy: bool = False, default_request_timeout: int | None = None, default_download_timeout: int | None = None, ) -> None: """ Primary API class, this is the only thing which will make network call. """ self.server = address self.username = username self.passphrase = passphrase self.totp = totp self.token: str | None = None self.token_expiration: datetime | None = None self.token_journalist_uuid: str | None = None self.first_name: str | None = None self.last_name: str | None = None self.development_mode: bool = not proxy self.default_request_timeout = default_request_timeout or DEFAULT_REQUEST_TIMEOUT self.default_download_timeout = default_download_timeout or DEFAULT_DOWNLOAD_TIMEOUT self.proxy_vm_name = DEFAULT_PROXY_VM_NAME config_parser = configparser.ConfigParser() try: if os.path.exists("/etc/sd-sdk.conf"): config_parser.read("/etc/sd-sdk.conf") self.proxy_vm_name = config_parser["proxy"]["name"] except Exception: pass # We already have a default name # Load download retry limit from the config self.download_retry_limit = Config.load().download_retry_limit def _rpc_target(self) -> list: """In `development_mode`, check `cargo` for a locally-built proxy binary. Otherwise, call `securedrop.Proxy` via qrexec.""" if not self.development_mode: return ["/usr/lib/qubes/qrexec-client-vm", self.proxy_vm_name, "securedrop.Proxy"] # Development mode, find the target directory and look for a debug securedrop-proxy # binary. We assume that `cargo build` has already been run. We don't use `cargo run` # because it adds its own output that would interfere with ours. target_directory = json.loads( subprocess.check_output(["cargo", "metadata", "--format-version", "1"], text=True) )["target_directory"] return [f"{target_directory}/debug/securedrop-proxy"] def _streaming_download( self, data: dict[str, Any], env: dict ) -> StreamedResponse | JSONResponse: fobj = tempfile.TemporaryFile("w+b") retry = 0 bytes_written = 0 download_finished = False while not download_finished and retry < self.download_retry_limit: logger.debug(f"Streaming download, retry {retry}") try: # Update the range request if we're retrying if retry > 0: data["headers"]["Range"] = f"bytes={bytes_written}-" logger.debug(f"Retry {retry}, range: {bytes_written}-") # Serialize the data to send data_bytes = json.dumps(data).encode() # Open the process logger.debug(f"Retry {retry}, opening process") proc = subprocess.Popen( self._rpc_target(), stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env, ) # Send the input data if proc.stdin is not None: logger.debug(f"Retry {retry}, sending data") proc.stdin.write(data_bytes) proc.stdin.close() else: logger.error(f"Retry {retry}, stdin is None") # TODO: should we raise an exception here? If we don't pass in stdin, # the request will end up failing # Write the contents to disk chunk_size = 1024 while not download_finished: if proc.stdout is not None: chunk = proc.stdout.read(chunk_size) if not chunk: logger.debug(f"Retry {retry}, download finished") download_finished = True break fobj.write(chunk) bytes_written += len(chunk) logger.debug(f"Retry {retry}, bytes written: {bytes_written:,}") # Wait for the process to end returncode = proc.wait() logger.debug(f"Retry {retry}, process ended with return code {returncode}") # Check for errors if returncode != 0: # Actually, the download did not finish after all download_finished = False error = {} if proc.stderr is not None: error_json = proc.stderr.read().decode() try: error = json.loads(error_json) except json.decoder.JSONDecodeError as err: logger.debug(f"Retry {retry}, invalid error JSON: {error_json}") raise BaseError("Unable to parse stderr JSON") from err error_description = error.get("error", "unknown error") logger.debug(f"Retry {retry}, internal proxy error: {error_description}") logger.error(f"Retry {retry}, internal proxy error") raise BaseError(f"Internal proxy error: {error_description}") # FIXME: For now, store the contents as bytes logger.debug(f"Retry {retry}, reading contents from disk") fobj.seek(0) contents = fobj.read() fobj.close() # Check for an error response if contents[0:1] == b"{": logger.error(f"Retry {retry}, received JSON error response.") return self._handle_json_response(contents) # Get the headers if proc.stderr is not None: headers_json = proc.stderr.read().decode() try: stderr = json.loads(headers_json) except json.decoder.JSONDecodeError as err: logger.debug( f"Retry {retry}, invalid headers (stderr) JSON: {headers_json}" ) raise BaseError("Unable to parse headers (stderr) JSON") from err sha256sum = stderr["headers"]["etag"] else: # This should never happen because we should always have an stderr sha256sum = "" return StreamedResponse( contents=contents, sha256sum=sha256sum, ) except subprocess.TimeoutExpired as err: logger.error(f"Retry {retry}, timeout expired") retry += 1 if retry >= self.download_retry_limit: logger.error("Retry limit reached after subprocess.TimeoutExpired") raise RequestTimeoutError from err except BaseError as err: logger.debug(f"Retry {retry}, base error: {err}") logger.error(f"Retry {retry}, base error") retry += 1 if retry >= self.download_retry_limit: logger.error("Retry limit reached after BaseError") raise err # We will never reach this code because we'll have already returned or raised # an exception by now, but it's required to make the linter happy logger.error( f"Reached unreachable exception. retry={retry}, " f"bytes_written={bytes_written}, download_finished={download_finished}" ) raise RuntimeError( "This should be unreachable, we should've already returned or raised a different exception" # noqa: E501 ) def _handle_json_response(self, stdout_bytes: bytes) -> JSONResponse: try: result = json.loads(stdout_bytes.decode()) except json.decoder.JSONDecodeError as err: raise BaseError("Unable to parse stdout JSON") from err if result["status"] == http.HTTPStatus.GATEWAY_TIMEOUT: raise RequestTimeoutError elif result["status"] == http.HTTPStatus.BAD_GATEWAY: raise ServerConnectionError elif result["status"] == http.HTTPStatus.FORBIDDEN: raise AuthError("Forbidden") elif result["status"] == http.HTTPStatus.BAD_REQUEST: # FIXME: return a more generic error raise ReplyError("bad request") elif ( str(result["status"]).startswith(("4", "5")) and result["status"] != http.HTTPStatus.NOT_FOUND ): # We exclude 404 since if we encounter a 404, it means that an # item is missing. In that case we return to the caller to # handle that with an appropriate message. However, if the error # is not a 404, then we raise. logger.error(f"API error: status={result['status']}") raise BaseError(f"Unknown error, status: {result['status']}") data = json.loads(result["body"]) return JSONResponse(data=data, status=result["status"], headers=result["headers"]) def _send_json_request( self, method: str, path_query: str, stream: bool = False, body: str | None = None, headers: dict[str, str] | None = None, timeout: int | None = None, ) -> StreamedResponse | JSONResponse: """Build a JSON-serialized request to pass to the proxy. Handle the JSON or streamed response back, plus translate HTTP error statuses to our exceptions.""" data: dict[str, Any] = {"method": method, "path_query": path_query, "stream": stream} if method == "POST" and body: data["body"] = body if headers: data["headers"] = headers if timeout: data["timeout"] = timeout logger.debug( f"Sending request to proxy: {method} {path_query} (body={'body' in data}, " f"stream={stream}, timeout={timeout})" ) env = {} if self.development_mode: env["SD_PROXY_ORIGIN"] = self.server # Streaming if stream: return self._streaming_download(data, env) # Not streaming data_str = json.dumps(data).encode() try: response = subprocess.run( self._rpc_target(), capture_output=True, timeout=timeout, input=data_str, env=env, check=False, ) except subprocess.TimeoutExpired as err: logger.error(f"Non-streaming reqest timed out (path_query={path_query})") raise RequestTimeoutError from err # Error handling if response.returncode != 0: error_json = response.stderr.decode() try: error = json.loads(error_json) except json.decoder.JSONDecodeError as err: logger.debug(f"Unable to parse stderr JSON: {error_json}") raise BaseError("Unable to parse stderr JSON") from err error_desc = error.get("error", "unknown error") logger.debug(f"Internal proxy error: {error_desc}") logger.error("Internal proxy error (non-streaming)") raise BaseError(f"Internal proxy error: {error_desc}") return self._handle_json_response(response.stdout) def authenticate(self, totp: str | None = None) -> bool: """ Authenticates the user and fetches the token from the server. :returns: True if authentication is successful, raise AuthError otherwise. """ if totp is None: totp = self.totp user_data = { "username": self.username, "passphrase": self.passphrase, "one_time_code": totp, } method = "POST" path_query = "api/v1/token" body = json.dumps(user_data) response = self._send_json_request( method, path_query, body=body, timeout=self.default_request_timeout ) assert isinstance(response, JSONResponse) if "expiration" not in response.data: raise AuthError("Authentication error") token_expiration = parse_datetime(response.data["expiration"]) if token_expiration is None: raise BaseError("Error in parsing token expiration time") self.token = response.data["token"] self.token_expiration = token_expiration self.token_journalist_uuid = response.data["journalist_uuid"] self.first_name = response.data["journalist_first_name"] self.last_name = response.data["journalist_last_name"] return True def build_headers(self) -> dict[str, str]: # Build headers dynamically each time to make sure # the dict is safe to mutate. headers = { "Content-Type": "application/json", "Accept": "application/json", } if self.token is not None: headers["Authorization"] = "Token " + self.token return headers def get_sources(self) -> list[Source]: """ Returns a list of all the sources from the Server. :returns: List of Source objects. """ path_query = "api/v1/sources" method = "GET" response = self._send_json_request( method, path_query, headers=self.build_headers(), timeout=self.default_request_timeout, ) assert isinstance(response, JSONResponse) sources = response.data["sources"] result: list[Source] = [] for source in sources: s = Source(**source) result.append(s) return result def get_source(self, source: Source) -> Source: """ This will return a single Source based on UUID. :param source: Source object containing only source's UUID value. :returns: Source object fetched from server for the given UUID value. """ path_query = f"api/v1/sources/{source.uuid}" method = "GET" response = self._send_json_request( method, path_query, headers=self.build_headers(), timeout=self.default_request_timeout, ) assert isinstance(response, JSONResponse) if response.status == 404: raise WrongUUIDError(f"Missing source {source.uuid}") return Source(**response.data) def delete_source(self, source: Source) -> bool: """ This method will delete the source and collection. If the UUID is not found in the server, it will raise WrongUUIDError. :param source: Source object containing only source's UUID value. :returns: True if successful, raises Errors in case of wrong values. """ path_query = f"api/v1/sources/{source.uuid}" method = "DELETE" response = self._send_json_request( method, path_query, headers=self.build_headers(), timeout=self.default_request_timeout, ) assert isinstance(response, JSONResponse) if response.status == 404: raise WrongUUIDError(f"Missing source {source.uuid}") if response.data.get("message") == "Source and submissions deleted": return True # We should never reach here return False def delete_conversation(self, uuid: str) -> bool: """ This method will delete the source's conversation. If the UUID is not found in the server, it will raise WrongUUIDError. :param uuid: Source UUID as string. :returns: True if the operation is successful. """ path_query = f"api/v1/sources/{uuid}/conversation" method = "DELETE" response = self._send_json_request( method, path_query, headers=self.build_headers(), timeout=self.default_request_timeout, ) assert isinstance(response, JSONResponse) if response.status == 404: raise WrongUUIDError(f"Missing source {uuid}") if response.data.get("message") == "Source data deleted": return True return False def add_star(self, source: Source) -> bool: """ Adds a star to a given source. :param source: The source object to whom we want add a star. :returns: True if successful, raises Error otherwise. """ path_query = f"api/v1/sources/{source.uuid}/add_star" method = "POST" response = self._send_json_request( method, path_query, headers=self.build_headers(), timeout=self.default_request_timeout, ) assert isinstance(response, JSONResponse) if response.status == 404: raise WrongUUIDError(f"Missing source {source.uuid}") if response.data.get("message") == "Star added": return True return False def remove_star(self, source: Source) -> bool: """Removes star from a given Source. :param source: Source object to remove the star from. :returns: True if successful, raises Error otherwise. """ path_query = f"api/v1/sources/{source.uuid}/remove_star" method = "DELETE" response = self._send_json_request( method, path_query, headers=self.build_headers(), timeout=self.default_request_timeout, ) assert isinstance(response, JSONResponse) if response.status == 404: raise WrongUUIDError(f"Missing source {source.uuid}") if response.data.get("message") == "Star removed": return True return False def get_submissions(self, source: Source) -> list[Submission]: """ Returns a list of Submission objects from the server for a given source. :param source: Source object for whom we want to find all the submissions. :returns: List of Submission objects. """ path_query = f"api/v1/sources/{source.uuid}/submissions" method = "GET" response = self._send_json_request( method, path_query, headers=self.build_headers(), timeout=self.default_request_timeout, ) assert isinstance(response, JSONResponse) if response.status == 404: raise WrongUUIDError(f"Missing submission {source.uuid}") result: list[Submission] = [] values = response.data["submissions"] for val in values: s = Submission(**val) result.append(s) return result def get_submission(self, submission: Submission) -> Submission: """ Returns the updated Submission object from the server. :param submission: Submission object we want to update. :returns: Updated submission object from the server. """ if submission.source_uuid and submission.uuid is not None: path_query = f"api/v1/sources/{submission.source_uuid}/submissions/{submission.uuid}" method = "GET" response = self._send_json_request( method, path_query, headers=self.build_headers(), timeout=self.default_request_timeout, ) assert isinstance(response, JSONResponse) if response.status == 404: raise WrongUUIDError(f"Missing submission {submission.uuid}") return Submission(**response.data) else: # XXX: is this the correct behavior return submission def get_all_submissions(self) -> list[Submission]: """ Returns a list of Submission objects from the server. :returns: List of Submission objects. """ path_query = "api/v1/submissions" method = "GET" response = self._send_json_request( method, path_query, headers=self.build_headers(), timeout=self.default_request_timeout, ) assert isinstance(response, JSONResponse) result: list[Submission] = [] values = response.data["submissions"] for val in values: s = Submission(**val) result.append(s) return result def delete_submission(self, submission: Submission) -> bool: """ Deletes a given Submission object from the server. :param submission: The Submission object we want to delete in the server. :returns: True if successful, raises Error otherwise. """ # Not using direct URL because this helps to use the same method # from local submission (not fetched from server) objects. path_query = f"api/v1/sources/{submission.source_uuid}/submissions/{submission.uuid}" method = "DELETE" response = self._send_json_request( method, path_query, headers=self.build_headers(), timeout=self.default_request_timeout, ) assert isinstance(response, JSONResponse) if response.status == 404: raise WrongUUIDError(f"Missing submission {submission.uuid}") if response.data.get("message") == "Submission deleted": return True # We should never reach here return False def download_submission( self, submission: Submission, original_path: str | None = None, timeout: int | None = None ) -> tuple[str, str]: """ Returns a tuple of etag (format is algorithm:checksum) and file path for a given Submission object. This method requires a directory path at which to save the submission file. :param submission: Submission object :param original_path: Local directory path to save the submission, if None, use ~/Downloads :returns: Tuple of etag and path of the saved submission. """ path_query = ( f"api/v1/sources/{submission.source_uuid}/submissions/{submission.uuid}/download" ) method = "GET" if original_path: path = Path(original_path) else: path = Path("~/Downloads").expanduser() if not path.is_dir(): raise BaseError(f"Specified path isn't a directory: {path}") response = self._send_json_request( method, path_query, stream=True, headers=self.build_headers(), timeout=timeout or self.default_download_timeout, ) if isinstance(response, JSONResponse): if response.status == 404: raise WrongUUIDError(f"Missing submission {submission.uuid}") else: raise BaseError(f"Unknown error, status code: {response.status}") filepath = path / submission.filename # submission.filename should have already been validated, but let's # double check before we write anything utils.check_path_traversal(filepath) filepath.write_bytes(response.contents) return response.sha256sum.strip('"'), str(filepath) def flag_source(self, source: Source) -> bool: """ Flags a source for reply. :param source: Source object we want to flag. :returns: True if successful, raises Error otherwise. """ path_query = f"api/v1/sources/{source.uuid}/flag" method = "POST" response = self._send_json_request( method, path_query, headers=self.build_headers(), timeout=self.default_request_timeout, ) assert isinstance(response, JSONResponse) if response.status == 404: raise WrongUUIDError(f"Missing source {source.uuid}") return True def get_current_user(self) -> dict: """ Returns a dictionary of the current user data. Example: { 'is_admin': True, 'last_login': '2018-08-01T19:10:38.199429Z', 'username': 'journalist' } """ path_query = "api/v1/user" method = "GET" response = self._send_json_request( method, path_query, headers=self.build_headers(), timeout=self.default_request_timeout, ) assert isinstance(response, JSONResponse) return response.data def get_users(self) -> list[User]: """ Returns a list of all the journalist and admin users registered on the server. :returns: List of User objects. """ path_query = "api/v1/users" method = "GET" response = self._send_json_request( method, path_query, headers=self.build_headers(), timeout=self.default_request_timeout, ) assert isinstance(response, JSONResponse) users = response.data["users"] result: list[User] = [] for user in users: u = User(**user) result.append(u) return result def reply_source(self, source: Source, msg: str, reply_uuid: str | None = None) -> Reply: """ This method is used to reply to a given source. The message should be preencrypted with the source's GPG public key. :param source: Source object to whom we want to reply. :param msg: Encrypted message with Source's GPG public key. :param reply_uuid: The UUID that will be used to identify the reply on the server. """ path_query = f"api/v1/sources/{source.uuid}/replies" method = "POST" reply = {"reply": msg} if reply_uuid: reply["uuid"] = reply_uuid response = self._send_json_request( method, path_query, body=json.dumps(reply), headers=self.build_headers(), timeout=self.default_request_timeout, ) assert isinstance(response, JSONResponse) if response.data.get("message") == "Your reply has been stored": return Reply(uuid=response.data["uuid"], filename=response.data["filename"]) raise ReplyError("bad request") def get_replies_from_source(self, source: Source) -> list[Reply]: """ This will return a list of replies associated with a source. :param source: Source object containing only source's UUID value. :returns: List of Reply objects. """ path_query = f"api/v1/sources/{source.uuid}/replies" method = "GET" response = self._send_json_request( method, path_query, headers=self.build_headers(), timeout=self.default_request_timeout, ) assert isinstance(response, JSONResponse) if response.status == 404: raise WrongUUIDError(f"Missing source {source.uuid}") result = [] for datum in response.data["replies"]: reply = Reply(**datum) result.append(reply) return result def get_reply_from_source(self, source: Source, reply_uuid: str) -> Reply: """ This will return a single specific reply. :param source: Source object. :param reply_uuid: UUID of the reply. :returns: A reply object """ if source.uuid and reply_uuid is not None: path_query = f"api/v1/sources/{source.uuid}/replies/{reply_uuid}" method = "GET" response = self._send_json_request( method, path_query, headers=self.build_headers(), timeout=self.default_request_timeout, ) assert isinstance(response, JSONResponse) if response.status == 404: raise WrongUUIDError(f"Missing source {source.uuid}") reply = Reply(**response.data) return reply def get_all_replies(self) -> list[Reply]: """ This will return a list of all replies from the server. :returns: List of Reply objects. """ path_query = "api/v1/replies" method = "GET" response = self._send_json_request( method, path_query, headers=self.build_headers(), timeout=self.default_request_timeout, ) assert isinstance(response, JSONResponse) result = [] for datum in response.data["replies"]: reply = Reply(**datum) result.append(reply) return result def download_reply(self, reply: Reply, original_path: str | None = None) -> tuple[str, str]: """ Returns a tuple of etag (format is algorithm:checksum) and file path for a given Reply object. This method requires a directory path at which to save the reply file. :param reply: Reply object :param original_path: Local directory path to save the reply :returns: Tuple of etag and path of the saved Reply. """ path_query = f"api/v1/sources/{reply.source_uuid}/replies/{reply.uuid}/download" method = "GET" if original_path: path = Path(original_path) else: path = Path("~/Downloads").expanduser() if not os.path.isdir(path): raise BaseError(f"Specified path isn't a directory: {path}") response = self._send_json_request( method, path_query, stream=True, headers=self.build_headers(), timeout=self.default_request_timeout, ) if isinstance(response, JSONResponse): if response.status == 404: raise WrongUUIDError(f"Missing reply {reply.uuid}") else: raise BaseError(f"Unknown error, status code: {response.status}") filepath = path / reply.filename # reply.filename should have already been validated, but let's # double check before we write anything utils.check_path_traversal(filepath) filepath.write_bytes(response.contents) return response.sha256sum.strip('"'), str(filepath) def delete_reply(self, reply: Reply) -> bool: """ Deletes a given Reply object from the server. :param reply: The Reply object we want to delete in the server. :returns: True if successful, raises Error otherwise. """ # Not using direct URL because this helps to use the same method # from local reply (not fetched from server) objects. path_query = f"api/v1/sources/{reply.source_uuid}/replies/{reply.uuid}" method = "DELETE" response = self._send_json_request( method, path_query, headers=self.build_headers(), timeout=self.default_request_timeout, ) assert isinstance(response, JSONResponse) if response.status == 404: raise WrongUUIDError(f"Missing reply {reply.uuid}") if response.data.get("message") == "Reply deleted": return True # We should never reach here return False def logout(self) -> bool: """ Logs the current user out. """ path_query = "api/v1/logout" method = "POST" response = self._send_json_request( method, path_query, headers=self.build_headers(), timeout=self.default_request_timeout, ) assert isinstance(response, JSONResponse) return response.data.get("message") == "Your token has been revoked." def seen(self, files: list[str], messages: list[str], replies: list[str]) -> str: """ Mark supplied files, messages, and replies as seen by the current user. The current user will be retrieved from the auth header on the server. :param conversation_items: The items to mark as seen by the current user. :param files: list of file uuids to mark as seen :param messages: list of message uuids to mark as seen :param replies: list of reply uuids to mark as seen :returns: Raise exception if 404, else the direct response from the server """ method = "POST" path_query = "api/v1/seen" body = json.dumps({"files": files, "messages": messages, "replies": replies}) response = self._send_json_request( method, path_query, headers=self.build_headers(), body=body, timeout=self.default_request_timeout, ) assert isinstance(response, JSONResponse) data_str = json.dumps(response.data) if response.status == 404: raise WrongUUIDError(f"{data_str}") # FIXME: why are we returning a string with a JSON-encoded blob??? return data_str