sdw_util/Util.py (99 lines of code) (raw):

""" Utility functions used by both the updater and notifier scripts """ import fcntl import logging import os import re import subprocess from logging.handlers import TimedRotatingFileHandler # The directory where status files and logs are stored BASE_DIRECTORY = os.path.join(os.path.expanduser("~"), ".securedrop_updater") # Directory for lock files to avoid contention or multiple instantiation. LOCK_DIRECTORY = os.path.join("/run/user", str(os.getuid())) # Folder where logs are stored LOG_DIRECTORY = os.path.join(BASE_DIRECTORY, "logs") # File that contains Qubes version information (overridden by tests) OS_RELEASE_FILE = "/etc/os-release" # Shared error string LOCK_ERROR = "Error obtaining lock on '{}'. Process may already be running." # Format for those logs LOG_FORMAT = "%(asctime)s - %(name)s:%(lineno)d(%(funcName)s) " "%(levelname)s: %(message)s" # Namespace for primary logger, additional namespaces should be defined by module user SD_LOGGER_PREFIX = "sd" sdlog = logging.getLogger(SD_LOGGER_PREFIX + "." + __name__) def configure_logging(log_file, logger_namespace=SD_LOGGER_PREFIX, backup_count=0): """ All logging related settings are set up by this function. `log_file` - the filename `logger_namespace - the prefix used for all log events by this logger `backup_count` - if nonzero, at most backup_count files will be kept """ if not os.path.exists(LOG_DIRECTORY): os.makedirs(LOG_DIRECTORY) formatter = logging.Formatter(LOG_FORMAT) handler = TimedRotatingFileHandler( os.path.join(LOG_DIRECTORY, log_file), backupCount=backup_count ) handler.setFormatter(formatter) handler.setLevel(logging.INFO) log = logging.getLogger(logger_namespace) log.setLevel(logging.INFO) log.addHandler(handler) def obtain_lock(basename): """ Obtain an exclusive lock during the execution of this process. """ lock_file = os.path.join(LOCK_DIRECTORY, basename) try: lh = open(lock_file, "w") except PermissionError: sdlog.error( f"Error writing to lock file '{lock_file}'. User may lack the required permissions." ) return None try: # Obtain an exclusive, nonblocking lock fcntl.lockf(lh, fcntl.LOCK_EX | fcntl.LOCK_NB) except OSError: sdlog.error(LOCK_ERROR.format(lock_file)) return None return lh def can_obtain_lock(basename): """ We temporarily obtain a shared, nonblocking lock to a lockfile to determine whether the associated process is currently running. Returns True if it is safe to continue execution (no lock conflict), False if not. `basename` is the basename of a lockfile situated in the LOCK_DIRECTORY. """ lock_file = os.path.join(LOCK_DIRECTORY, basename) try: lh = open(lock_file) except FileNotFoundError: # Process may not have run during this session, safe to continue return True try: # Obtain a nonblocking, shared lock fcntl.lockf(lh, fcntl.LOCK_SH | fcntl.LOCK_NB) except OSError: sdlog.error(LOCK_ERROR.format(lock_file)) return False return True def is_conflicting_process_running(list): """ Check if any process of the given name is currently running. Aborts on the first match. """ for name in list: result = subprocess.run( args=["pgrep", name], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=False ) if result.returncode == 0: sdlog.error(f"Conflicting process '{name}' is currently running.") return True return False def get_qubes_version(): """ Helper function for checking the Qubes version. Returns None if not on Qubes. """ is_qubes = False version = None try: with open(OS_RELEASE_FILE) as f: for line in f: try: key, value = line.rstrip().split("=") except ValueError: continue if key == "NAME" and "qubes" in value.lower(): is_qubes = True if key == "VERSION": version = value except FileNotFoundError: return None if not is_qubes: return None return version def get_logger(prefix=SD_LOGGER_PREFIX, module=None): if module is None: return logging.getLogger(prefix) return logging.getLogger(prefix + "." + module) def strip_ansi_colors(str): """ Strip ANSI colors from command output """ return re.sub(r"\u001b\[.*?[@-~]", "", str) def is_sdapp_halted() -> bool: """ Helper fuction that returns True if `sd-app` VM is in a halted state and False if state is running, paused, or cannot be determined. Runs only if Qubes environment detected; otherwise returns False. """ if not get_qubes_version(): sdlog.error("QubesOS not detected, is_sdapp_halted will return False") return False try: output_bytes = subprocess.check_output(["qvm-ls", "sd-app"]) output_str = output_bytes.decode("utf-8") return "Halted" in output_str except subprocess.CalledProcessError as e: sdlog.error("Failed to return sd-app VM status via qvm-ls") sdlog.error(str(e)) return False