mozilla_schema_generator/subset_pings.py

import json
import re
from collections import defaultdict
from copy import deepcopy
from pathlib import Path
from typing import Dict, Tuple

# most metadata fields are added to the bq schema directly and left out of the json schema, but
# fields here appear in the json schema and must be explicitly included in all resulting pings
ADDITIONAL_METADATA_FIELDS = [
    "client_id",
    "clientId",
    "client_info",
]


def _get_path(out_dir, namespace, doctype, version):
    return out_dir / namespace / doctype / f"{doctype}.{version}.schema.json"


def _path_string(*path):
    return ".".join(path)


def _schema_copy(src, pattern, dst=None, delete=True, prefix=()):
    if src.get("type") != "object" or "properties" not in src:
        # only recurse into objects with explicitly defined properties
        return None
    src_props = src["properties"]
    dst_props = {}
    for name, src_subschema in list(src_props.items()):
        path = ".".join((*prefix, name))
        if pattern.fullmatch(path):
            prop = src_props.pop(name) if delete else deepcopy(src_props[name])
        else:
            prop = _schema_copy(
                src_subschema,
                pattern,
                dst=None if dst is None else dst["properties"].get(name, None),
                delete=delete,
                prefix=(*prefix, name),
            )
        if prop is not None:
            dst_props[name] = prop
    if dst_props:
        if dst is None:
            return {"properties": dst_props, "type": "object"}
        else:
            dst["properties"].update(dst_props)
            return dst
    return None


def _copy_metadata(source, destination):
    for key in ("$id", "$schema", "mozPipelineMetadata"):
        if key not in source:
            continue
        elif isinstance(source[key], dict):
            destination[key] = deepcopy(source[key])
        else:
            destination[key] = source[key]
    for key in ADDITIONAL_METADATA_FIELDS:
        if key in source["properties"]:
            destination["properties"][key] = deepcopy(source["properties"][key])


def _update_pipeline_metadata(schema, namespace, doctype, version):
    pipeline_metadata = schema["mozPipelineMetadata"]
    pipeline_metadata["bq_dataset_family"] = namespace
    pipeline_metadata["bq_table"] = f'{doctype.replace("-", "_")}_v{version}'


def _target_as_tuple(target: Dict[str, str]) -> Tuple[str, str, str]:
    return (
        target["document_namespace"],
        target["document_type"],
        target["document_version"],
    )


def generate(config_data, out_dir: Path) -> Dict[str, Dict[str, Dict[str, Dict]]]:
    """Read in pings from disk and split fields into new subset pings.

    If configured, also produce a remainder ping with all the fields that
    weren't moved.
    """
    schemas = defaultdict(lambda: defaultdict(dict))
    # read in pings and split them according to config
    for source in config_data:
        src_namespace, src_doctype, src_version = _target_as_tuple(source)
        src_path = _get_path(out_dir, src_namespace, src_doctype, src_version)
        schema = json.loads(src_path.read_text())
        config = schema["mozPipelineMetadata"].pop("split_config")
        for subset_config in config["subsets"]:
            dst_namespace, dst_doctype, dst_version = _target_as_tuple(subset_config)
            pattern = re.compile(subset_config["pattern"])
            subset = _schema_copy(schema, pattern, delete=True)
            assert subset is not None, "Subset pattern matched no paths"
            if "extra_pattern" in subset_config:
                # match paths where the schema must be present in the remainder because
                # schemas cannot delete fields, but data must only go to the subset.
                pattern = re.compile(subset_config["extra_pattern"])
                subset = _schema_copy(schema, pattern, dst=subset, delete=False)
                assert subset is not None, "Subset extra_pattern matched no paths"
            _copy_metadata(schema, subset)
            _update_pipeline_metadata(subset, dst_namespace, dst_doctype, dst_version)
            schemas[dst_namespace][dst_doctype][dst_version] = subset
        remainder_config = config.get("remainder")
        if remainder_config:
            dst_namespace, dst_doctype, dst_version = _target_as_tuple(remainder_config)
            # no need to copy metadata
            _update_pipeline_metadata(schema, dst_namespace, dst_doctype, dst_version)
            schemas[dst_namespace][dst_doctype][dst_version] = schema
    return schemas
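# ---------------------------------------------------------------------------
# Illustrative usage sketch (not part of the upstream module): the namespace,
# doctype, schema body, and split_config values below are hypothetical and
# only show the on-disk layout and config shape that generate() expects,
# assuming out_dir follows <namespace>/<doctype>/<doctype>.<version>.schema.json.
if __name__ == "__main__":
    from tempfile import TemporaryDirectory

    # hypothetical source schema carrying a split_config in its pipeline metadata
    source_schema = {
        "$schema": "http://json-schema.org/draft-04/schema#",
        "type": "object",
        "properties": {
            "client_id": {"type": "string"},
            "metrics": {
                "type": "object",
                "properties": {
                    "big_counter": {"type": "integer"},
                    "small_counter": {"type": "integer"},
                },
            },
        },
        "mozPipelineMetadata": {
            "bq_dataset_family": "telemetry",
            "bq_table": "example_v1",
            "split_config": {
                "subsets": [
                    {
                        "document_namespace": "telemetry-subset",
                        "document_type": "example-big",
                        "document_version": "1",
                        # full dotted path of each field to move into the subset
                        "pattern": r"metrics\.big_counter",
                    }
                ],
                "remainder": {
                    "document_namespace": "telemetry-remainder",
                    "document_type": "example-remainder",
                    "document_version": "1",
                },
            },
        },
    }

    with TemporaryDirectory() as tmp:
        out_dir = Path(tmp)
        src_path = _get_path(out_dir, "telemetry", "example", "1")
        src_path.parent.mkdir(parents=True)
        src_path.write_text(json.dumps(source_schema))

        result = generate(
            [
                {
                    "document_namespace": "telemetry",
                    "document_type": "example",
                    "document_version": "1",
                }
            ],
            out_dir,
        )
        # the subset schema keeps only metrics.big_counter plus the copied
        # metadata fields; the remainder keeps everything that was not moved.
        print(json.dumps(result["telemetry-subset"]["example-big"]["1"], indent=2))
        print(json.dumps(result["telemetry-remainder"]["example-remainder"]["1"], indent=2))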