# privaterelay/cleaner_task.py
"""Framework for tasks that identify data issues and (if possible) clean them up"""
from __future__ import annotations
import string
from abc import ABCMeta, abstractmethod
from typing import Any, Generic, Literal, TypeVar, get_args
from django.db.models import Model, Q
from django.db.models.query import QuerySet
Counts = dict[str, dict[str, int]]
CleanupData = dict[str, Any]
M = TypeVar("M", bound=Model)
CLEAN_GROUP_T = Literal["ok", "needs_cleaning"]
# Define allowed characters for item keys
# {model_plural}.[!]{sub_name1}.[!]{sub_name2}
_KEY_SEP = "."
_NEGATE_PREFIX = "!"
_ITEM_KEY_CHAR_SET = set(
string.ascii_lowercase + string.digits + "_" + _NEGATE_PREFIX + _KEY_SEP
)
# Used by CleanedItem
_CLEANED_METRIC_NAME = "cleaned"
def _metric_name_for_model(model: type[M]) -> str:
"""The model's metric key, used in metrics and as a dictionary key."""
return str(model._meta.verbose_name_plural).replace(" ", "_")
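# For example (a hypothetical model, not part of this module): a model whose
# Meta.verbose_name_plural is "relay addresses" gets the metric key
# "relay_addresses" from _metric_name_for_model.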
class ReportItem(metaclass=ABCMeta):
"""
An item in a data task report.
    This is the base class in the reporting item hierarchy. Code should use the
    derived classes CleanedItem, DataModelItem, and DataItem.
"""
def __init__(self, metric_name: str | None = None, report_name: str | None = None):
"""
Initialize a ReportItem.
The `metric_name` parameter sets the name of the entry when it appears as a
`dict` or JSON key. The default is `None`, which omits the entry from
reports.
        The `report_name` parameter sets the name of the entry when it appears in a
        report for humans. It must be None when `metric_name` is None.
"""
if metric_name and (
bad_chars := [c for c in metric_name if c not in _ITEM_KEY_CHAR_SET]
):
raise ValueError(
f"metric_name '{metric_name}' has disallowed character"
f"{'' if len(bad_chars) == 1 else 's'} '{''.join(sorted(bad_chars))}'"
)
if metric_name == "":
raise ValueError("metric_name is an empty string, should be None")
if metric_name is None and report_name is not None:
raise ValueError(f"report_name is '{report_name}', but metric_name is None")
if report_name == "":
raise ValueError("report_name is an empty string, should be None")
self.metric_name = metric_name
self.report_name = report_name
@abstractmethod
def count(self) -> int:
raise NotImplementedError
class CleanedItem(ReportItem):
"""Represents the results of cleaning a Model."""
def __init__(self, count: int, report_name: str = "Cleaned") -> None:
if count < 0:
raise ValueError("count can not be negative")
self._count = count
super().__init__(metric_name=_CLEANED_METRIC_NAME, report_name=report_name)
def __repr__(self) -> str:
args = [str(self._count)]
if self.report_name != "Cleaned":
args.append(f"report_name={self.report_name!r}")
return f'{type(self).__name__}({", ".join(args)})'
def __eq__(self, other: Any) -> bool:
if isinstance(other, CleanedItem):
return (
self.__class__ == other.__class__
and self._count == other._count
and self.metric_name == other.metric_name
and self.report_name == other.report_name
)
return NotImplemented
def count(self) -> int:
return self._count
class BaseDataItem(ReportItem, Generic[M]):
"""An entry in a data task report backed by a database query."""
def __init__(
self,
model_or_parent: type[M] | BaseDataItem[M],
filter_by: str | Q | None = None,
exclude: bool = False,
metric_name: str | None = None,
report_name: str | None = None,
) -> None:
if metric_name == _CLEANED_METRIC_NAME:
raise ValueError(f"metric_name '{metric_name}' is reserved for CleanedItem")
self._model_or_parent = model_or_parent
self.filter_by = filter_by
self.exclude = exclude
super().__init__(metric_name=metric_name, report_name=report_name)
def __eq__(self, other: Any) -> bool:
if isinstance(other, BaseDataItem):
return (
self._model_or_parent == other._model_or_parent
and self.filter_by == other.filter_by
and self.exclude == other.exclude
and self.metric_name == other.metric_name
and self.report_name == other.report_name
)
return NotImplemented
def get_queryset(self) -> QuerySet[M]:
"""Return the Django query for this BaseDataItem."""
if isinstance(self._model_or_parent, BaseDataItem):
query = self._model_or_parent.get_queryset()
else:
query = self._model_or_parent._default_manager.all()
if isinstance(self.filter_by, str):
filter_by = {self.filter_by: True}
if self.exclude:
query = query.exclude(**filter_by)
else:
query = query.filter(**filter_by)
elif isinstance(self.filter_by, Q):
if self.exclude:
query = query.exclude(self.filter_by)
else:
query = query.filter(self.filter_by)
return query
def count(self) -> int:
"""Return the number of rows matched for this BaseDataItem."""
return self.get_queryset().count()
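    # get_queryset() filter semantics, as a sketch ("Profile" and the field name
    # are assumptions, not part of this module):
    #
    #     BaseDataItem(Profile, filter_by="user__is_active").get_queryset()
    #     # ~ Profile._default_manager.filter(user__is_active=True)
    #     BaseDataItem(Profile, filter_by="user__is_active", exclude=True).get_queryset()
    #     # ~ Profile._default_manager.exclude(user__is_active=True)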
class DataModelItem(BaseDataItem[M]):
"""A BaseDataItem representing the top-level Model"""
_model_or_parent: type[M]
metric_name: str
report_name: str
filter_by: None
def __init__(self, model: type[M]) -> None:
"""Initialize a DataModelItem."""
super().__init__(
model_or_parent=model,
metric_name=_metric_name_for_model(model),
report_name=str(model._meta.verbose_name_plural).title(),
)
def __repr__(self) -> str:
return f"{type(self).__name__}({self._model_or_parent.__name__})"
class DataItem(BaseDataItem[M]):
"""
    A DataItem is a subquery of a DataModelItem or another DataItem.
    A top-level DataModelItem represents all the rows in a table. A DataItem with a
    parent selects some of those rows. The specific rows of interest can be
    represented by multiple levels of DataItems, giving context to those rows.
"""
_model_or_parent: BaseDataItem[M]
def __init__(
self,
parent: BaseDataItem[M],
filter_by: str | Q,
exclude: bool = False,
metric_name: str | None = None,
report_name: str | None = None,
clean_group: CLEAN_GROUP_T | None = None,
) -> None:
"""
Initialize a DataItem, checking for init-time issues.
        The `filter_by` parameter sets the filter (`filter` is a Python builtin). It can
be a string, which represents a boolean filter. It can be a Django Q object,
such as `Q(num_deleted_relay_addresses__gt=5)`.
The default is to include rows matching the query. If `exclude` is set to
`True`, then the query is for rows that do not match the filter.
The `clean_group` parameter identifies the DataItem as a query of interest,
usually in the context of a CleanerTask. An 'ok' value means the query
represents rows without a problem, and a 'needs_cleaning' value means the rows
need fixing.
"""
if filter_by == "":
raise ValueError("filter_by is an empty string, should be set")
if metric_name is None and clean_group is not None:
raise ValueError(f"clean_group is '{clean_group}', but metric_name is None")
if clean_group is not None and clean_group not in get_args(CLEAN_GROUP_T):
raise ValueError(f"clean_group has invalid value '{clean_group}'")
self.clean_group = clean_group
super().__init__(
model_or_parent=parent,
filter_by=filter_by,
exclude=exclude,
metric_name=metric_name,
report_name=report_name,
)
def __repr__(self) -> str:
if isinstance(self._model_or_parent, DataItem):
args = [
f"<{type(self._model_or_parent).__name__}"
f"(metric_name={self._model_or_parent.metric_name!r}, ...)>"
]
else:
args = [repr(self._model_or_parent)]
args.append(repr(self.filter_by))
if self.exclude:
args.append(f"exclude={self.exclude!r}")
if self.metric_name:
args.append(f"metric_name={self.metric_name!r}")
if self.report_name:
args.append(f"report_name={self.report_name!r}")
if self.clean_group:
args.append(f"clean_group={self.clean_group!r}")
return f'{type(self).__name__}({", ".join(args)})'
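    # A sketch of a DataItem hierarchy (the "Profile" model and its field names
    # are assumptions, not part of this module):
    #
    #     profiles = DataModelItem(Profile)
    #     active = DataItem(profiles, "user__is_active", metric_name="active")
    #     flagged = DataItem(
    #         active,
    #         Q(num_deleted_relay_addresses__gt=5),
    #         metric_name="active.many_deleted",
    #         clean_group="needs_cleaning",
    #     )
    #     flagged.count()  # active profiles with more than 5 deleted addresses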
class DataModelSpec(Generic[M]):
"""
Define queries on a Model that can identify issues.
This provides a higher-level interface for constructing a hierarchy of DataItems.
The top-level is the most general, and queries are filtered to focus on more
specific sets of rows as you go down the hierarchy.
Each DataItem is identified by a key which represents its place in the hierarchy.
For example, the key for a DataItem for active users created in 2023 might be
'active.created_in_2023'.
    When exporting the {key: DataItem} dict with `to_data_items`, the keys are turned
    into fully-qualified keys by prefixing them with the model key. The model key is
    derived from the model's `verbose_name_plural`. For example, a fully-qualified
    key would be 'users.active.created_in_2023'.
https://docs.djangoproject.com/en/4.2/ref/models/options/#verbose-name-plural
"""
def __init__(
self,
model: type[M],
subdivisions: list[DataBisectSpec] | None = None,
omit_key_prefixes: list[str] | None = None,
metric_name_overrides: dict[str, str] | None = None,
report_name_overrides: dict[str, str] | None = None,
ok_key: str | None = None,
needs_cleaning_key: str | None = None,
cleaned_report_name: str = "Cleaned",
) -> None:
"""
Initialize a DataModelSpec, checking for init-time issues.
Keyword arguments:
model - The model for this hierarchy of queries
subdivisions - The subqueries, as a list of DataBisectSpecs
omit_key_prefixes - A list of key prefixes that should be omitted from reports.
Since the key name represents the hierarchy, omitting a prefix removes a
whole branch of queries and subqueries.
metric_name_overrides - A dict of keys to the metric names as used in `dict` and
            JSON keys. The default metric name is the key itself.
report_name_overrides - A dict of keys to the human-suitable report names. The
default report name is the last component of the key, converted to a
title-cased phrase.
ok_key - The key for the "ok" DataItem, such as rows that have already been
cleaned.
needs_cleaning_key - The key for the "needs_cleaning" DataItem, such as rows
that need manual or automated cleaning.
cleaned_report_name - The report name for the cleaned DataItem, added after
running cleaning. This defaults to "Cleaned".
"""
if not isinstance(model, type) or not issubclass(model, Model):
raise ValueError(f"model {model!r} is not a Django model.")
subkeys: set[str] = set()
for sub in subdivisions or []:
for key in sub.get_keys():
if key in subkeys:
raise ValueError("Duplicate key 'active' in subdivisions")
subkeys.add(key)
if omit_key_prefixes:
if "" in omit_key_prefixes:
raise ValueError(
"omit_key_prefixes should not include the empty string"
)
for key in omit_key_prefixes:
if key not in subkeys:
raise ValueError(
f"omit_key_prefixes key '{key}' not found in subdivision"
f" keys {sorted(subkeys)}"
)
for key in (metric_name_overrides or {}).keys():
if key in omit_key_prefixes:
raise ValueError(
f"The metric_name_overrides key '{key}'"
f" should not be in omit_key_prefixes {omit_key_prefixes}"
)
for key in (report_name_overrides or {}).keys():
if key in omit_key_prefixes:
raise ValueError(
f"The report_name_overrides key '{key}'"
f" should not be in omit_key_prefixes {omit_key_prefixes}"
)
if ok_key and ok_key in omit_key_prefixes:
raise ValueError(
f"The ok_key '{ok_key}'"
f" should not be in omit_key_prefixes {omit_key_prefixes}"
)
if needs_cleaning_key and needs_cleaning_key in omit_key_prefixes:
raise ValueError(
f"The needs_cleaning_key '{needs_cleaning_key}'"
f" should not be in omit_key_prefixes {omit_key_prefixes}"
)
if metric_name_overrides:
for key in metric_name_overrides.keys():
if key not in subkeys:
raise ValueError(
f"metric_name_overrides key '{key}' not found in subdivision"
f" keys {sorted(subkeys)}"
)
if report_name_overrides:
for key in report_name_overrides.keys():
if key not in subkeys:
raise ValueError(
f"report_name_overrides key '{key}' not found in subdivision"
f" keys {sorted(subkeys)}"
)
if ok_key and ok_key not in subkeys:
raise ValueError(
f"ok_key '{ok_key}' not found in subdivision keys {sorted(subkeys)}"
)
if needs_cleaning_key and needs_cleaning_key not in subkeys:
raise ValueError(
f"needs_cleaning_key '{needs_cleaning_key}' not found in subdivision"
f" keys {sorted(subkeys)}"
)
self.model = model
self.subdivisions = subdivisions or []
self.omit_key_prefixes = omit_key_prefixes or []
self.metric_name_overrides = metric_name_overrides or {}
self.report_name_overrides = report_name_overrides or {}
self.ok_key = ok_key
self.needs_cleaning_key = needs_cleaning_key
self.cleaned_report_name = cleaned_report_name
def __repr__(self) -> str:
args = [f"model={self.model.__name__}"]
if self.subdivisions:
args.append(f"subdivisions={self.subdivisions!r}")
if self.omit_key_prefixes:
args.append(f"omit_key_prefixes={self.omit_key_prefixes!r}")
if self.metric_name_overrides:
args.append(f"metric_name_overrides={self.metric_name_overrides!r}")
if self.report_name_overrides:
args.append(f"report_name_overrides={self.report_name_overrides!r}")
if self.ok_key:
args.append(f"ok_key={self.ok_key!r}")
if self.needs_cleaning_key:
args.append(f"needs_cleaning_key={self.needs_cleaning_key!r}")
if self.cleaned_report_name != "Cleaned":
args.append(f"cleaned_report_name={self.cleaned_report_name!r}")
return f'{type(self).__name__}({", ".join(args)})'
@property
def model_key(self) -> str:
return _metric_name_for_model(self.model)
def omit_key(self, key: str) -> bool:
return any(
key == omit or key.startswith(omit + _KEY_SEP)
for omit in self.omit_key_prefixes
)
def metric_name(self, key: str) -> str | None:
"""Return None (to omit), a friendlier name, or the original name."""
if self.omit_key(key):
return None
return self.metric_name_overrides.get(key, key)
def report_name(self, key: str) -> str | None:
if self.omit_key(key):
return None
return self.report_name_overrides.get(
key, key.split(_KEY_SEP)[-1].replace("_", " ").replace("!", "not ").title()
)
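    # Without an override, report_name derives the name from the last part of the
    # key: for example, the (assumed) key "active.!sent_welcome" becomes
    # "Not Sent Welcome".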
def clean_group(self, subname: str) -> CLEAN_GROUP_T | None:
"""Identify when the subname is for a key cleaning stat."""
if subname == self.ok_key:
return "ok"
elif subname == self.needs_cleaning_key:
return "needs_cleaning"
else:
return None
def to_data_items(self) -> dict[str, BaseDataItem[M]]:
"""Converts the spec to a dictionary of DataItems."""
model_item = DataModelItem(self.model)
data_items: dict[str, BaseDataItem[M]] = {"": model_item}
for subdivision in self.subdivisions:
data_items.update(subdivision.to_data_items(self, data_items).items())
# For the return dict, prefix with model's metric name
model_key = model_item.metric_name
return {
(f"{model_key}{_KEY_SEP}{key}" if key else model_key): item
for key, item in data_items.items()
}
class DataBisectSpec:
"""Bisect a parent query."""
def __init__(
self,
key: str,
bisect_by: str | Q,
) -> None:
if key.startswith(_KEY_SEP):
raise ValueError(f"The key '{key}' should not start with '{_KEY_SEP}'")
if bad_chars := [c for c in key if c not in _ITEM_KEY_CHAR_SET]:
raise ValueError(
f"key '{key}' has disallowed character"
f"{'' if len(bad_chars) == 1 else 's'} '{''.join(sorted(bad_chars))}'"
)
if not bisect_by:
raise ValueError("Set the bisect_by filter")
parts = key.split(".")
for part in parts:
if _NEGATE_PREFIX in part[1:]:
raise ValueError(
f"In key '{key}', the prefix '{_NEGATE_PREFIX}' is in the"
f" middle of subkey '{part}'"
)
if parts[-1][0] == _NEGATE_PREFIX:
raise ValueError(
f"In key '{key}', the prefix '{_NEGATE_PREFIX}' is not allowed"
f" in the last subkey '{parts[-1]}'"
)
self.key = key
self.bisect_by = bisect_by
def __repr__(self) -> str:
args = [f"key={self.key!r}", f"bisect_by={self.bisect_by!r}"]
return f'{type(self).__name__}({", ".join(args)})'
def get_keys(self) -> list[str]:
if _KEY_SEP in self.key:
subparent_name, part_name = self.key.rsplit(_KEY_SEP, 1)
neg_key = f"{subparent_name}{_KEY_SEP}{_NEGATE_PREFIX}{part_name}"
else:
subparent_name = ""
part_name = self.key
neg_key = f"{_NEGATE_PREFIX}{part_name}"
return [self.key, neg_key]
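    # For example, DataBisectSpec("active.sent_welcome", "sent_welcome_email")
    # (filter field assumed) has get_keys() ==
    # ["active.sent_welcome", "active.!sent_welcome"].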
def to_data_items(
self, model_spec: DataModelSpec[M], existing_items: dict[str, BaseDataItem[M]]
) -> dict[str, DataItem[M]]:
"""Return two data items bisecting the parent data."""
if _KEY_SEP in self.key:
subparent_name, _ = self.key.rsplit(_KEY_SEP, 1)
else:
subparent_name = ""
parent = existing_items[subparent_name]
pos_key, neg_key = self.get_keys()
return {
pos_key: self._to_bisected_data_item(
self.key, model_spec, parent, "positive"
),
neg_key: self._to_bisected_data_item(
neg_key, model_spec, parent, "negative"
),
}
def _to_bisected_data_item(
self,
key: str,
model_spec: DataModelSpec[M],
parent: BaseDataItem[M],
bisect: Literal["positive", "negative"],
) -> DataItem[M]:
"""Create one of the bisected data items."""
return DataItem(
parent=parent,
filter_by=self.bisect_by,
exclude=bisect == "negative",
metric_name=model_spec.metric_name(key),
clean_group=model_spec.clean_group(key),
report_name=model_spec.report_name(key),
)
class ReportEntry:
"""An entry in a report."""
def __init__(
self,
item: ReportItem,
count: int,
depth: int,
child_keys: list[str],
) -> None:
self.item = item
self.count = count
self.depth = depth
self.child_keys = child_keys
def __repr__(self) -> str:
return (
f"{type(self).__name__}({self.item!r}, {self.count!r}, {self.depth!r},"
f" {self.child_keys!r})"
)
def __eq__(self, other: Any) -> bool:
if isinstance(other, self.__class__):
return (
self.__class__ == other.__class__
and self.item == other.item
and self.count == other.count
and self.depth == other.depth
and self.child_keys == other.child_keys
)
return NotImplemented
class DataIssueTask:
"""Base class for data issue / cleaner tasks."""
slug: str # Short name, appropriate for command-line option
title: str # Short title for reports
check_description: str # A sentence describing what this cleaner is checking.
can_clean: bool = False # True if the issue can be automatically cleaned
data_specification: list[DataModelSpec[Any]] = [] # The specification for this task
_counts: Counts | None
_cleanup_data: CleanupData | None
_cleaned: bool
_cleaned_report_name: dict[str, str]
def __init__(self) -> None:
self._counts = None
self._cleanup_data = None
self._cleaned = False
self._cleaned_report_name = {}
self.data_items = self._get_data_items()
def _get_data_items(self) -> dict[str, BaseDataItem[Any]]:
"""Turn the data_specification into a dictionary of names to DataItems."""
data_items: dict[str, BaseDataItem[Any]] = {}
for model_spec in self.data_specification:
if model_spec.model_key in data_items:
raise ValueError(
f"{model_spec!r}\nThe key '{model_spec.model_key}' already exists"
)
data_items.update(self._get_data_items_for_model_spec(model_spec))
return data_items
def _get_data_items_for_model_spec(
self, model_spec: DataModelSpec[M]
) -> dict[str, BaseDataItem[M]]:
data_items: dict[str, BaseDataItem[M]] = {}
self._cleaned_report_name[model_spec.model_key] = model_spec.cleaned_report_name
for name, item in model_spec.to_data_items().items():
if (
self.can_clean
and isinstance(item, DataItem)
and item.clean_group == "needs_cleaning"
and not hasattr(self, f"clean_{model_spec.model_key}")
):
raise ValueError(
f"{model_spec!r}\n{item}\n"
"This item has clean_group='needs_cleaning', but the"
f" cleaning function clean_{model_spec.model_key} is not defined."
)
data_items[name] = item
return data_items
@property
def counts(self) -> Counts:
"""Get relevant counts for data issues and prepare to clean if possible."""
if self._counts is None:
self._counts, self._cleanup_data = self._get_counts_and_data()
return self._counts
def _get_counts_and_data(self) -> tuple[Counts, CleanupData]:
if not self.data_items:
return {}, {}
counts: Counts = {"summary": {"ok": 0, "needs_cleaning": 0}}
cleanup_data: CleanupData = {}
for name, data_item in self.data_items.items():
if not data_item.metric_name:
continue
count = data_item.count()
if isinstance(data_item, DataModelItem):
counts[name] = {"all": count}
else:
model_name = name.split(".")[0]
counts[model_name][data_item.metric_name] = count
if isinstance(data_item, DataItem) and data_item.clean_group:
counts["summary"][data_item.clean_group] += count
if data_item.clean_group == "needs_cleaning":
cleanup_data[model_name] = name
return counts, cleanup_data
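    # The shape of the returned data, for a hypothetical "profiles" spec with an
    # "ok" item "profiles.active.sent_welcome" and a "needs_cleaning" item
    # "profiles.active.!sent_welcome" (names and numbers are assumptions):
    #
    #     counts = {
    #         "summary": {"ok": 80, "needs_cleaning": 10},
    #         "profiles": {
    #             "all": 100,
    #             "active": 90,
    #             "!active": 10,
    #             "active.sent_welcome": 80,
    #             "active.!sent_welcome": 10,
    #         },
    #     }
    #     cleanup_data = {"profiles": "profiles.active.!sent_welcome"}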
@property
def cleanup_data(self) -> CleanupData:
"""Get data needed to clean data issues."""
if not self.counts:
return {}
return self._cleanup_data or {}
def issues(self) -> int:
"""Return the number of detected data issues."""
return self.counts["summary"]["needs_cleaning"]
def get_report_entries(self) -> dict[str, ReportEntry]:
"""
        Return an ordered dict of ReportEntries.
        The key of the returned dict is a dotted path representing the entry's place in
        the hierarchy. If the entry wraps a DataModelItem or DataItem, it is the same
        key as used in `data_items`.
        Each ReportEntry value has four attributes:
        - item: the ReportItem, which may be a DataModelItem, DataItem, or CleanedItem
        - count: the count at this level
        - depth: the depth of the entry in the hierarchy
        - child_keys: the keys of the entry's children, in sorted order
        Any DataModelItem or DataItem with a blank `metric_name` is omitted. Additional
        ReportEntry items may be added, for example to represent the cleaned data.
"""
# Pass 1: Gather report items and sorting data.
# The sorting data:
        # clean_group_keys - which DataItems have .clean_group set
# count_by_key - the pre-computed count from .counts
# model_keys - the order models appeared in .data_specification
# report_items - the items that have a .metric_name
clean_group_keys: dict[CLEAN_GROUP_T, set[str]] = {
"ok": set(),
"needs_cleaning": set(),
}
count_by_key: dict[str, int] = {}
model_keys: list[str] = []
report_items: dict[str, ReportItem] = {}
for key, data_item in self.data_items.items():
if not (metric_name := data_item.metric_name):
continue
report_items[key] = data_item
if _KEY_SEP in key:
# Handle DataItem
if not isinstance(data_item, DataItem): # pragma: no cover
raise Exception(
f"For key '{key}', expected a DataItem, got {data_item!r}"
)
model_key, _ = key.split(_KEY_SEP, 1)
count_by_key[key] = self.counts[model_key][metric_name]
if data_item.clean_group:
clean_group_keys[data_item.clean_group].add(key)
else:
# Handle DataModelItem
if not isinstance(data_item, DataModelItem): # pragma: no cover
raise Exception(
f"For key '{key}', expected a DataModelItem, got {data_item!r}"
)
model_keys.append(key)
count_by_key[key] = self.counts[key]["all"]
        # Pass 1.2: Create CleanedItem records
for key in clean_group_keys["needs_cleaning"]:
model_key, _ = key.split(_KEY_SEP, 1)
try:
clean_count = self.counts[model_key][_CLEANED_METRIC_NAME]
except KeyError:
continue
clean_item = CleanedItem(
clean_count, report_name=self._cleaned_report_name[model_key]
)
clean_key = f"{key}{_KEY_SEP}cleaned"
count_by_key[clean_key] = clean_count
report_items[clean_key] = clean_item
# Pass 2: Create index parts for sorting
# This determines the sort order between siblings
_INDEX_PART = tuple[int, int, str]
sort_index_part: dict[str, _INDEX_PART] = {}
for key, item in report_items.items():
            # First, sort by whether this item or a descendant has a clean_group:
            # None, then ok, then needs_cleaning, then both
has_ok_descendant = any(
ok_key.startswith(key) for ok_key in clean_group_keys["ok"]
)
has_nc_descendant = any(
nc_key.startswith(key) for nc_key in clean_group_keys["needs_cleaning"]
)
cleaning_order = (1 if has_ok_descendant else 0) + (
2 if has_nc_descendant else 0
)
            # Next, sort by negation: non-negated keys before negated keys
if _KEY_SEP in key:
key_part = key.rsplit(_KEY_SEP, 1)[1]
else:
key_part = key
if key_part.startswith(_NEGATE_PREFIX):
neg_order = 1
key_part = key_part[1:]
else:
neg_order = 0
# Finally, sort by name
sort_index_part[key] = (cleaning_order, neg_order, key_part)
# Pass 3: Create the sort index
# Short paths come before long paths
# Next, use the index part to sort
_INDEX_FULL = tuple[int, int, tuple[_INDEX_PART, ...]]
sort_index: dict[str, _INDEX_FULL] = {}
for key in report_items.keys():
key_parts = key.split(_KEY_SEP)
index_parts: list[_INDEX_PART] = []
model_index = -1
while key_parts:
if len(key_parts) == 1:
model_index = model_keys.index(key_parts[0])
subkey = _KEY_SEP.join(key_parts)
index_parts.insert(0, sort_index_part[subkey])
key_parts.pop()
sort_index[key] = (model_index, len(index_parts), tuple(index_parts))
        # Pass 4: Return the sorted dict of ReportEntries
def get_sort_index(key: str) -> _INDEX_FULL:
return sort_index[key]
reports: dict[str, ReportEntry] = {}
for key in sorted(report_items, key=get_sort_index):
item = report_items[key]
reports[key] = ReportEntry(
item, count_by_key[key], key.count(_KEY_SEP) + 1, []
)
if _KEY_SEP in key:
parent_key = key.rsplit(_KEY_SEP, 1)[0]
reports[parent_key].child_keys.append(key)
return reports
def markdown_report(self) -> str:
"""Return Markdown-formatted report of issues found and (maybe) fixed."""
lines: list[str] = []
report_entries = self.get_report_entries()
        # Get the maximum report_name length among each entry's children
max_len_children: dict[str, int] = {}
for key, entry in report_entries.items():
if entry.child_keys:
children = [report_entries[subkey] for subkey in entry.child_keys]
max_len_children[key] = max(
len(child.item.report_name or "") for child in children
)
# Output the markdown lines
for key, entry in report_entries.items():
if isinstance(entry.item, DataModelItem):
lines.append(f"{entry.item.report_name}:")
lines.append(f" All: {entry.count}")
else:
parent_key, _ = key.rsplit(_KEY_SEP, 1)
parent_entry = report_entries[parent_key]
if parent_entry.count <= 0:
continue
indent = " " * entry.depth
max_len = max_len_children[parent_key]
lines.append(
f"{indent}{entry.item.report_name:<{max_len}}:"
f" {self._as_percent(entry.count, parent_entry.count)}"
)
return "\n".join(lines)
@staticmethod
def _as_percent(part: int, whole: int) -> str:
"""Return value followed by percent of whole, like '5 ( 30.0%)'"""
if whole <= 0:
raise ValueError(f"whole ({whole}) can not be less than 0")
if part < 0:
raise ValueError(f"part ({part}) can not be negative")
len_whole = len(str(whole))
return f"{part:{len_whole}d} ({part / whole:6.1%})"
class CleanerTask(DataIssueTask):
"""Base class for tasks that can clean up detected issues."""
can_clean = True
def clean(self) -> int:
"""Clean the detected items, and update counts["summary"]"""
summary = self.counts["summary"]
if not self._cleaned:
summary[_CLEANED_METRIC_NAME] = self._clean()
self._cleaned = True
return summary[_CLEANED_METRIC_NAME]
def _clean(self) -> int:
"""Call the specified cleaners."""
counts = self.counts
cleanup_data = self.cleanup_data
total_cleaned = 0
for model_name, metric_name in cleanup_data.items():
clean_item = self.data_items[metric_name]
cleaner = getattr(self, f"clean_{model_name}")
count = cleaner(clean_item.get_queryset())
counts[model_name][_CLEANED_METRIC_NAME] = count
total_cleaned += count
return total_cleaned
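# A sketch of a concrete cleaner task (the "Profile" model, its fields, the
# "profiles" verbose_name_plural, and the cleaning logic are assumptions, not
# part of this module):
#
#     class MissingWelcomeCleaner(CleanerTask):
#         slug = "missing-welcome"
#         title = "Profiles missing a welcome email"
#         check_description = "Active profiles should have a welcome email."
#         data_specification = [
#             DataModelSpec(
#                 Profile,
#                 subdivisions=[
#                     DataBisectSpec("active", "user__is_active"),
#                     DataBisectSpec("active.sent_welcome", "sent_welcome_email"),
#                 ],
#                 ok_key="active.sent_welcome",
#                 needs_cleaning_key="active.!sent_welcome",
#             )
#         ]
#
#         def clean_profiles(self, queryset: QuerySet[Profile]) -> int:
#             # Called by _clean() with the "needs_cleaning" queryset; the
#             # return value is the number of rows fixed.
#             return queryset.update(sent_welcome_email=True)
#
#     task = MissingWelcomeCleaner()
#     task.issues()                  # number of profiles needing cleaning
#     task.clean()                   # calls clean_profiles(), returns total cleaned
#     print(task.markdown_report())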