mozilla_schema_generator/generic_ping.py (126 lines of code) (raw):

# -*- coding: utf-8 -*-

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

import json
import logging
import os
import pathlib
import re
from json.decoder import JSONDecodeError
from typing import Dict, List, Optional

import requests

from .config import Config
from .probes import Probe
from .schema import Schema, SchemaException

logger = logging.getLogger(__name__)


class GenericPing(object):
    """Fetch a ping's base schema, environment, and probe definitions, and
    combine them into a fully-probed schema.

    Remote JSON responses are cached on disk (see ``cache_dir``) so repeated
    runs do not re-fetch the same URLs.
    """

    probe_info_base_url = "https://probeinfo.telemetry.mozilla.org"
    default_encoding = "utf-8"
    default_max_size = 12900  # https://bugzilla.mozilla.org/show_bug.cgi?id=1688633
    # Local on-disk cache for fetched JSON; overridable via MSG_PROBE_CACHE_DIR.
    cache_dir = pathlib.Path(os.environ.get("MSG_PROBE_CACHE_DIR", ".probe_cache"))

    def __init__(self, schema_url, env_url, probes_url, mps_branch="main"):
        """Store the URLs this ping pulls from.

        ``schema_url`` and ``env_url`` are templates containing a ``{branch}``
        placeholder, filled with ``mps_branch``; ``probes_url`` is used as-is.
        """
        self.branch_name = mps_branch
        self.schema_url = schema_url.format(branch=self.branch_name)
        self.env_url = env_url.format(branch=self.branch_name)
        self.probes_url = probes_url

    def get_schema(self) -> Schema:
        """Fetch and wrap the ping's base schema."""
        return Schema(self._get_json(self.schema_url))

    def get_env(self) -> Schema:
        """Fetch and wrap the ping's environment schema."""
        return Schema(self._get_json(self.env_url))

    def get_probes(self) -> List[Probe]:
        """Fetch the probe definitions and build one Probe per entry."""
        return [
            Probe(_id, defn) for _id, defn in self._get_json(self.probes_url).items()
        ]

    def generate_schema(
        self, config: Config, *, max_size: Optional[int] = None
    ) -> Dict[str, Schema]:
        """Build the final probed schema for ``config``.

        :param config: determines which probes land where in the schema.
        :param max_size: size ceiling in bytes; defaults to
            ``default_max_size`` when None.
        :return: mapping of config name to the generated Schema.
        :raises SchemaException: if the environment, the base schema, or the
            generated schema is not smaller than ``max_size``.
        """
        schema = self.get_schema()
        env = self.get_env()
        probes = self.get_probes()

        if max_size is None:
            max_size = self.default_max_size

        if env.get_size() >= max_size:
            raise SchemaException(
                "Environment must be smaller than max_size {}".format(max_size)
            )

        # TODO: Allow schemas larger than max_size
        if schema.get_size() >= max_size:
            raise SchemaException(
                "Schema must be smaller than max_size {}".format(max_size)
            )

        schemas = {config.name: self.make_schema(schema, probes, config, max_size)}

        if any(schema.get_size() > max_size for schema in schemas.values()):
            raise SchemaException(
                "Schema must be smaller or equal max_size {}".format(max_size)
            )

        return schemas

    @staticmethod
    def make_schema(
        env: Schema, probes: List[Probe], config: Config, max_size: int
    ) -> Schema:
        """
        Fill in probes based on the config, and keep only the env
        parts of the schema. Throw away everything else.
        """
        schema_elements = sorted(config.get_schema_elements(probes), key=lambda x: x[1])
        schema = env.clone()

        for schema_key, probe in schema_elements:
            try:
                addtlProps = env.get(schema_key + ("additionalProperties",))
            except KeyError:
                addtlProps = None

            probe_schema = Schema(probe.get_schema(addtlProps)).clone()

            schema.set_schema_elem(
                schema_key + ("properties", probe.name), probe_schema.schema
            )

        # Remove all additionalProperties (#22)
        for key in config.get_match_keys():
            try:
                schema.delete_group_from_schema(
                    key + ("propertyNames",), propagate=False
                )
            except KeyError:
                pass

            try:
                schema.delete_group_from_schema(
                    key + ("additionalProperties",), propagate=True
                )
            except KeyError:
                pass

        return schema

    @staticmethod
    def _slugify(text: str) -> str:
        """Get a valid slug from an arbitrary string"""
        value = re.sub(r"[^\w\s-]", "", text.lower()).strip()
        return re.sub(r"[-\s]+", "-", value)

    @staticmethod
    def _present_in_cache(url: str) -> bool:
        """True when a cached response for ``url`` exists on disk."""
        return (GenericPing.cache_dir / GenericPing._slugify(url)).exists()

    @staticmethod
    def _add_to_cache(url: str, val: str):
        """Write ``val`` to the cache file for ``url``, first-writer-wins."""
        GenericPing.cache_dir.mkdir(parents=True, exist_ok=True)
        cache_file = GenericPing.cache_dir / GenericPing._slugify(url)
        # protect against multiple writers to the cache:
        # https://github.com/mozilla/mozilla-schema-generator/pull/210
        try:
            with open(cache_file, "x") as f:
                f.write(val)
        except FileExistsError:
            pass

    @staticmethod
    def _retrieve_from_cache(url: str) -> str:
        """Read the cached response body for ``url``."""
        return (GenericPing.cache_dir / GenericPing._slugify(url)).read_text()

    @staticmethod
    def _get_json_str(url: str) -> str:
        """Fetch ``url`` (or return its cached body) as decoded text.

        :raises requests.HTTPError: on a non-2xx response.
        :raises ValueError: if streaming the response body fails.
        """
        if GenericPing._present_in_cache(url):
            return GenericPing._retrieve_from_cache(url)

        headers = {}
        if url.startswith(GenericPing.probe_info_base_url):
            # For probe-info-service requests, set the cache-control header to force
            # google cloud cdn to bypass the cache
            headers["Cache-Control"] = "no-cache"

        r = requests.get(url, headers=headers, stream=True)
        r.raise_for_status()

        # Collect chunks and join once: repeated bytes += is quadratic.
        chunks = []
        try:
            for chunk in r.iter_content(chunk_size=1024):
                if chunk:
                    chunks.append(chunk)
        except ValueError as e:
            raise ValueError("Could not parse " + url) from e

        json_bytes = b"".join(chunks)
        final_json = json_bytes.decode(r.encoding or GenericPing.default_encoding)
        GenericPing._add_to_cache(url, final_json)
        return final_json

    @staticmethod
    def _get_json(url: str) -> dict:
        """Fetch and parse ``url`` as JSON.

        :raises JSONDecodeError: if the body is not valid JSON (logged first).
        """
        try:
            return json.loads(GenericPing._get_json_str(url))
        except JSONDecodeError:
            # Use the module-level logger (not the root logger) so messages
            # carry this module's name and respect its configuration.
            logger.error("Unable to process JSON for url: %s", url)
            raise