in export/securedrop_export/disk/cli.py [0:0]
def get_volume(self) -> Volume | MountedVolume:
"""
Search for valid connected device.
Raise ExportException on error.
"""
logger.info("Checking connected volumes")
try:
usbs = (
subprocess.check_output(["udisksctl", "status"])
.decode("utf-8")
.removeprefix(_UDISKS_PREFIX)
.strip()
.split("\n")
)
# Collect a space-separated list of USB device names.
# Format:
# Label (may contain spaces) Revision Serial# Device
# The last string is the device identifier (/dev/{device}).
targets = []
for i in usbs:
item = i.strip().split()
if len(item) > 0:
targets.append(item[-1])
if len(targets) == 0:
logger.info("No USB devices found")
raise ExportException(sdstatus=Status.NO_DEVICE_DETECTED)
elif len(targets) > 1:
logger.error("Too many USB devices! Detach a device before continuing.")
raise ExportException(sdstatus=Status.MULTI_DEVICE_DETECTED)
# lsblk -o NAME,RM,RO,TYPE,MOUNTPOINT,FSTYPE --json
# devices such as /dev/xvda are marked as "removable",
# which is why we do the previous check to pick a device
# recognized by udisks2
lsblk = subprocess.check_output(
[
"lsblk",
"--output",
"NAME,RO,TYPE,MOUNTPOINT,FSTYPE",
"--json",
]
).decode("utf-8")
lsblk_json = json.loads(lsblk)
if not lsblk_json.get("blockdevices"):
logger.error("Unrecoverable: could not parse lsblk.")
raise ExportException(sdstatus=Status.DEVICE_ERROR)
# mypy complains that this is a list[str], but it is a
# list[Union[Volume, MountedVolume]]
volumes = [] # type: ignore
for device in lsblk_json.get("blockdevices"):
if device.get("name") in targets and device.get("ro") is False:
logger.debug(
f"Checking removable, writable device {_DEV_PREFIX}{device.get('name')}"
)
# Inspect partitions or whole volume.
# For sanity, we will only support encrypted partitions one level deep.
if "children" in device:
for child in device.get("children"):
# Whole block device is encrypted (and unlocked)
if child.get("type") == "crypt" and device.get("type") == "disk":
logger.debug("Checking device {device}")
item = self._get_supported_volume(device) # type: ignore
if item:
volumes.append(item)
else:
# /dev/sdX1, /dev/sdX2
logger.debug("Checking partition {child}")
item = self._get_supported_volume(child) # type: ignore
if item:
volumes.append(item) # type: ignore
# /dev/sdX and it's locked
else:
item = self._get_supported_volume(device) # type: ignore
if item:
volumes.append(item) # type: ignore
if len(volumes) != 1:
logger.error(f"Need one target, got {len(volumes)}")
raise ExportException(sdstatus=Status.INVALID_DEVICE_DETECTED)
else:
logger.debug(f"Export target is {volumes[0].device_name}") # type: ignore
return volumes[0] # type: ignore
except json.JSONDecodeError as err:
logger.error(err)
raise ExportException(sdstatus=Status.DEVICE_ERROR) from err
except subprocess.CalledProcessError as ex:
raise ExportException(sdstatus=Status.DEVICE_ERROR) from ex