privaterelay/management/commands/cleanup_data.py

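# Usage sketch, assuming Django's standard manage.py entry point; --clean and
# the per-task --<slug> flags are defined in add_arguments() below, and
# -v/--verbosity is Django's built-in option controlling report detail:
#
#   python manage.py cleanup_data            # detect data issues (dry run)
#   python manage.py cleanup_data --clean    # detect and clean data issues
#   python manage.py cleanup_data -v 2       # include per-task detail sections
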
from __future__ import annotations

import logging
import textwrap
from argparse import RawDescriptionHelpFormatter
from shutil import get_terminal_size
from typing import TYPE_CHECKING, Any

from django.core.management.base import BaseCommand, DjangoHelpFormatter
from django.db import transaction

from codetiming import Timer

from emails.cleaners import MissingProfileCleaner, ServerStorageCleaner
from privaterelay.cleaners import MissingEmailCleaner

if TYPE_CHECKING:  # pragma: no cover
    from argparse import ArgumentParser

    from privaterelay.cleaner_task import DataIssueTask

logger = logging.getLogger("eventsinfo.cleanup_data")


class RawDescriptionDjangoHelpFormatter(
    DjangoHelpFormatter, RawDescriptionHelpFormatter
):
    """DjangoHelpFormatter, but don't reflow the epilog."""


class Command(BaseCommand):
    help = "Detects and optionally cleans data issues."

    task_list: list[type[DataIssueTask]] = [
        ServerStorageCleaner,
        MissingProfileCleaner,
        MissingEmailCleaner,
    ]
    tasks: dict[str, DataIssueTask]

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        tasks = {Task.slug: Task() for Task in self.task_list}
        self.tasks = {slug: tasks[slug] for slug in sorted(tasks.keys())}

    def create_parser(self, prog_name, subcommand, **kwargs):
        """Add DataIssueTask information to default parser."""
        epilog_lines = ["Data issue detectors / cleaners:"]
        for slug, task in self.tasks.items():
            raw = f"{slug} - {task.title}"
            epilog_lines.extend(
                textwrap.wrap(
                    raw,
                    width=get_terminal_size().columns,
                    initial_indent=" ",
                    subsequent_indent=" ",
                )
            )
        epilog = "\n".join(epilog_lines)
        parser = super().create_parser(prog_name, subcommand, epilog=epilog, **kwargs)
        if parser.formatter_class != DjangoHelpFormatter:
            raise TypeError("parser.formatter_class must be type DjangoHelpFormatter")
        parser.formatter_class = RawDescriptionDjangoHelpFormatter
        return parser

    def add_arguments(self, parser: ArgumentParser) -> None:
        parser.add_argument(
            "--clean", action="store_true", help="Clean detected data issues."
        )
        for slug, task in self.tasks.items():
            parser.add_argument(
                f"--{slug}",
                action="store_true",
                help=f"Only run {slug} {'cleaner' if task.can_clean else 'detector'}",
            )

    def handle(self, clean: bool, verbosity: int, *args: Any, **kwargs: Any) -> str:
        """Run the cleanup_data command."""
        # Determine if we're running some tasks or the full set
        run_some: list[str] = []
        for name, val in kwargs.items():
            slug = name.replace("_", "-")
            if slug in self.tasks and val:
                run_some.append(slug)
        if run_some:
            tasks = {slug: self.tasks[slug] for slug in sorted(run_some)}
        else:
            tasks = self.tasks

        # Find data issues and clean them if requested
        issue_count = 0
        clean_count = 0
        issue_timers: dict[str, float] = {}
        clean_timers: dict[str, float] = {}
        for slug, task in tasks.items():
            with transaction.atomic():
                task_issues, task_time = self.find_issues_in_task(task)
                issue_count += task_issues
                issue_timers[slug] = task_time
                if clean:
                    task_cleaned, clean_time = self.clean_issues_in_task(task)
                    if task_cleaned is not None and clean_time is not None:
                        clean_count += task_cleaned
                        clean_timers[slug] = clean_time

        # Log results and create a report, based on requested verbosity
        if clean:
            log_message = (
                f"cleanup_data complete, cleaned {clean_count} of"
                f" {issue_count} issue{'' if issue_count == 1 else 's'}."
            )
        else:
            log_message = (
                f"cleanup_data complete, found {issue_count}"
                f" issue{'' if issue_count == 1 else 's'} (dry run)."
            )
        full_data, log_data = self.prepare_data(
            clean, verbosity, tasks, issue_timers, clean_timers=clean_timers
        )
        logger.info(log_message, extra=log_data)
        report = self.get_report(log_message, full_data)
        return report

    def find_issues_in_task(self, task: DataIssueTask) -> tuple[int, float]:
        """Run a task, returning the number of issues and execution time."""
        with Timer(logger=None) as issue_timer:
            total = task.issues()
        return total, round(issue_timer.last, 3)

    def clean_issues_in_task(
        self, task: DataIssueTask
    ) -> tuple[int, float] | tuple[None, None]:
        """Run the task cleaner, returning issues cleaned and execution time."""
        if not hasattr(task, "clean"):
            return (None, None)
        with Timer(logger=None) as clean_timer:
            cleaned = task.clean()
        return cleaned, round(clean_timer.last, 3)

    def prepare_data(
        self,
        cleaned: bool,
        verbosity: int,
        tasks: dict[str, DataIssueTask],
        issue_timers: dict[str, float],
        clean_timers: dict[str, float],
    ) -> tuple[dict, dict]:
        """Gather full data and log data from tasks and timers."""
        # Overall timers for the whole run
        timers = {"query_s": sum(issue_timers.values())}
        if clean_timers:
            timers["clean_s"] = sum(clean_timers.values())

        log_details = {}
        full_details = {}
        for slug, task in tasks.items():
            # Per-task timers, kept separate so they do not clobber the totals
            task_timers = {"query_s": issue_timers[slug]}
            if clean_timers:
                task_timers["clean_s"] = clean_timers[slug]
            full_details[slug] = {
                "title": task.title,
                "check_description": task.check_description,
                "can_clean": task.can_clean,
                "counts": task.counts,
                "markdown_report": task.markdown_report(),
                "timers": task_timers,
            }
            if verbosity > 1:
                log_details[slug] = {"counts": task.counts, "timers": task_timers}
            elif verbosity > 0:
                log_details[slug] = {"counts": {"summary": task.counts["summary"]}}

        full_data = {
            "verbosity": verbosity,
            "cleaned": cleaned,
            "timers": timers,
            "tasks": full_details,
        }
        log_data = {
            "cleaned": cleaned,
            "timers": timers,
        }
        if log_details:
            log_data["tasks"] = log_details
        return full_data, log_data

    def get_report(self, log_message: str, data: dict) -> str:
        """Generate a human-readable report."""
        verbosity: int = data["verbosity"]
        cleaned: bool = data["cleaned"]
        tasks: dict[str, dict[str, Any]] = data["tasks"]

        # Collect task details
        details: list[str] = []
        sum_all_rows: list[tuple[str, ...]] = []
        totals: list[float] = [0, 0, 0.0, 0.0]
        for slug, task in tasks.items():
            # Gather data and types
            summary: dict[str, int] = task["counts"]["summary"]
            needs_cleaning = summary["needs_cleaning"]
            num_cleaned = summary.get("cleaned", 0)
            task_timers: dict[str, float] = task["timers"]
            query_timer = task_timers["query_s"]
            clean_timer = task_timers.get("clean_s", 0.0)

            # Collect summary data
            sum_all_rows.append(
                (
                    slug,
                    str(needs_cleaning),
                    str(num_cleaned),
                    format(query_timer, "0.3f"),
                    format(clean_timer, "0.3f"),
                )
            )
            totals[0] += needs_cleaning
            totals[1] += num_cleaned
            totals[2] += query_timer
            totals[3] += clean_timer

            # Details are omitted at low verbosity
            if verbosity <= 1:
                continue

            # Collect detail section data
            detail = [f"## {slug}"]
            detail.extend(textwrap.wrap(task["check_description"], width=80))
            detail.append("")
            detail.append(
                f"Detected {needs_cleaning} issue{'' if needs_cleaning == 1 else 's'}"
                f" in {query_timer} seconds."
            )
            if cleaned:
                if task["can_clean"]:
                    detail.append(
                        f"Cleaned {num_cleaned} issue{'' if num_cleaned == 1 else 's'}"
                        f" in {clean_timer} seconds."
                    )
                elif needs_cleaning:
                    detail.append("Unable to automatically clean detected items.")
            detail.append("")
            detail.append(task["markdown_report"])
            details.append("\n".join(detail))

        report = ["", "# Summary"]
        if not cleaned:
            report.append("Detected issues only. Use --clean to fix issues.")
        report.append("")

        # Pick summary table columns
        if cleaned:
            columns = ["task", "issues", "fixed", "query (s)", "fix (s)"]
            sum_rows: list[tuple[str, ...]] = sum_all_rows[:]
            sum_rows.append(
                (
                    "_Total_",
                    str(totals[0]),
                    str(totals[1]),
                    format(totals[2], "0.3f"),
                    format(totals[3], "0.3f"),
                )
            )
        else:
            columns = ["task", "issues", "query (s)"]
            sum_rows = [(row[0], row[1], row[3]) for row in sum_all_rows]
            sum_rows.append(("_Total_", str(totals[0]), format(totals[2], "0.3f")))

        # Determine summary table widths
        widths = [len(col) for col in columns]
        for row in sum_rows:
            for column, value in enumerate(row):
                widths[column] = max(widths[column], len(value))

        # Output summary table with aligned columns
        header = (
            "|"
            + "|".join(f"{col:^{widths[colnum]}}" for colnum, col in enumerate(columns))
            + "|"
        )
        sep = (
            "|"
            + "|".join(
                f"{'-' * (widths[colnum] - 1)}:" for colnum, _ in enumerate(columns)
            )
            + "|"
        )
        report.extend([header, sep])
        for row in sum_rows:
            row_report = (
                "|"
                + "|".join(f"{col:>{widths[colnum]}}" for colnum, col in enumerate(row))
                + "|"
            )
            report.append(row_report)

        # Output details
        if verbosity > 1:
            report.extend(["", "# Details"])
            for detail_block in details:
                report.extend([detail_block, ""])

        return "\n".join(report)