client/securedrop_client/crypto.py (142 lines of code) (raw):
"""
Copyright (C) 2018 The Freedom of the Press Foundation.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published
by the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
import logging
import os
import struct
import subprocess
import tempfile
from pathlib import Path
from sqlalchemy.orm import scoped_session
from securedrop_client.config import Config
from securedrop_client.db import Source
from securedrop_client.utils import safe_copy, safe_gzip_extract, safe_mkdir
logger = logging.getLogger(__name__)
class CryptoError(Exception):
    """Raised when a GPG operation (decrypt, key import or encrypt) fails."""
# Magic bytes that open every gzip member (RFC 1952, ID1/ID2).
GZIP_FILE_IDENTIFICATION = b"\037\213"
# Bits of the gzip header FLG byte that this module cares about.
GZIP_FLAG_EXTRA_FIELDS = 4  # gzip.FEXTRA
GZIP_FLAG_FILENAME = 8  # gzip.FNAME


def read_gzip_header_filename(filename: str) -> str:
    """
    Extract the original filename from the header of a gzipped file.

    Adapted from Python's gzip._GzipReader._read_gzip_header.

    :param filename: Path of the gzipped file to inspect
    :returns: The filename stored in the gzip header, decoded as UTF-8, or an
        empty string if the header carries no filename
    :raises OSError: If the file does not start with the gzip magic bytes, uses
        a compression method other than deflate (8), or ends before the fixed
        part of the header is complete
    """
    with open(filename, "rb") as f:
        magic = f.read(2)
        if magic != GZIP_FILE_IDENTIFICATION:
            raise OSError(f"Not a gzipped file ({magic!r})")

        # Fixed-size remainder of the header: CM, FLG, MTIME (4 bytes), XFL, OS.
        fixed_fields = f.read(8)
        if len(fixed_fields) != 8:
            # Report truncation as OSError like the other malformed-input cases
            # instead of leaking struct.error to the caller.
            raise OSError("Truncated gzip header")
        compression_method, flags, _mtime = struct.unpack("<BBIxx", fixed_fields)
        if compression_method != 8:
            raise OSError("Unknown compression method")

        if flags & GZIP_FLAG_EXTRA_FIELDS:
            # The optional extra field precedes the filename: a 2-byte
            # little-endian length followed by that many bytes, all skipped.
            extra_len_field = f.read(2)
            if len(extra_len_field) != 2:
                raise OSError("Truncated gzip header")
            (extra_len,) = struct.unpack("<H", extra_len_field)
            f.read(extra_len)

        if not flags & GZIP_FLAG_FILENAME:
            return ""

        # The filename is NUL-terminated; stop at the terminator or at EOF.
        name_bytes = bytearray()
        while True:
            byte = f.read(1)
            if not byte or byte == b"\000":
                break
            name_bytes += byte
        return str(name_bytes, "utf-8")
class GpgHelper:
    """
    Wrapper around the GnuPG command-line tools used by the SecureDrop client.

    Decrypts downloaded submissions and replies, imports source public keys and
    encrypts replies to both a source and the journalist key. When ``is_qubes``
    is set, ``qubes-gpg-client`` / ``qubes-gpg-import-key`` are invoked instead
    of ``gpg``.
    """

    # The extraction path should be the tempdir provided by the system
    EXTRACTION_PATH = str(Path(tempfile.gettempdir()))

    def __init__(self, sdc_home: str, session_maker: scoped_session, is_qubes: bool) -> None:
        """
        :param sdc_home: Home directory for the SecureDrop client
        :param session_maker: Callable returning the database session used to look up sources
        :param is_qubes: Whether the client is running in Qubes or not
        """
        # NOTE(review): presumably creates <sdc_home>/gpg, which is the --homedir
        # used by _gpg_cmd_base outside Qubes; confirm against safe_mkdir.
        safe_mkdir(sdc_home, "gpg")
        self.sdc_home = sdc_home
        self.is_qubes = is_qubes
        self.session_maker = session_maker

        config = Config.load()
        self.journalist_key_fingerprint = config.journalist_key_fingerprint

    def decrypt_submission_or_reply(
        self, filepath: str, plaintext_filepath: str, is_doc: bool = False
    ) -> str:
        """
        Decrypt the file located at the given filepath and delete the encrypted file.

        If is_doc is False, the decrypted plaintext is copied to plaintext_filepath,
        which is expected to live in the system temporary directory. Otherwise the
        decrypted gzip archive is handed to safe_gzip_extract together with filepath
        and the client home directory; the document is saved under the filename
        stored in the gzip header if there is one, otherwise under the name derived
        from filepath.

        :param filepath: Path of the encrypted file; removed after successful decryption
        :param plaintext_filepath: Destination for the plaintext when is_doc is False
        :param is_doc: Whether the decrypted payload is a gzipped document
        :returns: The filename chosen for the decrypted content
        :raises CryptoError: If gpg exits with a non-zero status
        """
        original_filename = Path(Path(filepath).stem).stem  # Remove one or two suffixes

        # The err tempfile is created with delete=False so that it can be closed and
        # re-read by name; the finally block guarantees it is removed on every path,
        # including when launching gpg itself fails.
        err = tempfile.NamedTemporaryFile(suffix=".message-error", delete=False)
        try:
            with tempfile.NamedTemporaryFile(suffix=".message") as out:
                cmd = self._gpg_cmd_base()
                cmd.extend(["--decrypt", filepath])
                res = subprocess.call(cmd, stdout=out, stderr=err)

                if res != 0:
                    err.close()
                    with open(err.name) as e:
                        msg = f"GPG Error: {e.read()}"
                    raise CryptoError(msg)

                # Delete encrypted file now that it's been successfully decrypted
                os.unlink(filepath)

                # If is_doc is True, unzip and extract the document. The document will be
                # saved as the filename in the gzip header, which should contain the name
                # of the original file that was gzipped. If the name is not in the header,
                # use the filepath name.
                #
                # If is_doc is False, store the decrypted plaintext contents to the
                # plaintext_filepath in /tmp. The intermediate `out` file is a named
                # temporary file, so it is deleted automatically when this block exits.
                if is_doc:
                    original_filename = read_gzip_header_filename(out.name) or original_filename
                    safe_gzip_extract(out.name, filepath, original_filename, self.sdc_home)
                else:
                    # plaintext_filepath is a NamedTemporaryFile in /tmp so the base_dir is /tmp
                    safe_copy(out.name, plaintext_filepath, self.EXTRACTION_PATH)
        finally:
            err.close()
            os.unlink(err.name)

        return original_filename

    def _gpg_cmd_base(self) -> list[str]:
        """Return the command prefix shared by every gpg invocation."""
        if self.is_qubes:  # pragma: no cover
            cmd = ["qubes-gpg-client"]
        else:
            cmd = ["gpg", "--homedir", os.path.join(self.sdc_home, "gpg")]

        cmd.extend(["--trust-model", "always"])
        return cmd

    def import_key(self, source: Source) -> None:
        """
        Imports a Source's GPG key.

        :raises CryptoError: If the source has no public key or the import fails
        """
        logger.debug("Importing key for source %s", source.uuid)
        if not source.public_key:
            raise CryptoError(f"Could not import key: source {source.uuid} has no key")

        self._import(source.public_key)

    def _import(self, key_data: str) -> None:
        """
        Imports a key to the client GnuPG keyring.

        :raises CryptoError: If the import command exits with a non-zero status
        """
        with (
            tempfile.NamedTemporaryFile("w+") as temp_key,
            tempfile.NamedTemporaryFile("w+") as stdout,
            tempfile.NamedTemporaryFile("w+") as stderr,
        ):
            temp_key.write(key_data)
            # The child process opens the file by name, so the key material must
            # be flushed to disk before it runs.
            temp_key.flush()

            if self.is_qubes:  # pragma: no cover
                cmd = ["qubes-gpg-import-key", temp_key.name]
            else:
                cmd = self._gpg_cmd_base()
                cmd.extend(
                    ["--import-options", "import-show", "--with-colons", "--import", temp_key.name]
                )

            try:
                subprocess.check_call(cmd, stdout=stdout, stderr=stderr)
            except subprocess.CalledProcessError as e:
                stderr.seek(0)
                raise CryptoError(f"Could not import key: {e}\n{stderr.read()}") from e

    def encrypt_to_source(self, source_uuid: str, data: str) -> str:
        """
        Encrypt data to a source and to the journalist key.

        :param source_uuid: UUID of the source to encrypt to.
        :param data: A string of data to encrypt to a source.
        :returns: The ASCII-armored ciphertext
        :raises CryptoError: If the journalist fingerprint or the source key is
            missing, or if importing the source key or encrypting fails
        """
        session = self.session_maker()
        source = session.query(Source).filter_by(uuid=source_uuid).one()

        # do not attempt to encrypt if the journalist key is missing
        if not self.journalist_key_fingerprint:
            raise CryptoError("Could not encrypt reply due to missing fingerprint for journalist")

        # do not attempt to encrypt if the source key is missing
        if not (source.fingerprint and source.public_key):
            raise CryptoError(f"Could not encrypt reply: no key for source {source_uuid}")

        try:
            self.import_key(source)
        except CryptoError as e:
            raise CryptoError(f"Could not import key before encrypting reply: {e}") from e

        cmd = self._gpg_cmd_base()

        with (
            tempfile.NamedTemporaryFile("w+") as content,
            tempfile.NamedTemporaryFile("w+") as stdout,
            tempfile.NamedTemporaryFile("w+") as stderr,
        ):
            content.write(data)
            # gpg reads the plaintext by file name, so make sure it is on disk.
            content.flush()

            cmd.extend(
                [
                    "--encrypt",
                    "-r",
                    source.fingerprint,
                    "-r",
                    self.journalist_key_fingerprint,
                    "--armor",
                ]
            )
            if not self.is_qubes:
                # In Qubes, the ciphertext will go to stdout.
                # In addition the option below cannot be passed
                # through the gpg client wrapper.
                cmd.extend(["-o-"])  # write to stdout
            cmd.extend([content.name])

            try:
                subprocess.check_call(cmd, stdout=stdout, stderr=stderr)
            except subprocess.CalledProcessError as e:
                stderr.seek(0)
                err = stderr.read()
                raise CryptoError(f"Could not encrypt to source {source_uuid}: {e}\n{err}") from e

            stdout.seek(0)
            return stdout.read()