def get_report_entries()

in privaterelay/cleaner_task.py [0:0]


    def get_report_entries(self) -> dict[str, ReportEntry]:
        """
        Return an ordered dict of ReportEntries, keyed by dotted path.

        The key of the returned dict is a dotted path representing the path to the top
        of the hierarchy. If the entry wraps a DataModelItem or DataItem, it is the
        same key as used in `data_items`.

        Each value is a ReportEntry built from four values, in this order:
        - The report item, which may be a DataModelItem, DataItem, or CleanedItem
        - The count at this level
        - The depth of the entry: the number of segments in its key, so a top
          element has depth 1
        - A list of keys of the child elements of the entry, in sorted order

        Any DataModelItem or DataItem with a blank `metric_name` is omitted. A
        CleanedItem is added under each "needs_cleaning" DataItem whose model has a
        cleaned count in `counts`.

        The dict is ordered by model (in the order models appear in `data_items`),
        then by depth, then by the per-level sort described in Pass 2 below. A
        parent therefore always precedes its children.

        Raises:
            KeyError: if `counts` lacks the count for a reported item, or if an
                intermediate parent of a reported item was omitted.
            ValueError: if the model of a reported item was omitted.
            Exception: if an item's type does not match its key's depth.
        """
        # Pass 1: Gather report items and sorting data.
        # The sorting data:
        # clean_group_keys - what DataItems have .clean_group set
        # count_by_key - the pre-computed count from .counts
        # model_keys - the order models appeared in .data_items
        # report_items - the items that have a .metric_name
        clean_group_keys: dict[CLEAN_GROUP_T, set[str]] = {
            "ok": set(),
            "needs_cleaning": set(),
        }
        count_by_key: dict[str, int] = {}
        model_keys: list[str] = []
        report_items: dict[str, ReportItem] = {}
        for key, data_item in self.data_items.items():
            if not (metric_name := data_item.metric_name):
                continue
            report_items[key] = data_item
            if _KEY_SEP in key:
                # Handle DataItem
                if not isinstance(data_item, DataItem):  # pragma: no cover
                    raise Exception(
                        f"For key '{key}', expected a DataItem, got {data_item!r}"
                    )
                model_key, _ = key.split(_KEY_SEP, 1)
                count_by_key[key] = self.counts[model_key][metric_name]
                if data_item.clean_group:
                    clean_group_keys[data_item.clean_group].add(key)
            else:
                # Handle DataModelItem
                if not isinstance(data_item, DataModelItem):  # pragma: no cover
                    raise Exception(
                        f"For key '{key}', expected a DataModelItem, got {data_item!r}"
                    )
                model_keys.append(key)
                count_by_key[key] = self.counts[key]["all"]

        # Pass 1.2: Create CleanedItem records
        # A model without a cleaned count gets no CleanedItem entries.
        for key in clean_group_keys["needs_cleaning"]:
            model_key, _ = key.split(_KEY_SEP, 1)
            try:
                clean_count = self.counts[model_key][_CLEANED_METRIC_NAME]
            except KeyError:
                continue
            clean_item = CleanedItem(
                clean_count, report_name=self._cleaned_report_name[model_key]
            )
            clean_key = f"{key}{_KEY_SEP}cleaned"
            count_by_key[clean_key] = clean_count
            report_items[clean_key] = clean_item

        # Pass 2: Create index parts for sorting
        # This determines the sort order between siblings
        def with_ancestors(keys: set[str]) -> set[str]:
            """Return the given keys plus every ancestor path of each key."""
            paths: set[str] = set()
            for full_key in keys:
                parts = full_key.split(_KEY_SEP)
                for depth in range(1, len(parts) + 1):
                    paths.add(_KEY_SEP.join(parts[:depth]))
            return paths

        # Match on whole key segments. A plain str.startswith(key) would also
        # match siblings whose name merely extends this key's last segment
        # (for example "m.active" vs "m.active_recent").
        ok_paths = with_ancestors(clean_group_keys["ok"])
        needs_cleaning_paths = with_ancestors(clean_group_keys["needs_cleaning"])

        _INDEX_PART = tuple[int, int, str]
        sort_index_part: dict[str, _INDEX_PART] = {}
        for key in report_items:
            # First, sort by if this or a descendant has a clean_group
            # None (0), then ok (1), then needs_cleaning (2), then both (3)
            cleaning_order = (1 if key in ok_paths else 0) + (
                2 if key in needs_cleaning_paths else 0
            )

            # Next, sort by negation
            # No negation then negation
            key_part = key.rsplit(_KEY_SEP, 1)[-1]
            if key_part.startswith(_NEGATE_PREFIX):
                neg_order = 1
                # Drop the prefix so a negated item sorts next to its positive twin
                key_part = key_part.removeprefix(_NEGATE_PREFIX)
            else:
                neg_order = 0

            # Finally, sort by name
            sort_index_part[key] = (cleaning_order, neg_order, key_part)

        # Pass 3: Create the sort index
        # First, keep models in their original order
        # Short paths come before long paths
        # Next, use the index parts of each ancestor (root first) to sort
        _INDEX_FULL = tuple[int, int, tuple[_INDEX_PART, ...]]
        sort_index: dict[str, _INDEX_FULL] = {}
        for key in report_items:
            key_parts = key.split(_KEY_SEP)
            model_index = model_keys.index(key_parts[0])
            index_parts = tuple(
                sort_index_part[_KEY_SEP.join(key_parts[:depth])]
                for depth in range(1, len(key_parts) + 1)
            )
            sort_index[key] = (model_index, len(index_parts), index_parts)

        # Pass 4: Return the sorted dict of ReportEntries
        # Parents sort before their children (shorter path within the same model),
        # so the parent entry always exists when a child registers itself.
        reports: dict[str, ReportEntry] = {}
        for key in sorted(report_items, key=sort_index.__getitem__):
            reports[key] = ReportEntry(
                report_items[key], count_by_key[key], key.count(_KEY_SEP) + 1, []
            )
            if _KEY_SEP in key:
                parent_key = key.rsplit(_KEY_SEP, 1)[0]
                reports[parent_key].child_keys.append(key)
        return reports