client/securedrop_client/export.py (328 lines of code) (raw):

import json
import logging
import os
import shutil
import subprocess
import tarfile
from collections.abc import Callable
from enum import Enum
from io import BytesIO
from shlex import quote
from tempfile import TemporaryDirectory, mkdtemp
from typing import Dict

from PyQt5.QtCore import QObject, QProcess, pyqtSignal

from securedrop_client.export_status import ExportError, ExportStatus

logger = logging.getLogger(__name__)


class Export(QObject):
    """
    Interface for sending files to Export VM for transfer to a
    disk drive or printed by a USB-connected printer.

    Files are archived in a specified format, (see `export` README).

    A list of valid filepaths must be supplied.
    """

    # Name of the JSON file, stored at the root of every archive, that tells
    # the receiving side which operation is being requested.
    _METADATA_FN = "metadata.json"

    _USB_TEST_FN = "usb-test.sd-export"
    _USB_TEST_METADATA = {"device": "usb-test"}

    _PRINTER_PREFLIGHT_FN = "printer-preflight.sd-export"
    _PRINTER_PREFLIGHT_METADATA = {"device": "printer-preflight"}

    _PRINT_FN = "print_archive.sd-export"
    _PRINT_METADATA = {"device": "printer"}

    _DISK_FN = "archive.sd-export"
    _DISK_METADATA = {"device": "disk"}
    _DISK_ENCRYPTION_KEY_NAME = "encryption_key"
    _DISK_EXPORT_DIR = "export_data"

    _WHISTLEFLOW_PREFLIGHT_FN = "whistleflow-preflight.sd-export"

    # Emit export states
    export_state_changed = pyqtSignal(object)

    # Emit print states
    print_preflight_check_succeeded = pyqtSignal(object)
    print_succeeded = pyqtSignal(object)

    export_completed = pyqtSignal(object)

    print_preflight_check_failed = pyqtSignal(object)
    print_failed = pyqtSignal(object)

    whistleflow_preflight_check_failed = pyqtSignal(object)
    whistleflow_preflight_check_succeeded = pyqtSignal()
    whistleflow_call_failure = pyqtSignal(object)
    whistleflow_call_success = pyqtSignal()

    process = None  # Optional[QProcess]
    tmpdir = None  # mkdtemp directory must be cleaned up when QProcess completes

    def run_printer_preflight_checks(self) -> None:
        """
        Make sure the Export VM is started.

        Builds a printer-preflight archive in a fresh private temporary
        directory and sends it via qrexec. The result is reported through
        `print_preflight_check_succeeded` / `print_preflight_check_failed`.
        """
        logger.info("Beginning printer preflight check")
        self.tmpdir = mkdtemp()
        os.chmod(self.tmpdir, 0o700)

        try:
            archive_path = self._create_archive(
                archive_dir=self.tmpdir,
                archive_fn=self._PRINTER_PREFLIGHT_FN,
                metadata=self._PRINTER_PREFLIGHT_METADATA,
            )
            self._run_qrexec_export(
                archive_path, self._on_print_preflight_complete, self._on_print_prefight_error
            )
        except ExportError as e:
            logger.error(f"Error creating archive: {e}")
            # The error callback also removes the temporary directory.
            self._on_print_prefight_error()

    def run_export_preflight_checks(self) -> None:
        """
        Run preflight check to verify that a valid USB device is connected.

        The result is reported through `export_state_changed`.
        """
        logger.debug("Beginning export preflight check")

        try:
            self.tmpdir = mkdtemp()
            os.chmod(self.tmpdir, 0o700)

            archive_path = self._create_archive(
                archive_dir=self.tmpdir,
                archive_fn=self._USB_TEST_FN,
                metadata=self._USB_TEST_METADATA,
            )
            # Emits status via on_process_completed()
            self._run_qrexec_export(
                archive_path, self._on_export_process_complete, self._on_export_process_error
            )
        except ExportError:
            logger.error("Export preflight check failed during archive creation")
            # The error callback also removes the temporary directory.
            self._on_export_process_error()

    def export(self, filepaths: list[str], passphrase: str | None) -> None:
        """
        Bundle filepaths into a tarball and send to encrypted USB via qrexec,
        optionally supplying a passphrase to unlock encrypted drives.

        Args:
            filepaths: Paths of the files to export.
            passphrase: Passphrase for the encrypted drive; omitted from the
                archive metadata when empty or None.

        The result is reported through `export_state_changed`.
        """
        try:
            logger.debug(f"Begin exporting {len(filepaths)} item(s)")

            # Edit metadata template to include passphrase
            metadata = self._DISK_METADATA.copy()
            if passphrase:
                metadata[self._DISK_ENCRYPTION_KEY_NAME] = passphrase

            self.tmpdir = mkdtemp()
            os.chmod(self.tmpdir, 0o700)

            archive_path = self._create_archive(
                archive_dir=self.tmpdir,
                archive_fn=self._DISK_FN,
                metadata=metadata,
                filepaths=filepaths,
            )

            # Emits status through callbacks
            self._run_qrexec_export(
                archive_path, self._on_export_process_complete, self._on_export_process_error
            )

        except OSError as e:
            logger.error("Export failed")
            logger.debug(f"Export failed: {e}")
            # No QProcess callback will run on this path, so remove the
            # temporary directory (which may hold the passphrase) here.
            self._cleanup_tmpdir()
            self.export_state_changed.emit(ExportStatus.ERROR_EXPORT)

        # ExportStatus.ERROR_MISSING_FILES
        except ExportError as err:
            # Same as above: the archive, including metadata.json, may already
            # have been written before the error was raised.
            self._cleanup_tmpdir()
            if err.status:
                logger.error("Export failed while creating archive")
                self.export_state_changed.emit(err.status)
            else:
                logger.error("Export failed while creating archive (no status supplied)")
                self.export_state_changed.emit(ExportStatus.ERROR_EXPORT)

    def _run_qrexec_export(
        self, archive_path: str, success_callback: Callable, error_callback: Callable
    ) -> None:
        """
        Send the archive to the Export VM, where the archive will be processed.
        Uses qrexec-client-vm (via QProcess). Results are emitted via the
        `finished` signal; errors are reported via `errorOccurred`. User-defined
        callback functions must be connected to those signals.

        Args:
            archive_path (str): The path to the archive to be processed.
            success_callback, error_callback: Callback functions to connect to the
            `finished` and `errorOccurred` signals of QProcess. They are included to
            accommodate the print functions, which still use separate signals for
            print preflight, print, and error states, but can be removed in favour
            of a generic success callback and error callback when the print code
            is updated.
            Any callbacks must call _cleanup_tmpdir() to remove the temporary directory
            that held the files to be exported.
        """
        # There are already talks of switching to a QVM-RPC implementation for unlocking devices
        # and exporting files, so it's important to remember to shell-escape what we pass to the
        # shell, even if for the time being we're already protected against shell injection via
        # Python's implementation of subprocess, see
        # https://docs.python.org/3/library/subprocess.html#security-considerations
        qrexec = "/usr/bin/qrexec-client-vm"
        args = [
            quote("--"),
            quote("sd-devices"),
            quote("qubes.OpenInVM"),
            quote("/usr/lib/qubes/qopen-in-vm"),
            quote("--view-only"),
            quote("--"),
            quote(archive_path),
        ]

        self.process = QProcess()

        self.process.finished.connect(success_callback)
        self.process.errorOccurred.connect(error_callback)

        self.process.start(qrexec, args)

    def _cleanup_tmpdir(self) -> None:
        """
        Remove the temporary directory created for the current operation, if any.

        Should be called in all qrexec completion callbacks.
        """
        if self.tmpdir and os.path.exists(self.tmpdir):
            shutil.rmtree(self.tmpdir)

    def _parse_status_from_qprocess(self, process: QProcess | None) -> ExportStatus:
        """
        Parse stderr for an ExportStatus and return it, or raise ExportError.
        `securedrop-export` writes to stderr, which is passed to the calling
        QProcess using qrexec.

        Raises:
            ExportError: with status UNEXPECTED_RETURN_STATUS if there is no
                process, stderr is empty, stderr is not valid UTF-8, or the
                final line is not a known ExportStatus value.
        """
        if process:
            try:
                # Decoding happens inside the `try`: the bytes are untrusted, and
                # UnicodeDecodeError is a ValueError, so malformed output is
                # reported as an unexpected status instead of escaping the slot.
                output_untrusted = process.readAllStandardError().data().decode("utf-8").strip()
                if output_untrusted:
                    logger.debug(f"Result is {output_untrusted}")
                    # The final line of stderr is the status.
                    status_string_untrusted = output_untrusted.split("\n")[-1]
                    return ExportStatus(status_string_untrusted)
            except ValueError as e:
                logger.debug(f"Export preflight returned unexpected value: {e}")
                raise ExportError(ExportStatus.UNEXPECTED_RETURN_STATUS) from e

        logger.error("Empty status result from QProcess")
        raise ExportError(ExportStatus.UNEXPECTED_RETURN_STATUS)

    def _on_export_process_complete(self) -> None:
        """
        Callback, handle and emit results from QProcess. Information
        can be read from stdout/err. Connected to the QProcess `finished`
        signal by the export and export-preflight entry points.
        """
        self._cleanup_tmpdir()
        try:
            status = self._parse_status_from_qprocess(self.process)
            self.export_state_changed.emit(status)
        except ExportError:
            logger.error("Export preflight returned unexpected value")
            self.export_state_changed.emit(ExportStatus.UNEXPECTED_RETURN_STATUS)

    def _on_export_process_error(self) -> None:
        """
        Callback, called if QProcess cannot complete export. As with all such, the method
        signature cannot change.
        """
        self._cleanup_tmpdir()
        if self.process:
            # Logging only, so never let undecodable bytes raise here.
            err = (
                self.process.readAllStandardError()
                .data()
                .decode("utf-8", errors="replace")
                .strip()
            )
            logger.error(f"Export process error: {err}")
        self.export_state_changed.emit(ExportStatus.CALLED_PROCESS_ERROR)

    def _on_print_preflight_complete(self) -> None:
        """
        Print preflight completion callback.

        Emits `print_preflight_check_succeeded` when the parsed status is
        PRINT_PREFLIGHT_SUCCESS, otherwise `print_preflight_check_failed`.
        """
        self._cleanup_tmpdir()
        status = None
        try:
            status = self._parse_status_from_qprocess(self.process)
            # We aren't using export_state_changed yet for print, so
            # parse the status and send either the `failed` or `succeeded` signal
        except ExportError:
            logger.error("Export preflight returned unexpected value")
            self.print_preflight_check_failed.emit(ExportStatus.UNEXPECTED_RETURN_STATUS)
            return

        if status == ExportStatus.PRINT_PREFLIGHT_SUCCESS:
            logger.debug("Print preflight success")
            self.print_preflight_check_succeeded.emit(status)
        else:
            logger.debug(f"Print preflight failure ({status.value})")
            self.print_preflight_check_failed.emit(status)

    def _on_print_prefight_error(self) -> None:
        """
        Print Preflight error callback. Occurs when the QProcess itself fails.
        """
        self._cleanup_tmpdir()
        if self.process:
            # Logging only, so never let undecodable bytes raise here.
            err = (
                self.process.readAllStandardError()
                .data()
                .decode("utf-8", errors="replace")
                .strip()
            )
            logger.debug(f"Print preflight error: {err}")
        self.print_preflight_check_failed.emit(ExportStatus.CALLED_PROCESS_ERROR)

    def _on_print_complete(self) -> None:
        """
        Read output from QProcess and parse status, then emit status.

        Emits `print_succeeded` when the parsed status is PRINT_SUCCESS,
        otherwise `print_failed`.
        """
        self._cleanup_tmpdir()
        status = None
        try:
            status = self._parse_status_from_qprocess(self.process)
        except ExportError:
            # This is the print job itself, not the preflight check, so report
            # through `print_failed` like every other failure path for printing.
            logger.error("Print returned unexpected value")
            self.print_failed.emit(ExportStatus.UNEXPECTED_RETURN_STATUS)
            return

        if status == ExportStatus.PRINT_SUCCESS:
            logger.debug("Print success")
            self.print_succeeded.emit(ExportStatus.PRINT_SUCCESS)
        else:
            logger.info(f"Problem printing: {status.value}")
            self.print_failed.emit(status)

    def end_process(self) -> None:
        """
        Tell QProcess to quit if it hasn't already.
        Connected to the ExportWizard's `finished` signal, which fires
        when the dialog is closed, cancelled, or finished.
        """
        self._cleanup_tmpdir()
        logger.debug("Terminate process")
        # Give the process a brief (50 ms) chance to finish before terminating it.
        if self.process is not None and not self.process.waitForFinished(50):
            self.process.terminate()

    def _on_print_error(self) -> None:
        """
        Error callback for print qrexec. Called if QProcess fails.
        """
        self._cleanup_tmpdir()
        if self.process:
            # Decode as the other error callbacks do, so the log shows text
            # rather than the repr of a QByteArray.
            err = (
                self.process.readAllStandardError()
                .data()
                .decode("utf-8", errors="replace")
                .strip()
            )
            logger.debug(f"Print error: {err}")
        else:
            logger.error("Print error (stderr unavailable)")
        self.print_failed.emit(ExportStatus.CALLED_PROCESS_ERROR)

    def print(self, filepaths: list[str]) -> None:
        """
        Bundle files at filepaths into tarball and send for
        printing via qrexec.

        The result is reported through `print_succeeded` / `print_failed`.
        """
        try:
            logger.debug("Beginning print")

            self.tmpdir = mkdtemp()
            os.chmod(self.tmpdir, 0o700)
            archive_path = self._create_archive(
                archive_dir=self.tmpdir,
                archive_fn=self._PRINT_FN,
                metadata=self._PRINT_METADATA,
                filepaths=filepaths,
            )
            self._run_qrexec_export(archive_path, self._on_print_complete, self._on_print_error)

        except OSError as e:
            logger.error("Print failed")
            logger.debug(f"Print failed: {e}")
            # No QProcess callback will run on this path; clean up here.
            self._cleanup_tmpdir()
            self.print_failed.emit(ExportStatus.ERROR_PRINT)

        # ExportStatus.ERROR_MISSING_FILES
        except ExportError as err:
            # No QProcess callback will run on this path; clean up here.
            self._cleanup_tmpdir()
            if err.status:
                logger.error("Print failed while creating archive")
                self.print_failed.emit(err.status)
            else:
                logger.error("Print failed while creating archive (no status supplied)")
                self.print_failed.emit(ExportStatus.ERROR_PRINT)

    def _create_archive(
        self,
        archive_dir: str,
        archive_fn: str,
        metadata: dict,
        filepaths: list[str] | None = None,
        whistleflow: bool = False,
    ) -> str:
        """
        Create the archive to be sent to the Export VM.

        Args:
            archive_dir (str): The path to the directory in which to create the archive.
            archive_fn (str): The name of the archive file.
            metadata (dict): The dictionary containing metadata to add to the archive.
            filepaths (list[str] | None): The list of files to add to the archive.
                None (the default) is treated as an empty list.
            whistleflow (bool): Indicates if this is a whistleflow export

        Returns:
            str: The path to newly-created archive file.

        Raises:
            ExportError: with status ERROR_MISSING_FILES if filepaths is
                non-empty and none of the files exist.
        """
        # Avoid a shared mutable default argument.
        if filepaths is None:
            filepaths = []

        archive_path = os.path.join(archive_dir, archive_fn)

        with tarfile.open(archive_path, "w:gz") as archive:
            self._add_virtual_file_to_archive(archive, self._METADATA_FN, metadata)

            # When more than one file is added to the archive,
            # extra care must be taken to prevent name collisions.
            is_one_of_multiple_files = len(filepaths) > 1
            missing_count = 0

            # If this is an export to Whistleflow, we want to create a directory
            # called source_name even if it only contains a single file.
            # whistleflow-view relies on this directory to pre-populate the
            # source name field in the send-to-giant form.
            prevent_name_collisions = is_one_of_multiple_files or whistleflow

            for filepath in filepaths:
                if not os.path.exists(filepath):
                    missing_count += 1
                    logger.debug(
                        f"'{filepath}' does not exist, and will not be included in archive"
                    )
                    # Controller checks files and keeps a reference open during export,
                    # so this shouldn't be reachable
                    logger.warning("File not found at specified filepath, skipping")
                else:
                    self._add_file_to_archive(archive, filepath, prevent_name_collisions)

            if missing_count == len(filepaths) and missing_count > 0:
                # The partially written archive stays in archive_dir; the caller
                # owns that directory and is responsible for removing it.
                logger.error("Files were moved or missing")
                raise ExportError(ExportStatus.ERROR_MISSING_FILES)

        return archive_path

    def _add_virtual_file_to_archive(
        self, archive: tarfile.TarFile, filename: str, filedata: dict
    ) -> None:
        """
        Add filedata to a stream of in-memory bytes and add these bytes to the archive.

        Args:
            archive (TarFile): The archive object to add the virtual file to.
            filename (str): The name of the virtual file.
            filedata (dict): The data to add to the bytes stream.

        """
        filedata_bytes = json.dumps(filedata).encode("utf-8")

        tarinfo = tarfile.TarInfo(filename)
        # TarInfo.size is a byte count, so measure the encoded payload rather
        # than the number of characters in the string.
        tarinfo.size = len(filedata_bytes)

        archive.addfile(tarinfo, BytesIO(filedata_bytes))

    def _add_file_to_archive(
        self, archive: tarfile.TarFile, filepath: str, prevent_name_collisions: bool = False
    ) -> None:
        """
        Add the file to the archive. When the archive is extracted, the file should exist in a
        directory called "export_data".

        Args:
            archive: The archive object to add the file to.
            filepath: The path to the file that will be added to the supplied archive.
            prevent_name_collisions: When True, the names of the file's parent and
                grandparent directories are included in the archive path so that
                same-named files from different locations do not overwrite each other.
        """
        filename = os.path.basename(filepath)
        arcname = os.path.join(self._DISK_EXPORT_DIR, filename)
        if prevent_name_collisions:
            parent_path = os.path.dirname(filepath)
            grand_parent_path, parent_name = os.path.split(parent_path)
            grand_parent_name = os.path.basename(grand_parent_path)
            if filename == "transcript.txt":
                # Transcripts are placed one level higher: only the parent
                # directory name is used.
                arcname = os.path.join(self._DISK_EXPORT_DIR, parent_name, filename)
            else:
                arcname = os.path.join(
                    self._DISK_EXPORT_DIR, grand_parent_name, parent_name, filename
                )

        archive.add(filepath, arcname=arcname, recursive=False)

    # below whistleflow functions rescued from the old export file

    def _export_archive_to_whistleflow(self, archive_path: str) -> ExportStatus | None:
        """
        Clone of _export_archive which sends the archive to the Whistleflow VM.

        Runs qrexec-client-vm synchronously (this call blocks until it exits).

        Returns:
            The ExportStatus parsed from the command output, or None if the
            command produced no output.

        Raises:
            ExportError: with UNEXPECTED_RETURN_STATUS if the output is not a known
                ExportStatus value, or CALLED_PROCESS_ERROR if the command exits
                with a non-zero return code.
        """
        try:
            output = subprocess.check_output(
                [
                    quote("qrexec-client-vm"),
                    quote("--"),
                    quote("whistleflow-view"),
                    quote("qubes.Filecopy"),
                    quote("/usr/lib/qubes/qfile-agent"),
                    quote(archive_path),
                ],
                stderr=subprocess.STDOUT,
            )
            result = output.decode("utf-8").strip()

            # No status is returned for successful `disk`, `printer-test`, and `print` calls.
            # This will change in a future release of sd-export.
            if result:
                return ExportStatus(result)
            else:
                return None
        except ValueError as e:
            logger.debug(f"Export subprocess returned unexpected value: {e}")
            raise ExportError(ExportStatus.UNEXPECTED_RETURN_STATUS) from e
        except subprocess.CalledProcessError as e:
            logger.error("Subprocess failed")
            logger.debug(f"Subprocess failed: {e}")
            raise ExportError(ExportStatus.CALLED_PROCESS_ERROR) from e

    def send_files_to_whistleflow(self, filename: str, filepaths: list[str]) -> None:
        """
        Clone of send_file_to_usb_device, but for Whistleflow.

        Args:
            filename: Name of the archive file to create.
            filepaths: Paths of the files to include in the archive.

        Emits `whistleflow_call_success` or `whistleflow_call_failure`, and then
        `export_completed` with the list of filepaths in either case.
        """
        with TemporaryDirectory() as temp_dir:
            try:
                logger.debug("beginning export")
                self._run_whistleflow_export(temp_dir, filename, filepaths)
                self.whistleflow_call_success.emit()
                logger.debug("Export successful")
            except ExportError as e:
                logger.error("Export failed")
                logger.debug(f"Export failed: {e}")
                self.whistleflow_call_failure.emit(e)

        self.export_completed.emit(filepaths)

    def _run_whistleflow_view_test(self) -> None:
        # TODO fill this in
        logger.info("Running dummy whistleflow view test")

    def _run_whistleflow_export(
        self, archive_dir: str, filename: str, filepaths: list[str]
    ) -> None:
        """
        Create a Whistleflow archive and send it to the Whistleflow VM.

        Args:
            archive_dir (str): The path to the directory in which to create the archive.
            filename (str): The name of the archive file.
            filepaths (list[str]): The list of files to add to the archive.

        Raises:
            ExportError: Raised if archive creation or the qrexec call fails, or if
                the call returns any status (no output is treated as success).
        """
        metadata = {}  # type: Dict[str, str]
        archive_path = self._create_archive(
            archive_dir, filename, metadata, filepaths, whistleflow=True
        )
        status = self._export_archive_to_whistleflow(archive_path)
        if status:
            raise ExportError(status)

    def run_whistleflow_preflight_checks(self) -> None:
        """
        Run dummy preflight test
        """
        try:
            logger.debug("beginning whistleflow preflight checks")
            self._run_whistleflow_view_test()

            logger.debug("completed preflight checks: success")
            self.whistleflow_preflight_check_succeeded.emit()
        except ExportError as e:
            logger.debug("completed preflight checks: failure")
            self.whistleflow_preflight_check_failed.emit(e)


class ExportDestination(Enum):
    """Destinations that files can be exported to."""

    USB = "USB"
    WHISTLEFLOW = "WHISTLEFLOW"