export/securedrop_export/disk/cli.py (328 lines of code) (raw):

import json
import logging
import os
import subprocess
import time
from re import Pattern
from shlex import quote

import pexpect

from securedrop_export.exceptions import ExportException

from .status import Status
from .volume import EncryptionScheme, MountedVolume, Volume

logger = logging.getLogger(__name__)

_DEVMAPPER_PREFIX = "/dev/mapper/"
_DEV_PREFIX = "/dev/"

# Header block that `udisksctl status` prints before the device rows.
# NOTE(review): `str.removeprefix` only strips this when it matches the tool's
# output byte-for-byte, including column padding -- confirm the spacing against
# real `udisksctl status` output. `CLI.get_volume` additionally skips header and
# separator rows so a mismatch here does not produce phantom devices.
_UDISKS_PREFIX = (
    "MODEL REVISION SERIAL DEVICE\n"
    "--------------------------------------------------------------------------\n"
)

# pexpect allows for a complex type to be passed to `expect` in order to match with input
# that includes regular expressions, byte or string patterns, *or* pexpect.EOF and pexpect.TIMEOUT,
# but mypy needs a little help with it, so the below alias is used as a typehint.
# See https://pexpect.readthedocs.io/en/stable/api/pexpect.html#pexpect.spawn.expect
PexpectList = list[
    Pattern[str] | Pattern[bytes] | str | bytes | type[pexpect.EOF] | type[pexpect.TIMEOUT]
]


class CLI:
    """
    A Python wrapper for shell commands required to detect, map, and mount USB devices.

    CLI callers must handle ExportException.

    NOTE(review): device paths are passed through `shlex.quote` even though the
    commands are run from argument lists rather than through a shell. For
    ordinary device paths `quote` returns its input unchanged; confirm whether
    the quoting is still wanted.
    """

    def get_volume(self) -> Volume | MountedVolume:
        """
        Search for valid connected device.

        Lists the devices known to udisks2 (`udisksctl status`), requires exactly
        one, then inspects `lsblk --json` output for that device to find a single
        supported volume.

        Returns the one supported Volume (locked) or MountedVolume (unlocked).

        Raise ExportException on error:
          * NO_DEVICE_DETECTED / MULTI_DEVICE_DETECTED: zero or several udisks devices
          * INVALID_DEVICE_DETECTED: not exactly one supported volume
          * DEVICE_ERROR: a command failed or its output could not be parsed
        """
        logger.info("Checking connected volumes")
        try:
            usbs = (
                subprocess.check_output(["udisksctl", "status"])
                .decode("utf-8")
                .removeprefix(_UDISKS_PREFIX)
                .strip()
                .split("\n")
            )

            # Collect a space-separated list of USB device names.
            # Format:
            # Label (may contain spaces)    Revision    Serial#     Device
            # The last string is the device identifier (/dev/{device}).
            targets = []
            for i in usbs:
                item = i.strip().split()
                if not item:
                    continue
                # Defensive: if the header was not stripped by removeprefix
                # (e.g. column padding differs), do not mistake the header row
                # or the dashed separator row for a device.
                if item[-1] == "DEVICE" or set(i.strip()) == {"-"}:
                    continue
                targets.append(item[-1])

            if len(targets) == 0:
                logger.info("No USB devices found")
                raise ExportException(sdstatus=Status.NO_DEVICE_DETECTED)
            elif len(targets) > 1:
                logger.error("Too many USB devices! Detach a device before continuing.")
                raise ExportException(sdstatus=Status.MULTI_DEVICE_DETECTED)

            # lsblk --output NAME,RO,TYPE,MOUNTPOINT,FSTYPE --json
            # devices such as /dev/xvda are marked as "removable",
            # which is why we do the previous check to pick a device
            # recognized by udisks2
            lsblk = subprocess.check_output(
                [
                    "lsblk",
                    "--output",
                    "NAME,RO,TYPE,MOUNTPOINT,FSTYPE",
                    "--json",
                ]
            ).decode("utf-8")

            lsblk_json = json.loads(lsblk)
            if not lsblk_json.get("blockdevices"):
                logger.error("Unrecoverable: could not parse lsblk.")
                raise ExportException(sdstatus=Status.DEVICE_ERROR)

            # mypy complains that this is a list[str], but it is a
            # list[Union[Volume, MountedVolume]]
            volumes = []  # type: ignore
            for device in lsblk_json.get("blockdevices"):
                if device.get("name") in targets and device.get("ro") is False:
                    logger.debug(
                        f"Checking removable, writable device {_DEV_PREFIX}{device.get('name')}"
                    )
                    # Inspect partitions or whole volume.
                    # For sanity, we will only support encrypted partitions one level deep.
                    if "children" in device:
                        for child in device.get("children"):
                            # Whole block device is encrypted (and unlocked)
                            if child.get("type") == "crypt" and device.get("type") == "disk":
                                logger.debug(f"Checking device {device}")
                                item = self._get_supported_volume(device)  # type: ignore
                                if item:
                                    volumes.append(item)
                            else:
                                # /dev/sdX1, /dev/sdX2
                                logger.debug(f"Checking partition {child}")
                                item = self._get_supported_volume(child)  # type: ignore
                                if item:
                                    volumes.append(item)  # type: ignore
                    # /dev/sdX and it's locked
                    else:
                        item = self._get_supported_volume(device)  # type: ignore
                        if item:
                            volumes.append(item)  # type: ignore

            if len(volumes) != 1:
                logger.error(f"Need one target, got {len(volumes)}")
                raise ExportException(sdstatus=Status.INVALID_DEVICE_DETECTED)
            else:
                logger.debug(f"Export target is {volumes[0].device_name}")  # type: ignore
                return volumes[0]  # type: ignore

        except json.JSONDecodeError as err:
            logger.error(err)
            raise ExportException(sdstatus=Status.DEVICE_ERROR) from err

        except subprocess.CalledProcessError as ex:
            raise ExportException(sdstatus=Status.DEVICE_ERROR) from ex

    def _get_supported_volume(self, device: dict) -> Volume | MountedVolume | None:
        """
        Given JSON-formatted lsblk output for one device, determine if it
        is suitably partitioned and return it to be used for export,
        mounting it if possible.

        Supported volumes:
          * Unlocked Veracrypt drives
          * Locked or unlocked LUKS drives
          * No more than one encrypted partition (multiple non-encrypted partitions
            are OK as they will be ignored).

        Note: It would be possible to support other unlocked encrypted drives, as long as
        udisks2 can tell they contain an encrypted partition.

        Returns a MountedVolume for an unlocked supported device (mounting it
        first if needed), a Volume for a locked LUKS device, or None if the
        device is not a supported export target.
        """
        device_name = device.get("name")
        device_fstype = device.get("fstype")

        vol = Volume(f"{_DEV_PREFIX}{device_name}", EncryptionScheme.UNKNOWN)

        if device_fstype == "crypto_LUKS":
            logger.debug(f"{device_name} is LUKS-encrypted")
            vol.encryption = EncryptionScheme.LUKS

        children = device.get("children")
        if children:
            if len(children) != 1:
                logger.error(f"Unexpected volume format on {vol.device_name}")
                return None
            elif children[0].get("type") != "crypt":
                return None
            else:
                # It's an unlocked drive, possibly mounted
                mapped_name = f"{_DEVMAPPER_PREFIX}{children[0].get('name')}"

                # Unlocked VC/TC drives will still have EncryptionScheme.UNKNOWN;
                # see if we can do better
                if vol.encryption == EncryptionScheme.UNKNOWN:
                    vol.encryption = self._is_it_veracrypt(vol)

                # To opportunistically mount any unlocked encrypted partition
                # (i.e. third-party devices such as IronKeys), remove this condition.
                if vol.encryption in (EncryptionScheme.LUKS, EncryptionScheme.VERACRYPT):
                    logger.debug(f"{vol.device_name} encryption scheme is supported")

                    if children[0].get("mountpoint"):
                        logger.debug(f"{vol.device_name} is mounted")

                        return MountedVolume(
                            device_name=vol.device_name,
                            unlocked_name=mapped_name,
                            encryption=vol.encryption,
                            mountpoint=children[0].get("mountpoint"),
                        )
                    else:
                        logger.debug(f"{device_name} is unlocked but unmounted; attempting mount")
                        return self._mount_volume(vol, mapped_name)

        # Locked VeraCrypt drives are rejected here (EncryptionScheme.UNKNOWN)
        if vol.encryption in (EncryptionScheme.LUKS, EncryptionScheme.VERACRYPT):
            logger.debug(f"{vol.device_name} is supported export target")
            return vol
        else:
            logger.debug(f"No suitable volume found on {vol.device_name}")
            return None

    def _is_it_veracrypt(self, volume: Volume) -> EncryptionScheme:
        """
        Helper. Best-effort detection of unlocked VeraCrypt drives.

        udisks2 requires the flag file /etc/udisks2/tcrypt.conf to
        enable VeraCrypt drive detection, which we ship with this package.

        Returns VERACRYPT or LUKS according to the `IdType` reported by
        `udisksctl info`, and UNKNOWN otherwise (including when the command fails).
        """
        try:
            logger.debug(f"Check if {volume.device_name} is an unlocked VeraCrypt device")
            info = subprocess.check_output(
                [
                    "udisksctl",
                    "info",
                    "--block-device",
                    quote(volume.device_name),
                ]
            ).decode("utf-8")
            if "IdType:                     crypto_TCRYPT\n" in info:
                return EncryptionScheme.VERACRYPT
            elif "IdType:                     crypto_LUKS\n" in info:
                # Don't downgrade LUKS to UNKNOWN if someone
                # calls this method on a LUKS drive
                return EncryptionScheme.LUKS
            else:
                return EncryptionScheme.UNKNOWN
        except subprocess.CalledProcessError as err:
            logger.debug(f"Error checking disk info of {volume.device_name}")
            logger.error(err)
            # Not a showstopper
            return EncryptionScheme.UNKNOWN

    def unlock_volume(self, volume: Volume, encryption_key: str) -> MountedVolume:
        """
        Unlock and mount an encrypted volume. If volume is already mounted, preserve
        existing mountpoint.

        Throws ExportException if errors are encountered during device unlocking:
        ERROR_UNLOCK_LUKS when udisks reports an incorrect passphrase,
        ERROR_UNLOCK_GENERIC otherwise.

        pexpect.EOF and pexpect.TIMEOUT are included in the lists of results to
        check for, rather than being handled as exceptions.
        (See https://pexpect.readthedocs.io/en/stable/api/pexpect.html#pexpect.spawn.expect)
        """
        logger.debug(f"Unlocking volume {quote(volume.device_name)}")

        command = "udisksctl"
        args = ["unlock", "--block-device", quote(volume.device_name)]

        # pexpect allows for a match list that contains pexpect.EOF and pexpect.TIMEOUT
        # as well as string/regex matches:
        # https://pexpect.readthedocs.io/en/stable/api/pexpect.html#pexpect.spawn.expect
        prompt: PexpectList = [
            "Passphrase: ",
            pexpect.EOF,
            pexpect.TIMEOUT,
        ]
        expected: PexpectList = [
            f"Unlocked {volume.device_name} as (.*)[^\r\n].",
            "GDBus.Error:org.freedesktop.UDisks2.Error.Failed: Device "  # string continues
            f"{volume.device_name} is already unlocked as (.*)[^\r\n].",
            "GDBus.Error:org.freedesktop.UDisks2.Error.Failed: Error "  # string continues
            f"unlocking {volume.device_name}: Failed to activate device: Incorrect passphrase",
            pexpect.EOF,
            pexpect.TIMEOUT,
        ]
        unlock_error = Status.ERROR_UNLOCK_GENERIC

        child = pexpect.spawn(command, args)
        index = child.expect(prompt)
        if index != 0:
            logger.error("Did not receive disk unlock prompt")
            # Don't leak the spawned process when bailing out early
            child.close()
            raise ExportException(sdstatus=Status.ERROR_UNLOCK_GENERIC)
        else:
            logger.debug("Passing key")
            child.sendline(encryption_key)
            index = child.expect(expected)
            if index in (0, 1):
                # Pexpect includes a re.Match object at `child.match`, but this freaks mypy out:
                # see https://pexpect.readthedocs.io/en/stable/api/pexpect.html#pexpect.spawn.expect
                # We know what format the results are in
                dm_name = child.match.group(1).decode("utf-8").strip()  # type: ignore
                logger.debug(f"Device is unlocked as {dm_name}")

                child.close()
                if child.exitstatus is not None and child.exitstatus not in (0, 1):
                    logger.warning(f"pexpect: child exited with {child.exitstatus}")

                # dm_name format is /dev/dm-X
                return self._mount_volume(volume, dm_name)

            elif index == 2:
                # Still an error, but we can report more specific error to the user
                logger.debug("Bad volume passphrase")
                unlock_error = Status.ERROR_UNLOCK_LUKS

            # Any other index values are also an error. Clean up and raise
            child.close()

            logger.error(f"Error encountered while unlocking {volume.device_name}")
            raise ExportException(sdstatus=unlock_error)

    def _mount_volume(self, volume: Volume, full_unlocked_name: str) -> MountedVolume:
        """
        Given an unlocked volume, mount volume in /media/user/ by udisksctl and
        return MountedVolume object.

        Unlocked name could be `/dev/mapper/$id` or `/dev/dm-X`.

        Raise ExportException (ERROR_MOUNT) if no mountpoint could be obtained.

        pexpect.EOF and pexpect.TIMEOUT are included in the lists of results to
        check for, rather than being handled as exceptions.
        (See https://pexpect.readthedocs.io/en/stable/api/pexpect.html#pexpect.spawn.expect)
        """
        info_cmd = "udisksctl"
        info_args = ["info", "--block-device", quote(volume.device_name)]
        # The terminal output has colours and other formatting. A match is anything
        # that includes our device identified as PreferredDevice on one line
        # \x1b[37mPreferredDevice:\x1b[0m            /dev/sdaX\r\n
        expected_info: PexpectList = [
            f"PreferredDevice:.*[^\r\n]{volume.device_name}",
            "Error looking up object for device",
            pexpect.EOF,
            pexpect.TIMEOUT,
        ]
        max_retries = 3

        mount_cmd = "udisksctl"
        mount_args = ["mount", "--block-device", quote(full_unlocked_name)]

        # We can't pass {full_unlocked_name} in the match statement since even if we
        # pass in /dev/mapper/xxx, udisks2 may refer to the disk as /dev/dm-X.
        expected_mount: PexpectList = [
            "Mounted .* at (.*)",
            "Error mounting .*: GDBus.Error:org.freedesktop.UDisks2.Error.AlreadyMounted: "
            "Device (.*) is already mounted at `(.*)'.",
            "Error looking up object for device",
            pexpect.EOF,
            pexpect.TIMEOUT,
        ]
        mountpoint = None

        logger.debug(
            f"Check to make sure udisks identified {volume.device_name} "
            f"(unlocked as {full_unlocked_name})"
        )
        # Poll `udisksctl info` a few times before mounting; the mount is
        # attempted afterwards whether or not the device was identified.
        for _ in range(max_retries):
            child = pexpect.spawn(info_cmd, info_args)
            index = child.expect(expected_info)
            child.close()

            if index != 0:
                logger.debug(f"udisks can't identify {volume.device_name}, retrying...")
                time.sleep(0.5)
            else:
                logger.debug(f"udisks found {volume.device_name}")
                break

        logger.info(f"Mount {full_unlocked_name} using udisksctl")
        child = pexpect.spawn(mount_cmd, mount_args)
        index = child.expect(expected_mount)

        if index == 0:
            # As above, we know the format.
            # Per https://pexpect.readthedocs.io/en/stable/api/pexpect.html#pexpect.spawn.expect,
            # `child.match` is a re.Match object
            mountpoint = child.match.group(1).decode("utf-8").strip()  # type: ignore
            logger.info(f"Successfully mounted device at {mountpoint}")

        elif index == 1:
            # Use udisks unlocked name
            logger.debug("Already mounted, get unlocked_name and mountpoint")
            full_unlocked_name = child.match.group(1).decode("utf-8").strip()  # type: ignore
            mountpoint = child.match.group(2).decode("utf-8").strip()  # type: ignore
            logger.info(f"Device {full_unlocked_name} already mounted at {mountpoint}")

        elif index == 2:
            logger.debug("Device is not ready")

        logger.debug("Close pexpect process")
        child.close()

        if mountpoint:
            return MountedVolume(
                device_name=volume.device_name,
                unlocked_name=full_unlocked_name,
                encryption=volume.encryption,
                mountpoint=mountpoint,
            )

        logger.error("Could not get mountpoint")
        raise ExportException(sdstatus=Status.ERROR_MOUNT)

    def write_data_to_device(
        self,
        device: MountedVolume,
        archive_tmpdir: str,
        archive_target_dirname: str,
    ):
        """
        Move files to drive (overwrites files with same filename) and unmount drive.

        Drive is unmounted and files are cleaned up as part of the `finally` block to ensure
        that cleanup happens even if export fails or only partially succeeds.

        Raises ExportException (ERROR_EXPORT) if creating the target directory,
        copying, or setting permissions fails.
        """
        # Flag to pass to cleanup method
        is_error = False
        try:
            target_path = os.path.join(device.mountpoint, archive_target_dirname)
            subprocess.check_call(["mkdir", target_path])

            export_data = os.path.join(archive_tmpdir, "export_data/")
            logger.debug(f"Copying file to {archive_target_dirname}")

            subprocess.check_call(["cp", "-r", export_data, target_path])
            logger.info(f"File copied successfully to {target_path}")

            subprocess.check_call(["chmod", "-R", "ugo+r", target_path])
            logger.info(f"Read permissions granted on {target_path}")

        except (subprocess.CalledProcessError, OSError) as ex:
            logger.error(ex)

            # Ensure we report an export error out after cleanup
            is_error = True
            raise ExportException(sdstatus=Status.ERROR_EXPORT) from ex

        finally:
            self.cleanup(device, archive_tmpdir, is_error)

    def cleanup(
        self,
        volume: MountedVolume,
        archive_tmpdir: str,
        is_error: bool = False,
        should_close_volume: bool = True,
    ):
        """
        Post-export cleanup method. Unmount and lock drive and remove temporary
        directory.

        Raises ExportException if errors during cleanup are encountered.

        Method is called whether or not export succeeds; if `is_error` is True,
        will report export error status on error (instead of cleanup status).
        """
        error_status = Status.ERROR_EXPORT if is_error else Status.ERROR_EXPORT_CLEANUP

        logger.debug("Syncing filesystems")
        try:
            subprocess.check_call(["sync"])
            self._remove_temp_directory(archive_tmpdir)

            # Future configurable option
            if should_close_volume:
                self._close_volume(volume)

        except subprocess.CalledProcessError as ex:
            logger.error("Error syncing filesystem")
            raise ExportException(sdstatus=error_status) from ex

    def _close_volume(self, mv: MountedVolume) -> Volume:
        """
        Unmount and close volume.

        Returns a Volume describing the now-locked device.

        Raises ExportException: ERROR_UNMOUNT_VOLUME_BUSY if unmounting fails,
        ERROR_EXPORT_CLEANUP if locking fails.
        """
        logger.debug(f"Unmounting drive {mv.unlocked_name} from {mv.mountpoint}")
        try:
            subprocess.check_call(
                [
                    "udisksctl",
                    "unmount",
                    "--block-device",
                    quote(mv.unlocked_name),
                ],
                # Redirect stderr/stdout to avoid broken pipe when subprocess terminates,
                # which results in qrexec attempting to parse error lines written to stderr
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
            )

        except subprocess.CalledProcessError as ex:
            logger.error(ex)
            logger.error("Error unmounting device")

            raise ExportException(sdstatus=Status.ERROR_UNMOUNT_VOLUME_BUSY) from ex

        logger.debug(f"Closing drive {mv.device_name}")
        try:
            subprocess.check_call(
                [
                    "udisksctl",
                    "lock",
                    "--block-device",
                    quote(mv.device_name),
                ],
                # Redirect stderr/stdout to avoid broken pipe when subprocess terminates
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
            )

        except subprocess.CalledProcessError as ex:
            logger.error("Error closing device")
            raise ExportException(sdstatus=Status.ERROR_EXPORT_CLEANUP) from ex

        # mv.device_name is used above as a full block-device path, so only add
        # the /dev/ prefix when it is actually missing (avoids "/dev//dev/sdX").
        device_name = mv.device_name
        if not device_name.startswith(_DEV_PREFIX):
            device_name = f"{_DEV_PREFIX}{device_name}"

        return Volume(
            device_name=device_name,
            encryption=mv.encryption,
        )

    def _remove_temp_directory(self, tmpdir: str):
        """
        Helper. Remove temporary directory used during export.

        Raises ExportException (ERROR_EXPORT_CLEANUP) if removal fails.
        """
        logger.debug(f"Deleting temporary directory {tmpdir}")
        try:
            subprocess.check_call(["rm", "-rf", tmpdir])
        except subprocess.CalledProcessError as ex:
            logger.error("Error removing temporary directory")
            raise ExportException(sdstatus=Status.ERROR_EXPORT_CLEANUP) from ex