export/securedrop_export/archive.py (56 lines of code) (raw):
#!/usr/bin/env python3
import datetime
import json
import logging
import os
import tempfile
from securedrop_export.command import Command
from securedrop_export.directory import safe_extractall
from securedrop_export.exceptions import ExportException
from securedrop_export.status import BaseStatus
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
class Status(BaseStatus):
    """
    Status values attached to the ExportException raised when handling an
    sd-export archive fails.
    """

    # Used by Metadata.validate() when Command() rejects the "device" value.
    ERROR_ARCHIVE_METADATA = "ERROR_ARCHIVE_METADATA"
    # Used by Metadata.validate() when the metadata file cannot be read or parsed.
    ERROR_METADATA_PARSING = "ERROR_METADATA_PARSING"
    # Used by Archive.extract_tarball() when extraction raises any exception.
    ERROR_EXTRACTION = "ERROR_EXTRACTION"
class Metadata:
    """
    Object to parse, validate and store json metadata from the sd-export archive.

    After a successful validate() call the instance exposes:
      * export_method:  value of the "device" key (None when absent)
      * encryption_key: value of the "encryption_key" key (None when absent)
      * command:        the Command constructed from export_method
    """

    METADATA_FILE = "metadata.json"

    def __init__(self, archive_path: str):
        """
        Record the expected location of the metadata file.

        `archive_path` is the directory that contains METADATA_FILE. Nothing is
        read from disk until validate() is called.
        """
        self.metadata_path = os.path.join(archive_path, self.METADATA_FILE)

    def validate(self) -> "Metadata":
        """
        Read the metadata file, store its values, and check the export command.

        Returns this instance so that calls can be chained.

        Raises ExportException with:
          * Status.ERROR_METADATA_PARSING if the file cannot be opened, decoded,
            parsed as JSON, or is not a JSON object.
          * Status.ERROR_ARCHIVE_METADATA if Command() rejects the "device" value
            with a ValueError.
        """
        # Read metadata json and set relevant attributes
        try:
            # JSON interchange text is UTF-8; decode explicitly rather than relying
            # on the locale's preferred encoding.
            with open(self.metadata_path, encoding="utf-8") as f:
                logger.info("Parsing archive metadata")
                json_config = json.load(f)
                self.export_method = json_config.get("device", None)
                self.encryption_key = json_config.get("encryption_key", None)
                logger.info(f"Command: {self.export_method}")
        except Exception as ex:
            # Deliberately broad: any failure here (I/O, decoding, JSON syntax,
            # non-object top-level value) is reported as a single parsing status.
            logger.error("Metadata parsing failure")
            raise ExportException(sdstatus=Status.ERROR_METADATA_PARSING) from ex

        # Validate action - fails if command is not in list of supported commands
        try:
            logger.debug("Validate export action")
            self.command = Command(self.export_method)
        except ValueError as v:
            raise ExportException(sdstatus=Status.ERROR_ARCHIVE_METADATA) from v

        return self
class Archive:
    """
    An sd-export archive file together with the temporary directory it is
    extracted into and the metadata-derived attributes attached to it.
    """

    def __init__(self, archive_path: str):
        """
        Remember the archive location and create a fresh temporary directory.

        Also sets the process umask to 0o077 and derives a timestamped
        `target_dirname` of the form "sd-export-YYYYmmdd-HHMMSS".
        """
        # Mask group/other permission bits for files created by this process
        # from now on (this includes the temporary directory below).
        os.umask(0o077)

        self.archive = archive_path

        timestamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
        self.target_dirname = "sd-export-{}".format(timestamp)

        self.tmpdir = tempfile.mkdtemp()

    def extract_tarball(self) -> "Archive":
        """
        Unpack the archive into `self.tmpdir` via safe_extractall (which is
        relied on for the path traversal checks) and return this Archive.

        Any exception raised during extraction is logged and re-raised as an
        ExportException carrying Status.ERROR_EXTRACTION.
        """
        try:
            logger.info(f"Extracting tarball {self.archive} into {self.tmpdir}")
            safe_extractall(self.archive, self.tmpdir)
        except Exception as ex:
            logger.error(f"Unable to extract tarball: {ex}")
            raise ExportException(sdstatus=Status.ERROR_EXTRACTION) from ex

        return self

    def set_metadata(self, metadata: Metadata) -> "Archive":
        """
        Copy the relevant attributes from validated metadata onto this archive.

        `command` is always copied; `encryption_key` is copied only when the
        command is Command.EXPORT. Returns this Archive.
        """
        self.command = metadata.command

        # Only an export needs the key; other commands leave it unset.
        if self.command is not Command.EXPORT:
            return self

        self.encryption_key = metadata.encryption_key
        return self