in mozilla_schema_generator/subset_pings.py [0:0]
def generate(config_data, out_dir: Path) -> Dict[str, Dict[str, Dict[str, Dict]]]:
    """Read in pings from disk and split fields into new subset pings.

    If configured, also produce a remainder ping with all the fields that weren't moved.

    For every entry in ``config_data`` the source schema is located with
    ``_get_path`` under ``out_dir`` and parsed as JSON. The ``split_config``
    entry is popped from the schema's ``mozPipelineMetadata`` (so it is no
    longer present in the schema afterwards) and drives the split:

    * each item of ``split_config["subsets"]`` produces one subset schema,
      built by ``_schema_copy`` from the paths matching ``pattern`` (called with
      ``delete=True``) and, when present, ``extra_pattern`` (called with
      ``delete=False``);
    * if ``split_config`` has a truthy ``"remainder"`` entry, the source schema
      object itself, as left after the subset steps, is registered under the
      remainder target.

    Returns a nested mapping ``schemas[namespace][doctype][version]`` holding
    every subset and remainder schema that was produced.

    Raises ``AssertionError`` when a subset ``pattern`` or ``extra_pattern``
    yields no subset (``_schema_copy`` returned ``None``), and ``KeyError``
    when a source schema has no ``mozPipelineMetadata`` / ``split_config``.
    """
    schemas = defaultdict(lambda: defaultdict(dict))
    # read in pings and split them according to config
    for source in config_data:
        src_namespace, src_doctype, src_version = _target_as_tuple(source)
        src_path = _get_path(out_dir, src_namespace, src_doctype, src_version)
        schema = json.loads(src_path.read_text())

        # pop: the split configuration is consumed here and removed from the schema
        config = schema["mozPipelineMetadata"].pop("split_config")
        for subset_config in config["subsets"]:
            dst_namespace, dst_doctype, dst_version = _target_as_tuple(subset_config)
            pattern = re.compile(subset_config["pattern"])
            subset = _schema_copy(schema, pattern, delete=True)
            # Raise explicitly instead of using `assert`: assert statements are
            # stripped under `python -O`, which would let a None subset through.
            # AssertionError and the message are kept for backward compatibility.
            if subset is None:
                raise AssertionError("Subset pattern matched no paths")
            if "extra_pattern" in subset_config:
                # match paths where the schema must be present in the remainder because
                # schemas cannot delete fields, but data must only go to the subset.
                pattern = re.compile(subset_config["extra_pattern"])
                subset = _schema_copy(schema, pattern, dst=subset, delete=False)
                if subset is None:
                    raise AssertionError("Subset extra_pattern matched no paths")
            _copy_metadata(schema, subset)
            _update_pipeline_metadata(subset, dst_namespace, dst_doctype, dst_version)
            schemas[dst_namespace][dst_doctype][dst_version] = subset

        remainder_config = config.get("remainder")
        if remainder_config:
            dst_namespace, dst_doctype, dst_version = _target_as_tuple(remainder_config)
            # no need to copy metadata: the remainder is the source schema object itself
            _update_pipeline_metadata(schema, dst_namespace, dst_doctype, dst_version)
            schemas[dst_namespace][dst_doctype][dst_version] = schema
    return schemas