files/sdw-admin.py (156 lines of code) (raw):
#!/usr/bin/python3
"""
Admin wrapper script for applying salt states for staging and prod scenarios. The rpm
packages only puts the files in place `/srv/salt` but does not apply the state, nor
does it handle the config.
"""
import argparse
import os
import subprocess
import sys
from typing import List
import qubesadmin
SCRIPTS_PATH = "/usr/share/securedrop-workstation-dom0-config/"
SALT_PATH = "/srv/salt/securedrop_salt/"
BASE_TEMPLATE = "debian-12-minimal"
sys.path.insert(1, os.path.join(SCRIPTS_PATH, "scripts/"))
from validate_config import SDWConfigValidator, ValidationError # noqa: E402
DEBIAN_VERSION = "bookworm"
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument(
"--apply",
default=False,
required=False,
action="store_true",
help="Apply workstation configuration with Salt",
)
parser.add_argument(
"--validate",
default=False,
required=False,
action="store_true",
help="Validate the configuration",
)
parser.add_argument(
"--uninstall",
default=False,
required=False,
action="store_true",
help="Completely Uninstalls the SecureDrop Workstation",
)
parser.add_argument(
"--force",
default=False,
required=False,
action="store_true",
help="During uninstall action, don't prompt for confirmation, proceed immediately",
)
return parser.parse_args()
def install_pvh_support():
"""
Installs grub2-xen-pvh in dom0 - required for PVH with AppVM local kernels
TODO: install this via package requirements instead if possible
"""
try:
subprocess.check_call(["sudo", "qubes-dom0-update", "-y", "-q", "grub2-xen-pvh"])
except subprocess.CalledProcessError:
raise SDWAdminException("Error installing grub2-xen-pvh: local PVH not available.")
def copy_config():
"""
Copies config.json and sd-journalist.sec to /srv/salt/securedrop_salt
"""
try:
subprocess.check_call(["sudo", "cp", os.path.join(SCRIPTS_PATH, "config.json"), SALT_PATH])
subprocess.check_call(
["sudo", "cp", os.path.join(SCRIPTS_PATH, "sd-journalist.sec"), SALT_PATH]
)
except subprocess.CalledProcessError:
raise SDWAdminException("Error copying configuration")
def provision_all():
"""
Runs provision-all to apply the salt state.highstate on dom0 and all VMs
"""
try:
subprocess.check_call([os.path.join(SCRIPTS_PATH, "scripts/provision-all")])
except subprocess.CalledProcessError:
raise SDWAdminException("Error during provision-all")
print("Provisioning complete. Please reboot to complete the installation.")
def validate_config(path):
"""
Calls the validate_config script to validate the config present in the staging/prod directory
"""
try:
validator = SDWConfigValidator(path) # noqa: F841
except ValidationError:
raise SDWAdminException("Error while validating configuration")
def get_appvms_for_template(vm_name: str) -> List[str]:
"""
Return a list of AppVMs that use the specified VM as a template
"""
app = qubesadmin.Qubes()
try:
template_vm = app.domains[vm_name]
except KeyError:
# No VM implies no appvms, return an empty list
# (The template may just not be installed yet)
return []
return [x.name for x in list(template_vm.appvms)]
def refresh_salt():
"""
Cleans the Salt cache and synchronizes Salt to ensure we are applying states
from the currently installed version
"""
try:
subprocess.check_call(["sudo", "rm", "-rf", "/var/cache/salt"])
except subprocess.CalledProcessError:
raise SDWAdminException("Error while clearing Salt cache")
try:
subprocess.check_call(["sudo", "qubesctl", "saltutil.sync_all", "refresh=true"])
except subprocess.CalledProcessError:
raise SDWAdminException("Error while synchronizing Salt")
def perform_uninstall():
try:
subprocess.check_call(
["sudo", "qubesctl", "state.sls", "securedrop_salt.sd-clean-default-dispvm"]
)
print("Destroying all VMs")
subprocess.check_call([os.path.join(SCRIPTS_PATH, "scripts/destroy-vm"), "--all"])
print("Reverting dom0 configuration")
subprocess.check_call(["sudo", "qubesctl", "state.sls", "securedrop_salt.sd-clean-all"])
subprocess.check_call([os.path.join(SCRIPTS_PATH, "scripts/clean-salt")])
print("Uninstalling dom0 config package")
subprocess.check_call(
["sudo", "dnf", "-y", "-q", "remove", "securedrop-workstation-dom0-config"]
)
except subprocess.CalledProcessError:
raise SDWAdminException("Error during uninstall")
print(
"Instance secrets (Journalist Interface token and Submission private key) are still "
"present on disk. You can delete them in /usr/share/securedrop-workstation-dom0-config"
)
def main():
if os.geteuid() == 0:
print("Please do not run this script as root.")
sys.exit(0)
args = parse_args()
if args.validate:
print("Validating...", end="")
validate_config(SCRIPTS_PATH)
print("OK")
elif args.apply:
print(
"SecureDrop Workstation should be installed on a fresh Qubes OS install.\n"
"The installation process will overwrite any user modifications to the\n"
f"{BASE_TEMPLATE} TemplateVM, and will disable old-format qubes-rpc\n"
"policy directives.\n"
)
affected_appvms = get_appvms_for_template(BASE_TEMPLATE)
if len(affected_appvms) > 0:
print(
f"{BASE_TEMPLATE} is already in use by the following AppVMS:\n"
f"{affected_appvms}\n"
"Applications and configurations in use by these AppVMs will be\n"
f"removed from {BASE_TEMPLATE}."
)
response = input("Are you sure you want to proceed (y/N)? ")
if response.lower() != "y":
print("Exiting.")
sys.exit(0)
print("Applying configuration...")
validate_config(SCRIPTS_PATH)
install_pvh_support()
copy_config()
refresh_salt()
provision_all()
elif args.uninstall:
print(
"Uninstalling will remove all packages and destroy all VMs associated\n"
"with SecureDrop Workstation. It will also remove all SecureDrop tags\n"
"from other VMs on the system."
)
if not args.force:
response = input("Are you sure you want to uninstall (y/N)? ")
if response.lower() != "y":
print("Exiting.")
sys.exit(0)
refresh_salt()
perform_uninstall()
else:
sys.exit(0)
class SDWAdminException(Exception):
pass
if __name__ == "__main__":
main()