in export/securedrop_export/disk/cli.py [0:0]
def unlock_volume(self, volume: Volume, encryption_key: str) -> MountedVolume:
    """
    Unlock an encrypted volume with `udisksctl`, then mount it.

    If `udisksctl` reports that the device is already unlocked, the mapped
    device name it prints is reused rather than treated as a failure.
    Mounting is delegated to `self._mount_volume()`.

    `pexpect.EOF` and `pexpect.TIMEOUT` are listed as match candidates so
    that `expect()` returns their index instead of raising. (See
    https://pexpect.readthedocs.io/en/stable/api/pexpect.html#pexpect.spawn.expect)

    Args:
        volume: The encrypted volume; its `device_name` is passed to `udisksctl`.
        encryption_key: Passphrase sent in response to the unlock prompt.

    Returns:
        Whatever `self._mount_volume()` returns for the unlocked device.

    Raises:
        ExportException: with `Status.ERROR_UNLOCK_LUKS` when the passphrase
            is rejected, or `Status.ERROR_UNLOCK_GENERIC` for every other
            failure (process could not be started, no prompt, EOF, timeout).
    """
    import re  # function-scope: only needed to escape the device name below

    logger.debug(f"Unlocking volume {quote(volume.device_name)}")

    command = "udisksctl"
    # NOTE(review): args are handed to pexpect.spawn as a list; if no shell is
    # involved, quote() would add literal quote characters for unusual device
    # names. Left unchanged here - confirm whether quoting is wanted.
    args = ["unlock", "--block-device", quote(volume.device_name)]

    # String entries in a pexpect match list are compiled as regular
    # expressions, so the device name must be escaped to match literally.
    device_pattern = re.escape(volume.device_name)

    # pexpect allows for a match list that contains pexpect.EOF and pexpect.TIMEOUT
    # as well as string/regex matches:
    # https://pexpect.readthedocs.io/en/stable/api/pexpect.html#pexpect.spawn.expect
    prompt: PexpectList = [
        "Passphrase: ",
        pexpect.EOF,
        pexpect.TIMEOUT,
    ]

    # Indexes 0 and 1 both capture the mapped device name in group 1;
    # index 2 is the specific "wrong passphrase" error.
    expected: PexpectList = [
        f"Unlocked {device_pattern} as (.*)[^\r\n].",
        "GDBus.Error:org.freedesktop.UDisks2.Error.Failed: Device "  # string continues
        f"{device_pattern} is already unlocked as (.*)[^\r\n].",
        "GDBus.Error:org.freedesktop.UDisks2.Error.Failed: Error "  # string continues
        f"unlocking {device_pattern}: Failed to activate device: Incorrect passphrase",
        pexpect.EOF,
        pexpect.TIMEOUT,
    ]

    try:
        child = pexpect.spawn(command, args)
    except pexpect.ExceptionPexpect as err:
        # Surface start-up failures through the same exception type as every
        # other unlock failure, so callers only need to handle ExportException.
        logger.error("Could not start disk unlock command")
        raise ExportException(sdstatus=Status.ERROR_UNLOCK_GENERIC) from err

    if child.expect(prompt) != 0:
        # Release the child process before bailing out.
        child.close()
        logger.error("Did not receive disk unlock prompt")
        raise ExportException(sdstatus=Status.ERROR_UNLOCK_GENERIC)

    logger.debug("Passing key")
    child.sendline(encryption_key)
    index = child.expect(expected)

    if index in (0, 1):
        # Pexpect includes a re.Match object at `child.match`, but this freaks mypy out:
        # see https://pexpect.readthedocs.io/en/stable/api/pexpect.html#pexpect.spawn.expect
        # We know what format the results are in
        dm_name = child.match.group(1).decode("utf-8").strip()  # type: ignore
        logger.debug(f"Device is unlocked as {dm_name}")

        child.close()
        # Exit status 1 is tolerated because the "already unlocked" case is
        # reported by udisksctl as an error message yet handled as success here.
        if child.exitstatus is not None and child.exitstatus not in (0, 1):
            logger.warning(f"pexpect: child exited with {child.exitstatus}")

        # dm_name format is /dev/dm-X
        return self._mount_volume(volume, dm_name)

    unlock_error = Status.ERROR_UNLOCK_GENERIC
    if index == 2:
        # Still an error, but we can report more specific error to the user
        logger.debug("Bad volume passphrase")
        unlock_error = Status.ERROR_UNLOCK_LUKS

    # Any other index values are also an error. Clean up and raise
    child.close()

    logger.error(f"Error encountered while unlocking {volume.device_name}")
    raise ExportException(sdstatus=unlock_error)