auto_sizing/export_json.py
"""Export and aggregate auto-sizing sample-size results to GCS."""

import ast
import json
import logging
import re
from pathlib import Path
from typing import Any

import google.cloud.storage as storage
import toml
from mozilla_nimbus_schemas.jetstream import SampleSizes, SizingRecipe

logger = logging.getLogger(__name__)

SAMPLE_SIZE_PATH = "sample_sizes"
DATA_DIR = Path(__file__).parent / "data"
RUN_MANIFEST = DATA_DIR / "manifest.toml"
ARGO_PREFIX = "argo_target"


def bq_normalize_name(name: str) -> str:
    """Replace any character not valid in a BigQuery name with an underscore."""
    return re.sub(r"[^a-zA-Z0-9_]", "_", name)
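

# Illustrative only: with a hypothetical slug such as "argo_target-3.swap",
# bq_normalize_name would return "argo_target_3_swap", which is safe to embed
# in the GCS object names built below.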


def _upload_str_to_gcs(
    project_id: str,
    bucket_name: str,
    target_slug: str,
    base_name: str,
    str_to_upload: str,
) -> None:
    """Upload a JSON string to GCS under the prefix given by base_name.

    The object name combines the first path segment of base_name with the
    BigQuery-normalized target_slug.
    """
    storage_client = storage.Client(project_id)
    bucket = storage_client.get_bucket(bucket_name)
    target_file_prefix = base_name.split("/")[0]
    target_file = f"{target_file_prefix}_{bq_normalize_name(target_slug)}"
    target_path = base_name
    blob = bucket.blob(f"{target_path}/{target_file}.json")
    logger.info(f"Uploading {target_file} to {bucket_name}/{target_path}")
    blob.upload_from_string(
        data=str_to_upload,
        content_type="application/json",
    )
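

# A hedged sketch of where this helper writes data, assuming made-up arguments
# base_name="sample_sizes/ind_target_results_2024-01-01" and
# target_slug="argo_target_3": the prefix before the first "/" is combined with
# the normalized slug, so the blob would land at
#   sample_sizes/ind_target_results_2024-01-01/sample_sizes_argo_target_3.json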


def export_sample_size_json(
    project_id: str,
    bucket_name: str,
    target_slug: str,
    sample_size_result: str,
    current_date: str,
) -> None:
    """Export sample sizes to GCS bucket.

    Results for individual Argo targets are written under a dated
    `ind_target_results_` prefix so they can be aggregated later; all other
    results go directly under the top-level sample size prefix.
    """
    if ARGO_PREFIX in target_slug:
        _upload_str_to_gcs(
            project_id,
            bucket_name,
            target_slug,
            f"{SAMPLE_SIZE_PATH}/ind_target_results_{current_date}",
            sample_size_result,
        )
    else:
        _upload_str_to_gcs(
            project_id,
            bucket_name,
            target_slug,
            SAMPLE_SIZE_PATH,
            sample_size_result,
        )
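

# Hedged usage sketch; the project, bucket, and date values are placeholders:
#
#   export_sample_size_json(
#       project_id="my-gcp-project",
#       bucket_name="my-sizing-bucket",
#       target_slug="argo_target_3",
#       sample_size_result=json.dumps({"some": "result"}),
#       current_date="2024-01-01",
#   )
#
# Because the slug contains ARGO_PREFIX, this lands under the dated
# ind_target_results_ prefix; any other slug is written directly under
# "sample_sizes".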


def parse_recipe_from_slug(target_slug: str) -> SizingRecipe:
    """Look up target_slug in the run manifest and parse its recipe fields.

    The recipe fields are returned as a plain dict keyed to match the
    SizingRecipe schema; they are validated later when the aggregate is
    parsed into SampleSizes.
    """
    jobs_dict = toml.load(RUN_MANIFEST)
    # parse out recipe fields
    target_recipe = jobs_dict[target_slug]
    recipe = json.loads(target_recipe["target_recipe"])
    app_id = target_recipe.get("app_id")
    channel = recipe.get("release_channel")
    locale = recipe.get("locale")
    country = recipe.get("country")
    new_or_existing = recipe.get("user_type")
    recipe_info = {
        "app_id": app_id,
        "channel": channel,
        "locale": locale,
        "country": country,
        "new_or_existing": new_or_existing,
    }
    return recipe_info
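

# A minimal sketch of the manifest entry this function expects, assuming a
# hypothetical slug "argo_target_3"; the real manifest contents are generated
# elsewhere, so the field values here are illustrative only:
#
#   [argo_target_3]
#   app_id = "firefox_desktop"
#   target_recipe = "{\"release_channel\": \"release\", \"country\": \"US\", \"user_type\": \"new\"}"
#
# which this function would parse into:
#
#   {"app_id": "firefox_desktop", "channel": "release", "locale": None,
#    "country": "US", "new_or_existing": "new"}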


def build_target_key_from_recipe(recipe_info: SizingRecipe) -> str:
    """Build the lookup key used to group sizing results for a recipe."""
    # parse out recipe fields
    app_id = recipe_info.get("app_id")
    channel = recipe_info.get("channel")
    locale = recipe_info.get("locale")
    country = recipe_info.get("country")
    # target_key should be an easy lookup for relevant sizing
    # {app_id}:{channel}:{locale}:{country}
    target_key = f"{app_id}"
    if channel:
        target_key += f":{channel}"
    if locale:
        # locale is stored as a Python literal (e.g. a tuple of locale codes),
        # so parse it safely rather than calling eval on manifest data
        parsed_locale = ast.literal_eval(locale)
        sorted_locale = (
            sorted(parsed_locale) if isinstance(parsed_locale, tuple) else [parsed_locale]
        )
        # string representation of list includes spaces between elements
        target_key += f":{sorted_locale}".replace(" ", "")
    if country:
        target_key += f":{country}"
    return target_key
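

# A hedged example of the key format, using invented recipe values: for
#   {"app_id": "firefox_desktop", "channel": "release",
#    "locale": "('en-US', 'en-CA')", "country": "US"}
# the locale literal is parsed and sorted, giving the key
#   firefox_desktop:release:['en-CA','en-US']:US
# Missing or empty fields are skipped, so a recipe with only an app_id
# produces a key that is just the app_id.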


def aggregate_results(
    project_id: str,
    bucket_name: str,
    today: str,
) -> SampleSizes:
    """Collect the per-target results uploaded for `today` into a single
    SampleSizes structure keyed by target_key and user type."""
    storage_client = storage.Client(project_id)
    agg_json: dict[str, dict[str, Any]] = {}
    target_results_filename_pattern = rf"({ARGO_PREFIX}_\d+)\.json"
    for blob in storage_client.list_blobs(
        bucket_name, prefix=f"{SAMPLE_SIZE_PATH}/ind_target_results_{today}"
    ):
        # For files in the bucket, check whether the file name ends in an
        # `{ARGO_PREFIX}_<digits>.json` suffix and extract the target slug
        regexp_result = re.search(target_results_filename_pattern, blob.name)
        if regexp_result:
            target_slug = regexp_result.group(1)
            data = blob.download_as_string()
            recipe_info = parse_recipe_from_slug(target_slug)
            results = {
                "target_recipe": recipe_info,
                "sample_sizes": json.loads(data),
            }
            target_key = build_target_key_from_recipe(recipe_info)
            new_or_existing = recipe_info.get("new_or_existing")
            if target_key not in agg_json:
                agg_json[target_key] = {}
            agg_json[target_key][new_or_existing] = results
    # validate json before export
    sizing_results = SampleSizes.parse_obj(agg_json)
    return sizing_results
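

# A rough sketch of the aggregated dict that is validated into SampleSizes
# before being returned (keys and values invented for illustration):
#
#   {
#       "firefox_desktop:release:['en-US']:US": {
#           "new": {"target_recipe": {...}, "sample_sizes": {...}},
#           "existing": {"target_recipe": {...}, "sample_sizes": {...}},
#       },
#       ...
#   }
#
# i.e. one entry per target_key, keyed inside by the recipe's user type.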


def upload_aggregate_json(
    project_id: str,
    bucket_name: str,
    results: SampleSizes,
    today: str,
) -> None:
    """Upload the aggregated results under a dated name and again as 'latest'."""
    file_name = f"auto_sizing_results_{today}"
    sizing_json = results.json()
    _upload_str_to_gcs(project_id, bucket_name, file_name, SAMPLE_SIZE_PATH, sizing_json)
    file_name_latest = "auto_sizing_results_latest"
    _upload_str_to_gcs(
        project_id,
        bucket_name,
        file_name_latest,
        SAMPLE_SIZE_PATH,
        sizing_json,
    )


def aggregate_and_reupload(
    project_id: str,
    bucket_name: str,
    run_date: str,
) -> None:
    """Aggregate the per-target results for run_date and re-upload them as one file."""
    sizing_results = aggregate_results(project_id, bucket_name, run_date)
    upload_aggregate_json(project_id, bucket_name, sizing_results, run_date)
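

# Hedged end-to-end sketch (project and bucket names are placeholders): a
# workflow might call export_sample_size_json once per argo_target_* job and
# then, in a final step, run
#
#   aggregate_and_reupload(
#       project_id="my-gcp-project",
#       bucket_name="my-sizing-bucket",
#       run_date="2024-01-01",
#   )
#
# to fold that run's per-target files into a single dated aggregate plus an
# auto_sizing_results_latest copy under the "sample_sizes" prefix.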