in privaterelay/cleaner_task.py [0:0]
def _get_counts_and_data(self) -> tuple[Counts, CleanupData]:
    """
    Tally row counts per data item and record which items need cleaning.

    Items whose ``metric_name`` is falsy are skipped without being counted.
    A ``DataModelItem`` creates the entry for its own key, holding its count
    under ``"all"``. Any other item stores its count, under its
    ``metric_name``, in the entry named by the part of its key before the
    first ``"."``; that entry is indexed directly, so it must already exist.
    A ``DataItem`` with a truthy ``clean_group`` also adds its count to that
    group's bucket in ``counts["summary"]``, and when the group is
    ``"needs_cleaning"`` its key is stored in the cleanup data under the
    model name.

    Returns:
        A ``(counts, cleanup_data)`` pair. Both are empty dicts when
        ``self.data_items`` is empty.
    """
    if not self.data_items:
        return {}, {}

    tallies: Counts = {"summary": {"ok": 0, "needs_cleaning": 0}}
    to_clean: CleanupData = {}

    for key, item in self.data_items.items():
        metric = item.metric_name
        if not metric:
            continue
        total = item.count()

        if isinstance(item, DataModelItem):
            tallies[key] = {"all": total}
            continue

        # The model name is only derived for non-model items, so the
        # clean-group bookkeeping that depends on it stays on this path.
        model, _, _ = key.partition(".")
        tallies[model][metric] = total

        if not isinstance(item, DataItem):
            continue
        group = item.clean_group
        if not group:
            continue

        tallies["summary"][group] += total
        if group == "needs_cleaning":
            to_clean[model] = key

    return tallies, to_clean