in privaterelay/cleaner_task.py [0:0]
def get_report_entries(self) -> dict[str, ReportEntry]:
"""
Return an ordered dict of ReportEntries and related data.
The key of the returned dict is a dotted path representing the path to the top
of the hierarchy. If the ReportEntry is a DataModelItem or DataItem, it is the
same key as used in `data_items`.
The value of the returned dict is a 4-element tuple:
- The ReportEntry, which may be a DataModelItem or DataItem
- The count at this level
- The key of the parent to this entry, or '' if a top element
- A list of keys of the child elements of the entry, in sorted order
Any DataModelItem or DataItem with a blank `metric_name` is omitted. Additional
ReportEntry items may be added, for example to represent the cleaned data.
"""
# Pass 1: Gather report items and sorting data.
# The sorting data:
# clean_group_keys - what DataItems have .clean_group set
# count_by_key - the pre-computed count from .counts
# model_keys - the order models appeared in .data_specification
# report_items - the items that have a .metric_name
clean_group_keys: dict[CLEAN_GROUP_T, set[str]] = {
"ok": set(),
"needs_cleaning": set(),
}
count_by_key: dict[str, int] = {}
model_keys: list[str] = []
report_items: dict[str, ReportItem] = {}
for key, data_item in self.data_items.items():
if not (metric_name := data_item.metric_name):
continue
report_items[key] = data_item
if _KEY_SEP in key:
# Handle DataItem
if not isinstance(data_item, DataItem): # pragma: no cover
raise Exception(
f"For key '{key}', expected a DataItem, got {data_item!r}"
)
model_key, _ = key.split(_KEY_SEP, 1)
count_by_key[key] = self.counts[model_key][metric_name]
if data_item.clean_group:
clean_group_keys[data_item.clean_group].add(key)
else:
# Handle DataModelItem
if not isinstance(data_item, DataModelItem): # pragma: no cover
raise Exception(
f"For key '{key}', expected a DataModelItem, got {data_item!r}"
)
model_keys.append(key)
count_by_key[key] = self.counts[key]["all"]
# Pass 1.2: Created CleanedItem records
for key in clean_group_keys["needs_cleaning"]:
model_key, _ = key.split(_KEY_SEP, 1)
try:
clean_count = self.counts[model_key][_CLEANED_METRIC_NAME]
except KeyError:
continue
clean_item = CleanedItem(
clean_count, report_name=self._cleaned_report_name[model_key]
)
clean_key = f"{key}{_KEY_SEP}cleaned"
count_by_key[clean_key] = clean_count
report_items[clean_key] = clean_item
# Pass 2: Create index parts for sorting
# This determines the sort order between siblings
_INDEX_PART = tuple[int, int, str]
sort_index_part: dict[str, _INDEX_PART] = {}
for key, item in report_items.items():
# First, sort by if this or a descendant has a cleaning_order
# None, then ok, then needs_cleaning, then both
has_ok_descendant = any(
ok_key.startswith(key) for ok_key in clean_group_keys["ok"]
)
has_nc_descendant = any(
nc_key.startswith(key) for nc_key in clean_group_keys["needs_cleaning"]
)
cleaning_order = (1 if has_ok_descendant else 0) + (
2 if has_nc_descendant else 0
)
# Next, sort by negation
# No negation then negation
if _KEY_SEP in key:
key_part = key.rsplit(_KEY_SEP, 1)[1]
else:
key_part = key
if key_part.startswith(_NEGATE_PREFIX):
neg_order = 1
key_part = key_part[1:]
else:
neg_order = 0
# Finally, sort by name
sort_index_part[key] = (cleaning_order, neg_order, key_part)
# Pass 3: Create the sort index
# Short paths come before long paths
# Next, use the index part to sort
_INDEX_FULL = tuple[int, int, tuple[_INDEX_PART, ...]]
sort_index: dict[str, _INDEX_FULL] = {}
for key in report_items.keys():
key_parts = key.split(_KEY_SEP)
index_parts: list[_INDEX_PART] = []
model_index = -1
while key_parts:
if len(key_parts) == 1:
model_index = model_keys.index(key_parts[0])
subkey = _KEY_SEP.join(key_parts)
index_parts.insert(0, sort_index_part[subkey])
key_parts.pop()
sort_index[key] = (model_index, len(index_parts), tuple(index_parts))
# Pass 4: Return the sort dict of ReportEntries
def get_sort_index(key: str) -> _INDEX_FULL:
return sort_index[key]
reports: dict[str, ReportEntry] = {}
for key in sorted(report_items, key=get_sort_index):
item = report_items[key]
reports[key] = ReportEntry(
item, count_by_key[key], key.count(_KEY_SEP) + 1, []
)
if _KEY_SEP in key:
parent_key = key.rsplit(_KEY_SEP, 1)[0]
reports[parent_key].child_keys.append(key)
return reports