privaterelay/cleaner_task.py (611 lines of code) (raw):

"""Framework for tasks that identify data issues and (if possible) clean them up""" from __future__ import annotations import string from abc import ABCMeta, abstractmethod from typing import Any, Generic, Literal, TypeVar, get_args from django.db.models import Model, Q from django.db.models.query import QuerySet Counts = dict[str, dict[str, int]] CleanupData = dict[str, Any] M = TypeVar("M", bound=Model) CLEAN_GROUP_T = Literal["ok", "needs_cleaning"] # Define allowed characters for item keys # {model_plural}.[!]{sub_name1}.[!]{sub_name2} _KEY_SEP = "." _NEGATE_PREFIX = "!" _ITEM_KEY_CHAR_SET = set( string.ascii_lowercase + string.digits + "_" + _NEGATE_PREFIX + _KEY_SEP ) # Used by CleanedItem _CLEANED_METRIC_NAME = "cleaned" def _metric_name_for_model(model: type[M]) -> str: """The model's metric key, used in metrics and as a dictionary key.""" return str(model._meta.verbose_name_plural).replace(" ", "_") class ReportItem(metaclass=ABCMeta): """ An item in a data task report. This is the base model in the reporting item hierarchy. Code should use the derived classes CleanedItem, DataModelItem, and DataItem. """ def __init__(self, metric_name: str | None = None, report_name: str | None = None): """ Initialize a ReportItem. The `metric_name` parameter sets the name of the entry when it appears as a `dict` or JSON key. The default is `None`, which omits the entry from reports. The `report_name` parameter sets the name of the entry when it appears in a report for humans. It can be omitted when `metric_name` is None. """ if metric_name and ( bad_chars := [c for c in metric_name if c not in _ITEM_KEY_CHAR_SET] ): raise ValueError( f"metric_name '{metric_name}' has disallowed character" f"{'' if len(bad_chars) == 1 else 's'} '{''.join(sorted(bad_chars))}'" ) if metric_name == "": raise ValueError("metric_name is an empty string, should be None") if metric_name is None and report_name is not None: raise ValueError(f"report_name is '{report_name}', but metric_name is None") if report_name == "": raise ValueError("report_name is an empty string, should be None") self.metric_name = metric_name self.report_name = report_name @abstractmethod def count(self) -> int: raise NotImplementedError class CleanedItem(ReportItem): """Represents the results of cleaning a Model.""" def __init__(self, count: int, report_name: str = "Cleaned") -> None: if count < 0: raise ValueError("count can not be negative") self._count = count super().__init__(metric_name=_CLEANED_METRIC_NAME, report_name=report_name) def __repr__(self) -> str: args = [str(self._count)] if self.report_name != "Cleaned": args.append(f"report_name={self.report_name!r}") return f'{type(self).__name__}({", ".join(args)})' def __eq__(self, other: Any) -> bool: if isinstance(other, CleanedItem): return ( self.__class__ == other.__class__ and self._count == other._count and self.metric_name == other.metric_name and self.report_name == other.report_name ) return NotImplemented def count(self) -> int: return self._count class BaseDataItem(ReportItem, Generic[M]): """An entry in a data task report backed by a database query.""" def __init__( self, model_or_parent: type[M] | BaseDataItem[M], filter_by: str | Q | None = None, exclude: bool = False, metric_name: str | None = None, report_name: str | None = None, ) -> None: if metric_name == _CLEANED_METRIC_NAME: raise ValueError(f"metric_name '{metric_name}' is reserved for CleanedItem") self._model_or_parent = model_or_parent self.filter_by = filter_by self.exclude = exclude super().__init__(metric_name=metric_name, report_name=report_name) def __eq__(self, other: Any) -> bool: if isinstance(other, BaseDataItem): return ( self._model_or_parent == other._model_or_parent and self.filter_by == other.filter_by and self.exclude == other.exclude and self.metric_name == other.metric_name and self.report_name == other.report_name ) return NotImplemented def get_queryset(self) -> QuerySet[M]: """Return the Django query for this BaseDataItem.""" if isinstance(self._model_or_parent, BaseDataItem): query = self._model_or_parent.get_queryset() else: query = self._model_or_parent._default_manager.all() if isinstance(self.filter_by, str): filter_by = {self.filter_by: True} if self.exclude: query = query.exclude(**filter_by) else: query = query.filter(**filter_by) elif isinstance(self.filter_by, Q): if self.exclude: query = query.exclude(self.filter_by) else: query = query.filter(self.filter_by) return query def count(self) -> int: """Return the number of rows matched for this BaseDataItem.""" return self.get_queryset().count() class DataModelItem(BaseDataItem[M]): """A BaseDataItem representing the top-level Model""" _model_or_parent: type[M] metric_name: str report_name: str filter_by: None def __init__(self, model: type[M]) -> None: """Initialize a DataModelItem.""" super().__init__( model_or_parent=model, metric_name=_metric_name_for_model(model), report_name=str(model._meta.verbose_name_plural).title(), ) def __repr__(self) -> str: return f"{type(self).__name__}({self._model_or_parent.__name__})" class DataItem(BaseDataItem[M]): """ A DataItem is a subquery of a DataModelItem or DataItem. A top-level model DataItem represents all the items in a table. A query that selects some rows is a DataItem with a parent. The specific rows of interest can be represented by multiple levels of DataItems, giving context to the specific rows. """ _model_or_parent: BaseDataItem[M] def __init__( self, parent: BaseDataItem[M], filter_by: str | Q, exclude: bool = False, metric_name: str | None = None, report_name: str | None = None, clean_group: CLEAN_GROUP_T | None = None, ) -> None: """ Initialize a DataItem, checking for init-time issues. The `filter_by` parameter sets the filter (`filter` is a Python keyword). It can be a string, which represents a boolean filter. It can be a Django Q object, such as `Q(num_deleted_relay_addresses__gt=5)`. The default is to include rows matching the query. If `exclude` is set to `True`, then the query is for rows that do not match the filter. The `clean_group` parameter identifies the DataItem as a query of interest, usually in the context of a CleanerTask. An 'ok' value means the query represents rows without a problem, and a 'needs_cleaning' value means the rows need fixing. """ if filter_by == "": raise ValueError("filter_by is an empty string, should be set") if metric_name is None and clean_group is not None: raise ValueError(f"clean_group is '{clean_group}', but metric_name is None") if clean_group is not None and clean_group not in get_args(CLEAN_GROUP_T): raise ValueError(f"clean_group has invalid value '{clean_group}'") self.clean_group = clean_group super().__init__( model_or_parent=parent, filter_by=filter_by, exclude=exclude, metric_name=metric_name, report_name=report_name, ) def __repr__(self) -> str: if isinstance(self._model_or_parent, DataItem): args = [ f"<{type(self._model_or_parent).__name__}" f"(metric_name={self._model_or_parent.metric_name!r}, ...)>" ] else: args = [repr(self._model_or_parent)] args.append(repr(self.filter_by)) if self.exclude: args.append(f"exclude={self.exclude!r}") if self.metric_name: args.append(f"metric_name={self.metric_name!r}") if self.report_name: args.append(f"report_name={self.report_name!r}") if self.clean_group: args.append(f"clean_group={self.clean_group!r}") return f'{type(self).__name__}({", ".join(args)})' class DataModelSpec(Generic[M]): """ Define queries on a Model that can identify issues. This provides a higher-level interface for constructing a hierarchy of DataItems. The top-level is the most general, and queries are filtered to focus on more specific sets of rows as you go down the hierarchy. Each DataItem is identified by a key which represents its place in the hierarchy. For example, the key for a DataItem for active users created in 2023 might be 'active.created_in_2023'. When exporting the {key: DataItem} dict with `to_data_items`, the keys are turned into fully-qualified key by prefixing them with the model key. The model key is derived from the `verbose_plural_name` of the model. For example, a fully-qualified key would be 'users.active.created_in_2023'. https://docs.djangoproject.com/en/4.2/ref/models/options/#verbose-name-plural """ def __init__( self, model: type[M], subdivisions: list[DataBisectSpec] | None = None, omit_key_prefixes: list[str] | None = None, metric_name_overrides: dict[str, str] | None = None, report_name_overrides: dict[str, str] | None = None, ok_key: str | None = None, needs_cleaning_key: str | None = None, cleaned_report_name: str = "Cleaned", ) -> None: """ Initialize a DataModelSpec, checking for init-time issues. Keyword arguments: model - The model for this hierarchy of queries subdivisions - The subqueries, as a list of DataBisectSpecs omit_key_prefixes - A list of key prefixes that should be omitted from reports. Since the key name represents the hierarchy, omitting a prefix removes a whole branch of queries and subqueries. metric_name_overrides - A dict of keys to the metric names as used in `dict` and JSON keys. The default metric name is the last component of the key. report_name_overrides - A dict of keys to the human-suitable report names. The default report name is the last component of the key, converted to a title-cased phrase. ok_key - The key for the "ok" DataItem, such as rows that have already been cleaned. needs_cleaning_key - The key for the "needs_cleaning" DataItem, such as rows that need manual or automated cleaning. cleaned_report_name - The report name for the cleaned DataItem, added after running cleaning. This defaults to "Cleaned". """ if not isinstance(model, type) or not issubclass(model, Model): raise ValueError(f"model {model!r} is not a Django model.") subkeys: set[str] = set() for sub in subdivisions or []: for key in sub.get_keys(): if key in subkeys: raise ValueError("Duplicate key 'active' in subdivisions") subkeys.add(key) if omit_key_prefixes: if "" in omit_key_prefixes: raise ValueError( "omit_key_prefixes should not include the empty string" ) for key in omit_key_prefixes: if key not in subkeys: raise ValueError( f"omit_key_prefixes key '{key}' not found in subdivision" f" keys {sorted(subkeys)}" ) for key in (metric_name_overrides or {}).keys(): if key in omit_key_prefixes: raise ValueError( f"The metric_name_overrides key '{key}'" f" should not be in omit_key_prefixes {omit_key_prefixes}" ) for key in (report_name_overrides or {}).keys(): if key in omit_key_prefixes: raise ValueError( f"The report_name_overrides key '{key}'" f" should not be in omit_key_prefixes {omit_key_prefixes}" ) if ok_key and ok_key in omit_key_prefixes: raise ValueError( f"The ok_key '{ok_key}'" f" should not be in omit_key_prefixes {omit_key_prefixes}" ) if needs_cleaning_key and needs_cleaning_key in omit_key_prefixes: raise ValueError( f"The needs_cleaning_key '{needs_cleaning_key}'" f" should not be in omit_key_prefixes {omit_key_prefixes}" ) if metric_name_overrides: for key in metric_name_overrides.keys(): if key not in subkeys: raise ValueError( f"metric_name_overrides key '{key}' not found in subdivision" f" keys {sorted(subkeys)}" ) if report_name_overrides: for key in report_name_overrides.keys(): if key not in subkeys: raise ValueError( f"report_name_overrides key '{key}' not found in subdivision" f" keys {sorted(subkeys)}" ) if ok_key and ok_key not in subkeys: raise ValueError( f"ok_key '{ok_key}' not found in subdivision keys {sorted(subkeys)}" ) if needs_cleaning_key and needs_cleaning_key not in subkeys: raise ValueError( f"needs_cleaning_key '{needs_cleaning_key}' not found in subdivision" f" keys {sorted(subkeys)}" ) self.model = model self.subdivisions = subdivisions or [] self.omit_key_prefixes = omit_key_prefixes or [] self.metric_name_overrides = metric_name_overrides or {} self.report_name_overrides = report_name_overrides or {} self.ok_key = ok_key self.needs_cleaning_key = needs_cleaning_key self.cleaned_report_name = cleaned_report_name def __repr__(self) -> str: args = [f"model={self.model.__name__}"] if self.subdivisions: args.append(f"subdivisions={self.subdivisions!r}") if self.omit_key_prefixes: args.append(f"omit_key_prefixes={self.omit_key_prefixes!r}") if self.metric_name_overrides: args.append(f"metric_name_overrides={self.metric_name_overrides!r}") if self.report_name_overrides: args.append(f"report_name_overrides={self.report_name_overrides!r}") if self.ok_key: args.append(f"ok_key={self.ok_key!r}") if self.needs_cleaning_key: args.append(f"needs_cleaning_key={self.needs_cleaning_key!r}") if self.cleaned_report_name != "Cleaned": args.append(f"cleaned_report_name={self.cleaned_report_name!r}") return f'{type(self).__name__}({", ".join(args)})' @property def model_key(self) -> str: return _metric_name_for_model(self.model) def omit_key(self, key: str) -> bool: return any( key == omit or key.startswith(omit + _KEY_SEP) for omit in self.omit_key_prefixes ) def metric_name(self, key: str) -> str | None: """Return None (to omit), a friendlier name, or the original name.""" if self.omit_key(key): return None return self.metric_name_overrides.get(key, key) def report_name(self, key: str) -> str | None: if self.omit_key(key): return None return self.report_name_overrides.get( key, key.split(_KEY_SEP)[-1].replace("_", " ").replace("!", "not ").title() ) def clean_group(self, subname: str) -> CLEAN_GROUP_T | None: """Identify when the subname is for a key cleaning stat.""" if subname == self.ok_key: return "ok" elif subname == self.needs_cleaning_key: return "needs_cleaning" else: return None def to_data_items(self) -> dict[str, BaseDataItem[M]]: """Converts the spec to a dictionary of DataItems.""" model_item = DataModelItem(self.model) data_items: dict[str, BaseDataItem[M]] = {"": model_item} for subdivision in self.subdivisions: data_items.update(subdivision.to_data_items(self, data_items).items()) # For the return dict, prefix with model's metric name model_key = model_item.metric_name return { (f"{model_key}{_KEY_SEP}{key}" if key else model_key): item for key, item in data_items.items() } class DataBisectSpec: """Bisect a parent query.""" def __init__( self, key: str, bisect_by: str | Q, ) -> None: if key.startswith(_KEY_SEP): raise ValueError(f"The key '{key}' should not start with '{_KEY_SEP}'") if bad_chars := [c for c in key if c not in _ITEM_KEY_CHAR_SET]: raise ValueError( f"key '{key}' has disallowed character" f"{'' if len(bad_chars) == 1 else 's'} '{''.join(sorted(bad_chars))}'" ) if not bisect_by: raise ValueError("Set the bisect_by filter") parts = key.split(".") for part in parts: if _NEGATE_PREFIX in part[1:]: raise ValueError( f"In key '{key}', the prefix '{_NEGATE_PREFIX}' is in the" f" middle of subkey '{part}'" ) if parts[-1][0] == _NEGATE_PREFIX: raise ValueError( f"In key '{key}', the prefix '{_NEGATE_PREFIX}' is not allowed" f" in the last subkey '{parts[-1]}'" ) self.key = key self.bisect_by = bisect_by def __repr__(self) -> str: args = [f"key={self.key!r}", f"bisect_by={self.bisect_by!r}"] return f'{type(self).__name__}({", ".join(args)})' def get_keys(self) -> list[str]: if _KEY_SEP in self.key: subparent_name, part_name = self.key.rsplit(_KEY_SEP, 1) neg_key = f"{subparent_name}{_KEY_SEP}{_NEGATE_PREFIX}{part_name}" else: subparent_name = "" part_name = self.key neg_key = f"{_NEGATE_PREFIX}{part_name}" return [self.key, neg_key] def to_data_items( self, model_spec: DataModelSpec[M], existing_items: dict[str, BaseDataItem[M]] ) -> dict[str, DataItem[M]]: """Return two data items bisecting the parent data.""" if _KEY_SEP in self.key: subparent_name, _ = self.key.rsplit(_KEY_SEP, 1) else: subparent_name = "" parent = existing_items[subparent_name] pos_key, neg_key = self.get_keys() return { pos_key: self._to_bisected_data_item( self.key, model_spec, parent, "positive" ), neg_key: self._to_bisected_data_item( neg_key, model_spec, parent, "negative" ), } def _to_bisected_data_item( self, key: str, model_spec: DataModelSpec[M], parent: BaseDataItem[M], bisect: Literal["positive", "negative"], ) -> DataItem[M]: """Create one of the bisected data items.""" return DataItem( parent=parent, filter_by=self.bisect_by, exclude=bisect == "negative", metric_name=model_spec.metric_name(key), clean_group=model_spec.clean_group(key), report_name=model_spec.report_name(key), ) class ReportEntry: """An entry in a report.""" def __init__( self, item: ReportItem, count: int, depth: int, child_keys: list[str], ) -> None: self.item = item self.count = count self.depth = depth self.child_keys = child_keys def __repr__(self) -> str: return ( f"{type(self).__name__}({self.item!r}, {self.count!r}, {self.depth!r}," f" {self.child_keys!r})" ) def __eq__(self, other: Any) -> bool: if isinstance(other, self.__class__): return ( self.__class__ == other.__class__ and self.item == other.item and self.count == other.count and self.depth == other.depth and self.child_keys == other.child_keys ) return NotImplemented class DataIssueTask: """Base class for data issue / cleaner tasks.""" slug: str # Short name, appropriate for command-line option title: str # Short title for reports check_description: str # A sentence describing what this cleaner is checking. can_clean: bool = False # True if the issue can be automatically cleaned data_specification: list[DataModelSpec[Any]] = [] # The specification for this task _counts: Counts | None _cleanup_data: CleanupData | None _cleaned: bool _cleaned_report_name: dict[str, str] def __init__(self) -> None: self._counts = None self._cleanup_data = None self._cleaned = False self._cleaned_report_name = {} self.data_items = self._get_data_items() def _get_data_items(self) -> dict[str, BaseDataItem[Any]]: """Turn the data_specification into a dictionary of names to DataItems.""" data_items: dict[str, BaseDataItem[Any]] = {} for model_spec in self.data_specification: if model_spec.model_key in data_items: raise ValueError( f"{model_spec!r}\nThe key '{model_spec.model_key}' already exists" ) data_items.update(self._get_data_items_for_model_spec(model_spec)) return data_items def _get_data_items_for_model_spec( self, model_spec: DataModelSpec[M] ) -> dict[str, BaseDataItem[M]]: data_items: dict[str, BaseDataItem[M]] = {} self._cleaned_report_name[model_spec.model_key] = model_spec.cleaned_report_name for name, item in model_spec.to_data_items().items(): if ( self.can_clean and isinstance(item, DataItem) and item.clean_group == "needs_cleaning" and not hasattr(self, f"clean_{model_spec.model_key}") ): raise ValueError( f"{model_spec!r}\n{item}\n" "This item has clean_group='needs_cleaning', but the" f" cleaning function clean_{model_spec.model_key} is not defined." ) data_items[name] = item return data_items @property def counts(self) -> Counts: """Get relevant counts for data issues and prepare to clean if possible.""" if self._counts is None: self._counts, self._cleanup_data = self._get_counts_and_data() return self._counts def _get_counts_and_data(self) -> tuple[Counts, CleanupData]: if not self.data_items: return {}, {} counts: Counts = {"summary": {"ok": 0, "needs_cleaning": 0}} cleanup_data: CleanupData = {} for name, data_item in self.data_items.items(): if not data_item.metric_name: continue count = data_item.count() if isinstance(data_item, DataModelItem): counts[name] = {"all": count} else: model_name = name.split(".")[0] counts[model_name][data_item.metric_name] = count if isinstance(data_item, DataItem) and data_item.clean_group: counts["summary"][data_item.clean_group] += count if data_item.clean_group == "needs_cleaning": cleanup_data[model_name] = name return counts, cleanup_data @property def cleanup_data(self) -> CleanupData: """Get data needed to clean data issues.""" if not self.counts: return {} return self._cleanup_data or {} def issues(self) -> int: """Return the number of detected data issues.""" return self.counts["summary"]["needs_cleaning"] def get_report_entries(self) -> dict[str, ReportEntry]: """ Return an ordered dict of ReportEntries and related data. The key of the returned dict is a dotted path representing the path to the top of the hierarchy. If the ReportEntry is a DataModelItem or DataItem, it is the same key as used in `data_items`. The value of the returned dict is a 4-element tuple: - The ReportEntry, which may be a DataModelItem or DataItem - The count at this level - The key of the parent to this entry, or '' if a top element - A list of keys of the child elements of the entry, in sorted order Any DataModelItem or DataItem with a blank `metric_name` is omitted. Additional ReportEntry items may be added, for example to represent the cleaned data. """ # Pass 1: Gather report items and sorting data. # The sorting data: # clean_group_keys - what DataItems have .clean_group set # count_by_key - the pre-computed count from .counts # model_keys - the order models appeared in .data_specification # report_items - the items that have a .metric_name clean_group_keys: dict[CLEAN_GROUP_T, set[str]] = { "ok": set(), "needs_cleaning": set(), } count_by_key: dict[str, int] = {} model_keys: list[str] = [] report_items: dict[str, ReportItem] = {} for key, data_item in self.data_items.items(): if not (metric_name := data_item.metric_name): continue report_items[key] = data_item if _KEY_SEP in key: # Handle DataItem if not isinstance(data_item, DataItem): # pragma: no cover raise Exception( f"For key '{key}', expected a DataItem, got {data_item!r}" ) model_key, _ = key.split(_KEY_SEP, 1) count_by_key[key] = self.counts[model_key][metric_name] if data_item.clean_group: clean_group_keys[data_item.clean_group].add(key) else: # Handle DataModelItem if not isinstance(data_item, DataModelItem): # pragma: no cover raise Exception( f"For key '{key}', expected a DataModelItem, got {data_item!r}" ) model_keys.append(key) count_by_key[key] = self.counts[key]["all"] # Pass 1.2: Created CleanedItem records for key in clean_group_keys["needs_cleaning"]: model_key, _ = key.split(_KEY_SEP, 1) try: clean_count = self.counts[model_key][_CLEANED_METRIC_NAME] except KeyError: continue clean_item = CleanedItem( clean_count, report_name=self._cleaned_report_name[model_key] ) clean_key = f"{key}{_KEY_SEP}cleaned" count_by_key[clean_key] = clean_count report_items[clean_key] = clean_item # Pass 2: Create index parts for sorting # This determines the sort order between siblings _INDEX_PART = tuple[int, int, str] sort_index_part: dict[str, _INDEX_PART] = {} for key, item in report_items.items(): # First, sort by if this or a descendant has a cleaning_order # None, then ok, then needs_cleaning, then both has_ok_descendant = any( ok_key.startswith(key) for ok_key in clean_group_keys["ok"] ) has_nc_descendant = any( nc_key.startswith(key) for nc_key in clean_group_keys["needs_cleaning"] ) cleaning_order = (1 if has_ok_descendant else 0) + ( 2 if has_nc_descendant else 0 ) # Next, sort by negation # No negation then negation if _KEY_SEP in key: key_part = key.rsplit(_KEY_SEP, 1)[1] else: key_part = key if key_part.startswith(_NEGATE_PREFIX): neg_order = 1 key_part = key_part[1:] else: neg_order = 0 # Finally, sort by name sort_index_part[key] = (cleaning_order, neg_order, key_part) # Pass 3: Create the sort index # Short paths come before long paths # Next, use the index part to sort _INDEX_FULL = tuple[int, int, tuple[_INDEX_PART, ...]] sort_index: dict[str, _INDEX_FULL] = {} for key in report_items.keys(): key_parts = key.split(_KEY_SEP) index_parts: list[_INDEX_PART] = [] model_index = -1 while key_parts: if len(key_parts) == 1: model_index = model_keys.index(key_parts[0]) subkey = _KEY_SEP.join(key_parts) index_parts.insert(0, sort_index_part[subkey]) key_parts.pop() sort_index[key] = (model_index, len(index_parts), tuple(index_parts)) # Pass 4: Return the sort dict of ReportEntries def get_sort_index(key: str) -> _INDEX_FULL: return sort_index[key] reports: dict[str, ReportEntry] = {} for key in sorted(report_items, key=get_sort_index): item = report_items[key] reports[key] = ReportEntry( item, count_by_key[key], key.count(_KEY_SEP) + 1, [] ) if _KEY_SEP in key: parent_key = key.rsplit(_KEY_SEP, 1)[0] reports[parent_key].child_keys.append(key) return reports def markdown_report(self) -> str: """Return Markdown-formatted report of issues found and (maybe) fixed.""" lines: list[str] = [] report_entries = self.get_report_entries() # Get the maximum length of children for each entry with children max_len_children: dict[str, int] = {} for key, entry in report_entries.items(): if entry.child_keys: children = [report_entries[subkey] for subkey in entry.child_keys] max_len_children[key] = max( len(child.item.report_name or "") for child in children ) # Output the markdown lines for key, entry in report_entries.items(): if isinstance(entry.item, DataModelItem): lines.append(f"{entry.item.report_name}:") lines.append(f" All: {entry.count}") else: parent_key, _ = key.rsplit(_KEY_SEP, 1) parent_entry = report_entries[parent_key] if parent_entry.count <= 0: continue indent = " " * entry.depth max_len = max_len_children[parent_key] lines.append( f"{indent}{entry.item.report_name:<{max_len}}:" f" {self._as_percent(entry.count, parent_entry.count)}" ) return "\n".join(lines) @staticmethod def _as_percent(part: int, whole: int) -> str: """Return value followed by percent of whole, like '5 ( 30.0%)'""" if whole <= 0: raise ValueError(f"whole ({whole}) can not be less than 0") if part < 0: raise ValueError(f"part ({part}) can not be negative") len_whole = len(str(whole)) return f"{part:{len_whole}d} ({part / whole:6.1%})" class CleanerTask(DataIssueTask): """Base class for tasks that can clean up detected issues.""" can_clean = True def clean(self) -> int: """Clean the detected items, and update counts["summary"]""" summary = self.counts["summary"] if not self._cleaned: summary[_CLEANED_METRIC_NAME] = self._clean() self._cleaned = True return summary[_CLEANED_METRIC_NAME] def _clean(self) -> int: """Call the specified cleaners.""" counts = self.counts cleanup_data = self.cleanup_data total_cleaned = 0 for model_name, metric_name in cleanup_data.items(): clean_item = self.data_items[metric_name] cleaner = getattr(self, f"clean_{model_name}") count = cleaner(clean_item.get_queryset()) counts[model_name][_CLEANED_METRIC_NAME] = count total_cleaned += count return total_cleaned