export/securedrop_export/print/service.py (285 lines of code) (raw):
import logging
import os
import signal
import subprocess
import time
from pathlib import Path
from securedrop_export.directory import safe_mkdir
from securedrop_export.exceptions import ExportException, TimeoutException, handler
from .status import Status
logger = logging.getLogger(__name__)
# We support viewing, but not printing, these mimetypes.
# In future we should consolidate all our mimetype
# controls into one place.
# See https://github.com/freedomofpress/securedrop-workstation/issues/842,
# https://github.com/freedomofpress/securedrop-workstation/issues/1139
MIMETYPE_UNPRINTABLE = (
"audio/",
"video/",
)
# Not unprintable (could print individual files in the archive), just not yet implemented
MIMETYPE_ARCHIVE = [
"application/vnd.djvu",
"application/vnd.rar",
"application/zip",
"application/x-7z-compressed",
]
# These are a subset of mimetypes that cups supports for direct printing:
# see /usr/share/cups/mime/mime.types
MIMETYPE_PRINT_WITHOUT_CONVERSION = [
"application/pdf",
"text/plain",
"image/gif",
"image/png",
"image/jpeg",
"image/pwg-raster",
"image/tiff",
"image/x-photocd",
"image/x-portable-anymap",
"image/x-portable-bitmap",
"image/x-portable-graymap",
"image/x-portable-pixmap",
"image/x-sgi-rgb",
"image/x-xbitmap",
"image/x-xpixmap",
"image/x-sun-raster",
"image/x-bitmap",
"image/x-icon",
]
LIBREOFFICE_DESKTOP_DIR = Path("/usr/share/applications/")
class Service:
"""
Printer service
"""
PRINTER_NAME = "sdw-printer"
PRINTER_WAIT_TIMEOUT = 60
BRLASER_DRIVER = "/usr/share/cups/drv/brlaser.drv"
BRLASER_PPD = "/usr/share/cups/model/br7030.ppd"
LASERJET_DRIVER = "/usr/share/cups/drv/hpcups.drv"
LASERJET_PPD = "/usr/share/cups/model/hp-laserjet_6l.ppd"
BROTHER = "Brother"
LASERJET = "LaserJet"
SUPPORTED_PRINTERS = [BROTHER, LASERJET]
def __init__(self, submission, printer_timeout_seconds=PRINTER_WAIT_TIMEOUT):
self.submission = submission
self.printer_name = self.PRINTER_NAME
self.printer_wait_timeout = printer_timeout_seconds # Override during testing
def print(self) -> Status:
"""
Routine to print all files.
Throws ExportException if an error is encountered.
"""
logger.info("Printing all files from archive")
self._check_printer_setup()
self._print_all_files()
return Status.PRINT_SUCCESS
def printer_preflight(self) -> Status:
"""
Routine to perform preflight printer testing.
Throws ExportException if an error is encoutered.
"""
logger.info("Running printer preflight")
self._check_printer_setup()
# When client can accept new print statuses, we will return
# a success status here
return Status.PREFLIGHT_SUCCESS
def printer_test(self) -> Status:
"""
Routine to print a test page.
Throws ExportException if an error is encountered.
"""
logger.info("Printing test page")
self._check_printer_setup()
self._print_test_page()
# When client can accept new print statuses, we will return
# a success status here
return Status.PRINT_TEST_PAGE_SUCCESS
def _wait_for_print(self):
"""
Use lpstat to ensure the job was fully transfered to the printer
Return True if print was successful, otherwise throw ExportException.
Currently, the handler `handler` is defined in `exceptions.py`.
"""
signal.signal(signal.SIGALRM, handler)
signal.alarm(self.printer_wait_timeout)
printer_idle_string = f"printer {self.printer_name} is idle"
while True:
try:
logger.info(f"Running lpstat waiting for printer {self.printer_name}")
output = subprocess.check_output(["lpstat", "-p", self.printer_name])
if printer_idle_string in output.decode("utf-8"):
logger.info("Print completed")
return True
else:
time.sleep(5)
except subprocess.CalledProcessError:
raise ExportException(sdstatus=Status.ERROR_PRINT)
except TimeoutException:
logger.error(f"Timeout waiting for printer {self.printer_name}")
raise ExportException(sdstatus=Status.ERROR_PRINT)
return True
def _check_printer_setup(self) -> None:
"""
Check printer setup.
Raise ExportException if supported setup is not found.
"""
try:
logger.info("Searching for printer")
output = subprocess.check_output(["sudo", "lpinfo", "-v"])
printers = [x for x in output.decode("utf-8").split() if "usb://" in x]
if not printers:
logger.info("No usb printers connected")
raise ExportException(sdstatus=Status.ERROR_PRINTER_NOT_FOUND)
supported_printers = [
p for p in printers if any(sub in p for sub in self.SUPPORTED_PRINTERS)
]
if not supported_printers:
logger.info(f"{printers} are unsupported printers")
raise ExportException(sdstatus=Status.ERROR_PRINTER_NOT_SUPPORTED)
if len(supported_printers) > 1:
logger.info("Too many usb printers connected")
raise ExportException(sdstatus=Status.ERROR_MULTIPLE_PRINTERS_FOUND)
printer_uri = printers[0]
printer_ppd = self._install_printer_ppd(printer_uri)
self._setup_printer(printer_uri, printer_ppd)
except subprocess.CalledProcessError as e:
logger.error(e)
raise ExportException(sdstatus=Status.ERROR_UNKNOWN)
def _get_printer_uri(self) -> str:
"""
Get the URI via lpinfo. Only accept URIs of supported printers.
Raise ExportException if supported setup is not found.
"""
printer_uri = ""
try:
output = subprocess.check_output(["sudo", "lpinfo", "-v"])
except subprocess.CalledProcessError:
logger.error("Error attempting to retrieve printer uri with lpinfo")
raise ExportException(sdstatus=Status.ERROR_PRINTER_URI)
# fetch the usb printer uri
for line in output.split():
if "usb://" in line.decode("utf-8"):
printer_uri = line.decode("utf-8")
logger.info(f"lpinfo usb printer: {printer_uri}")
# verify that the printer is supported, else throw
if printer_uri == "":
# No usb printer is connected
logger.info("No usb printers connected")
raise ExportException(sdstatus=Status.ERROR_PRINTER_NOT_FOUND)
elif not any(x in printer_uri for x in self.SUPPORTED_PRINTERS):
# printer url is a make that is unsupported
logger.info(f"Printer {printer_uri} is unsupported")
raise ExportException(sdstatus=Status.ERROR_PRINTER_NOT_SUPPORTED)
logger.info(f"Printer {printer_uri} is supported")
return printer_uri
def _install_printer_ppd(self, uri):
if not any(x in uri for x in self.SUPPORTED_PRINTERS):
logger.error(f"Cannot install printer ppd for unsupported printer: {uri}")
raise ExportException(sdstatus=Status.ERROR_PRINTER_NOT_SUPPORTED)
if self.BROTHER in uri:
printer_driver = self.BRLASER_DRIVER
printer_ppd = self.BRLASER_PPD
elif self.LASERJET in uri:
printer_driver = self.LASERJET_DRIVER
printer_ppd = self.LASERJET_PPD
# Compile and install drivers that are not already installed
if not os.path.exists(printer_ppd):
logger.info("Installing printer drivers")
self.check_output_and_stderr(
command=[
"sudo",
"ppdc",
printer_driver,
"-d",
"/usr/share/cups/model/",
],
error_status=Status.ERROR_PRINTER_DRIVER_UNAVAILABLE,
ignore_stderr_startswith=b"ppdc: Warning",
)
return printer_ppd
def _setup_printer(self, printer_uri, printer_ppd):
# Add the printer using lpadmin
logger.info(f"Setting up printer {self.printer_name}")
self.check_output_and_stderr(
command=[
"sudo",
"lpadmin",
"-p",
self.printer_name,
"-E",
"-v",
printer_uri,
"-P",
printer_ppd,
"-u",
"allow:user",
],
error_status=Status.ERROR_PRINTER_INSTALL,
ignore_stderr_startswith=b"lpadmin: Printer drivers",
)
def _print_test_page(self):
logger.info("Printing test page")
testprint = Path("/usr/share/cups/data/testprint")
self._print_file(testprint)
def _print_all_files(self):
print_directory = Path(Path(self.submission.tmpdir, "export_data"))
files = os.listdir(print_directory)
for print_count, f in enumerate(files):
file_path = Path(print_directory, f)
self._print_file(file_path)
logger.info(f"Printing document {print_count + 1} of {len(files)}")
def _get_supported_mimetypes_libreoffice(self, desktop_dir: Path):
"""
Return a list of mimetypes supported by Libreoffice programs.
desktop_dir is a path, such as /usr/share/applications/, specified
by the constant LIBREOFFICE_DESKTOP_FILES.
"""
supported_mimetypes: set[str] = set()
libreoffice_programs = [
"libreoffice-base",
"libreoffice-calc",
"libreoffice-draw",
"libreoffice-impress",
"libreoffice-math",
"libreoffice-writer",
]
for item in libreoffice_programs:
desktop_file = Path(desktop_dir, f"{item}.desktop")
if desktop_file.exists():
with open(desktop_file) as f:
for line in f.readlines():
if line.startswith("MimeType="):
# Semicolon-separated list; don't leave empty element at the end
supported_mimetypes.update(line.strip("MimeType=").split(";")[:-1])
return supported_mimetypes
def _needs_pdf_conversion(self, filename: Path):
"""
Checks mimetype of a file and returns True if file must be converted
to PDF before attempting to print.
Raises ExportException for unprintable mimetypes or on mimetype
discovery error.
"""
mimetype = None
supported_types: set[str] = set()
try:
supported_types = self._get_supported_mimetypes_libreoffice(LIBREOFFICE_DESKTOP_DIR)
except OSError as e:
logger.error(f"Could not get supported mimetypes list: {e}")
raise ExportException(sdstatus=Status.ERROR_MIMETYPE_DISCOVERY)
if len(supported_types) == 0:
raise ExportException(sdstatus=Status.ERROR_MIMETYPE_DISCOVERY)
try:
# b'filename that may have spaces.docx: application/bla\n'
# use magic bytes (-M) for filetype detection
mimetype = (
subprocess.check_output(["mimetype", filename]).decode().split(":")[-1].strip()
)
except subprocess.CalledProcessError:
logger.error(f"Could not process mimetype of {filename}")
raise ExportException(sdstatus=Status.ERROR_MIMETYPE_DISCOVERY)
# Don't print "audio/*", "video/*", or archive mimetypes
if mimetype.startswith(MIMETYPE_UNPRINTABLE) or mimetype in MIMETYPE_ARCHIVE:
logger.info(f"Unprintable file {filename}")
raise ExportException(sdstatus=Status.ERROR_UNPRINTABLE_TYPE)
elif mimetype in MIMETYPE_PRINT_WITHOUT_CONVERSION:
# Print directly, no need to convert
logger.debug(f"{filename} can skip PDF conversion")
return False
elif mimetype in supported_types:
logger.debug(f"{filename} will be converted to PDF")
return True
else:
logger.error("Mimetype is unknown or unsupported.")
raise ExportException(sdstatus=Status.ERROR_MIMETYPE_UNSUPPORTED)
def _print_file(self, file_to_print: Path):
"""
Print a file, attempting to convert to a printable format (PDF)
if the mimetype is not directly printable.
file_to_print: Path representing absolute path to target file.
"""
if self._needs_pdf_conversion(file_to_print):
logger.info("Convert to pdf for printing")
# Put converted files in a subdirectory out of an abundance
# of caution. Libreoffice conversion uses a fixed name and will
# overwrite existing files of the same name. Right now we
# only send one file at a time, but if we ever batch these files
# we don't want to overwrite (eg) memo.pdf with memo.docx
safe_mkdir(file_to_print.parent, "print-pdf")
printable_folder = file_to_print.parent / "print-pdf"
# The filename is deterined by LibreOffice - it's the original stem
# (name minus extension) plus the new extension (.pdf).
converted_filename = printable_folder / (file_to_print.stem + ".pdf")
if converted_filename.exists():
logger.error("Another file by that name exists already.")
logger.debug(f"{converted_filename} would be overwritten")
raise ExportException(sdstatus=Status.ERROR_PRINT)
args: list[str | Path] = [
"libreoffice",
"--headless",
"--safe-mode",
"--convert-to",
"pdf",
"--outdir",
printable_folder,
file_to_print,
]
try:
logger.debug(f"Convert {file_to_print} to {converted_filename} for printing")
output = subprocess.check_output(args).decode()
if "Error" in output:
# Even on error, libreoffice returns 0, so we need to check
# the output, as well as check that the file exists
logger.error("Libreoffice headless conversion error")
logger.debug(output)
raise ExportException(sdstatus=Status.ERROR_PRINT)
except subprocess.CalledProcessError as e:
raise ExportException(sdstatus=Status.ERROR_PRINT, sderror=e.output)
file_to_print = converted_filename
if not file_to_print.exists():
logger.error(f"Something went wrong: {file_to_print} not found")
raise ExportException(sdstatus=Status.ERROR_PRINT)
logger.info(f"Sending file to printer {self.printer_name}")
try:
# We can switch to using libreoffice --pt $printer_cups_name
# here, and either print directly (headless) or use the GUI
subprocess.check_call(
["xpp", "-P", self.printer_name, file_to_print],
)
except subprocess.CalledProcessError as e:
raise ExportException(sdstatus=Status.ERROR_PRINT, sderror=e.output)
# This is an addition to ensure that the entire print job is transferred over.
# If the job is not fully transferred within the timeout window, the user
# will see an error message.
self._wait_for_print()
def check_output_and_stderr(
self, command: str, error_status: Status, ignore_stderr_startswith=None
):
"""
Wrap subprocess.check_output to ensure we wrap CalledProcessError and return
our own exception, and log the error messages.
"""
try:
err = subprocess.run(command, check=True, capture_output=True).stderr
# ppdc and lpadmin may emit warnings we are aware of which should not be treated as
# user facing errors
if ignore_stderr_startswith and err.startswith(ignore_stderr_startswith):
logger.info("Encountered warning: {}".format(err.decode("utf-8")))
elif err == b"":
# Nothing on stderr and returncode is 0, we're good
pass
else:
raise ExportException(sdstatus=error_status, sderror=err)
except subprocess.CalledProcessError as ex:
raise ExportException(sdstatus=error_status, sderror=ex.output)