# privaterelay/management/commands/aggregate_generated_for.py
from argparse import ArgumentParser
from collections.abc import Iterable
from csv import DictReader, DictWriter
from pathlib import Path
from typing import Any
from urllib.parse import urlparse
from django.core.management.base import BaseCommand, CommandError
def normalize(url: str) -> str:
    """
    Return *url* prefixed with "//" when it lacks a scheme or leading slashes.

    ``urlparse`` only recognizes the netloc of a URL that starts with "//"
    (or an explicit http/https scheme), so bare domains in the data need
    the prefix before parsing. Approach from:
    https://stackoverflow.com/questions/53816559/python-3-netloc-value-in-urllib-parse-is-empty-if-url-doesnt-have
    """
    if url.startswith(("//", "http://", "https://")):
        return url
    return "//" + url
def aggregate_by_generated_for(
    file_path: str, data: Iterable[dict[str, Any]]
) -> dict[str, dict[str, int]]:
    """
    Sum per-mask usage rows, grouped by the domain of their generated_for URL.

    Args:
        file_path: Unused; kept so existing callers' signatures still work.
        data: Rows (e.g. from ``csv.DictReader``) with a "generated_for"
            URL column plus the numeric columns listed in ``columns`` below.

    Returns:
        Mapping from domain (empty string for rows with no generated_for)
        to summed counters, plus a "row_count" of how many source rows
        contributed to that domain.
    """
    aggregate_usage: dict[str, dict[str, int]] = {}
    columns = [
        "count",  # Number of masks with the generated_for
        "total_usage",  # Sum of emails forwarded, emails blocked,
        # trackers blocked in emails, emails replied, and spam blocked
        "total_forwarded",  # Total emails forwarded to masks
        "total_blocked",  # Total emails blocked for masks
        "total_level_one_trackers_blocked",  # Total number of trackers
        # blocked in emails forwarded to masks
        "total_replied",  # Total number of emails replied to masks
        "total_spam",  # Total number of spam
    ]
    for row in data:
        url = row["generated_for"]
        # TODO: good candidate for a unit-tested function
        # clean the domain for multiple domains in generated_for
        # separated by space, strip www, and others like stripping path
        if url:
            domain = urlparse(normalize(url)).netloc
        else:
            domain = url
        # setdefault avoids building a throwaway zeroed dict for domains
        # that have already been seen.
        aggregate_data = aggregate_usage.setdefault(
            domain,
            {
                "count": 0,
                "row_count": 0,
                "total_usage": 0,
                "total_forwarded": 0,
                "total_blocked": 0,
                "total_level_one_trackers_blocked": 0,
                "total_replied": 0,
                "total_spam": 0,
            },
        )
        aggregate_data["row_count"] += 1
        for col in columns:
            # Empty CSV cells count as zero instead of raising on int("").
            aggregate_data[col] += int(row[col] or 0)
    return aggregate_usage
def generate_csv_file(
    file_path: str, aggregate_usage: dict[str, dict[str, Any]]
) -> Path:
    """
    Write ``aggregate_usage`` to an aggregate.csv beside the input file.

    Args:
        file_path: Path of the source CSV; the output is written to
            "aggregate.csv" in the same directory.
        aggregate_usage: Mapping of domain to its aggregated counters.

    Returns:
        Path of the aggregate.csv that was written.
    """
    output_path = Path(file_path).parent.joinpath("aggregate.csv")
    # ratio_* and rank columns are part of the header but not filled in
    # here; DictWriter leaves them empty for later manual analysis.
    header = [
        "domain",
        "rank",
        "count",
        "row_count",
        "total_usage",
        "ratio_usage",
        "total_forwarded",
        "ratio_forwarded",
        "total_blocked",
        "ratio_blocked",
        "total_level_one_trackers_blocked",
        "ratio_level_one_trackers_blocked",
        "total_replied",
        "ratio_replied",
        "total_spam",
        "ratio_spam",
    ]
    with open(output_path, "w", newline="") as outfile:
        csv_writer = DictWriter(outfile, fieldnames=header)
        csv_writer.writeheader()
        for domain, usage in aggregate_usage.items():
            csv_writer.writerow({"domain": domain, **usage})
    return output_path
class Command(BaseCommand):
    """Normalize and aggregate a generated_for CSV into aggregate.csv."""

    help = (
        "Takes CSV file with generated_for values and "
        "normalizes URLs in domain column and aggregates the values. "
        "Creates or updates aggregate.csv for quarterly mask acceptance testing. "
        "See instructions on how to get generated_for CSV file on MPP-3825."
    )

    def add_arguments(self, parser: ArgumentParser) -> None:
        """Register the required --path option for the input CSV."""
        parser.add_argument(
            "--path",
            type=str,
            required=True,
            help="Path to the CSV file to normalize and aggregate",
        )

    def handle(self, *args: Any, **options: Any) -> str:
        """Read the CSV at --path, aggregate it, and report the output file."""
        csv_path: str = options.get("path", "")
        if not csv_path:
            raise CommandError(
                "Aggregate generated_for failed: File path must be entered"
            )
        with open(csv_path, newline="") as infile:
            # quotechar "|" matches how the export quotes generated_for values
            reader = DictReader(infile, delimiter=",", quotechar="|")
            aggregate_usage = aggregate_by_generated_for(csv_path, reader)
            aggregate_file_path = generate_csv_file(csv_path, aggregate_usage)
        return f"Completed updates to {aggregate_file_path}"