scripts/validate.py (223 lines of code) (raw):
#!/usr/bin/env python3
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
import json
import sys
from pathlib import Path
import jsonschema
from jsonschema.exceptions import best_match, ValidationError
from pyjexl.jexl import JEXL
class NestedRefResolver(jsonschema.RefResolver):
    """Ref resolver that understands bundled schemas.

    The top-level schema and each entry of its ``$defs`` are registered in
    the resolver's store under their own ``$id`` (when they declare one).

    This is the resolver used by Experimenter.
    """

    def __init__(self, schema):
        super().__init__(base_uri=None, referrer=None)

        # The root schema comes first, followed by every bundled definition.
        candidates = [schema]
        if "$defs" in schema:
            candidates.extend(schema["$defs"].values())

        # Only sub-schemas that carry an "$id" can be registered.
        for candidate in candidates:
            if "$id" in candidate:
                self.store[candidate["$id"]] = candidate
# Build the JEXL evaluator that validate_item_targeting() uses to check
# targeting expressions.
EVALUATOR = JEXL()
EVALUATOR.add_binary_operator("intersect", 40, lambda x, y: set(x).intersection(y))

# Register placeholder transforms. Each stub ignores its arguments and returns
# a fixed value; this file only ever calls EVALUATOR.validate().
for _transform_name, _transform_stub in (
    ("bucketSample", lambda x, y, z, q: False),
    ("date", lambda x: 0),
    ("keys", lambda x: []),
    ("length", lambda x: 0),
    ("mapToProperty", lambda x, y: []),
    ("preferenceExists", lambda x: False),
    ("preferenceIsUserSet", lambda x: False),
    ("preferenceValue", lambda x: False),
    ("regExpMatch", lambda x: False),
    ("stableSample", lambda x, y: False),
    ("versionCompare", lambda x: 0),
):
    EVALUATOR.add_transform(_transform_name, _transform_stub)
# Keep the module namespace free of the loop helpers.
del _transform_name, _transform_stub
# Cache of already-parsed schemas, keyed by schema name (see load_schema).
ALL_SCHEMAS = {}

# Relative path of the schema file for every schema name known to this script.
SCHEMA_MAP = {
    schema_name: Path("schema", file_name)
    for schema_name, file_name in (
        ("message", "MessagingExperiment.schema.json"),
        ("experiment", "NimbusExperiment.schema.json"),
        ("action", "SpecialMessageActionSchemas.json"),
        ("message-groups", "message-groups.schema.json"),
    )
}
# Help text printed by the command-line entry point when the arguments are
# unusable. The listed TYPE values mirror the keys of SCHEMA_MAP.
USAGE = """
Usage:
    validate.py TYPE PATH

Where TYPE should be one of "message", "experiment", "action", or "message-groups".

Example:
    validate.py message ./outgoing/cfr.json
"""
def load_schema(name):
    """Return the parsed schema registered under ``name``, loading it once.

    The schema file is looked up in ``SCHEMA_MAP`` and parsed as JSON; the
    result is stored in ``ALL_SCHEMAS`` so later calls reuse it.

    Raises:
        KeyError: if ``name`` is not a key of ``SCHEMA_MAP``.
    """
    # Serve from the cache whenever the schema was loaded before.
    try:
        return ALL_SCHEMAS[name]
    except KeyError:
        pass

    with SCHEMA_MAP[name].open("r") as handle:
        loaded = json.load(handle)

    # The new schema has a title "DesktopNimbusExperiment" instead of a
    # self-ref, so unwrap it when present.
    if name == "experiment" and "DesktopNimbusExperiment" in loaded:
        loaded = loaded["DesktopNimbusExperiment"]

    ALL_SCHEMAS[name] = loaded
    return loaded
def validate_item_targeting(item, for_exp=False):
    """Check the JEXL expressions stored on ``item``.

    The "targeting" and "filter_expression" entries are each run through
    ``EVALUATOR.validate``. Items carrying neither key are ignored. On the
    first reported problem the problem is printed and the process exits with
    status 1.

    Args:
        item: the message (or similar mapping) to inspect; its "id" is
            printed in the progress line.
        for_exp: when true, the progress line is indented with a tab.
    """
    expression_keys = ("targeting", "filter_expression")
    if not any(key in item for key in expression_keys):
        return

    prefix = "\t" if for_exp else ""
    print(f"{prefix}Validate targeting {item['id']}")

    for key in expression_keys:
        expression = item.get(key)
        if expression is None:
            continue

        try:
            problems = list(EVALUATOR.validate(expression))
            if problems:
                # Only the first reported problem is surfaced.
                raise SyntaxError(problems[0])
        except SyntaxError as exc:
            print(exc)
            sys.exit(1)
def validate_action(action, for_exp):
    """Validate one special message action against the "action" schema.

    Prints a progress line naming the action's "type". If the action does not
    satisfy the schema, the best-matching error message is printed and the
    process exits with status 1.

    Args:
        action: the action mapping to validate.
        for_exp: when true, printed lines are indented with a tab.
    """
    prefix = "\t" if for_exp else ""
    action_schema = load_schema("action")

    print(f"{prefix}Validate message action {action['type']}")

    try:
        jsonschema.validate(instance=action, schema=action_schema)
    except ValidationError as error:
        most_relevant = best_match([error])
        print(f"{prefix}Validation error: {most_relevant.message}")
        sys.exit(1)
def extract_actions(message):
    """Yield every special message action found in the given message.

    The message's "template" selects the extractor to use. Messages without a
    template, or with a template that has no extractor, yield nothing.

    Extraction is best-effort: a ``KeyError`` raised while walking the message
    (for example an expected button that is absent) silently ends the
    iteration, keeping whatever was yielded before it.

    Args:
        message: the message mapping to inspect.

    Yields:
        The action mappings attached to the message's buttons or content.
    """

    def _extract_cfr_doorhanger():
        buttons_prop = message["content"].get("buttons", {})
        if isinstance(buttons_prop, list):
            for button in buttons_prop:
                yield button["action"]
            return

        # Mapping form: "primary" holds a single button, every other entry
        # holds a list of buttons whose action is optional.
        for name, button in buttons_prop.items():
            if name == "primary":
                yield button["action"]
            else:
                for sub_button in button:
                    if "action" in sub_button:
                        yield sub_button["action"]

    def _extract_cfr_urlbar_chiclet():
        yield message["content"]["action"]

    def _extract_infobar():
        # NOTE(review): infobar buttons are expected under "content" like
        # every other template handled here; the top-level lookup is kept as
        # a fallback for backward compatibility - confirm against the schema.
        content = message.get("content")
        if isinstance(content, dict) and "buttons" in content:
            buttons = content["buttons"]
        else:
            buttons = message["buttons"]
        for button in buttons:
            yield button["action"]

    def _extract_spotlight():
        template = message["content"].get("template", "logo-and-content")
        if template == "logo-and-content":
            yield message["content"]["body"]["primary"]["action"]
            yield message["content"]["body"]["secondary"]["action"]
        elif template == "multistage":
            for screen in message["content"]["screens"]:
                for button_name in ["primary_button", "secondary_button"]:
                    button = screen["content"].get(button_name)
                    # Buttons whose action has no "type" are skipped.
                    if button and button["action"].get("type"):
                        yield button["action"]

    def _extract_toolbar_badge():
        if "action" in message["content"]:
            yield message["content"]["action"]

    def _extract_pb_newtab():
        if "promoButton" in message["content"]:
            yield message["content"]["promoButton"]["action"]

    extractors = {
        "cfr_doorhanger": _extract_cfr_doorhanger,
        "cfr_urlbar_chiclet": _extract_cfr_urlbar_chiclet,
        "infobar": _extract_infobar,
        "spotlight": _extract_spotlight,
        "toolbar_badge": _extract_toolbar_badge,
        "pb_newtab": _extract_pb_newtab,
    }

    try:
        extractor = extractors[message["template"]]
    except KeyError:
        # Either the message has no template or the template carries no
        # special message actions; both cases yield nothing.
        return

    try:
        yield from extractor()
    except KeyError:
        # Best-effort: stop quietly when the message lacks an expected key.
        return
def validate_all_actions(message, for_exp=False):
    """Run ``validate_action`` on each action extracted from ``message``.

    Args:
        message: the message whose actions are extracted with
            ``extract_actions``.
        for_exp: forwarded to ``validate_action``.
    """
    extracted = extract_actions(message)
    for special_action in extracted:
        validate_action(special_action, for_exp)
def get_branch_message(branch):
    """Return the message from an experiment branch.

    Args:
        branch: an experiment branch mapping with a "feature" entry that has
            a "featureId" and, for the recognised features, a "value".

    Returns:
        A ``(message_type, message)`` tuple. ``message`` is ``None`` when the
        branch carries no usable message, and both elements are ``None`` when
        the feature is not one of "cfr", "aboutwelcome" or "moments-page".
        For "aboutwelcome" values with "cards", ``message`` is the cards value
        itself.
    """
    # TODO: This does not support multi-feature experiments.
    feature = branch["feature"]
    feature_id = feature["featureId"]

    if feature_id == "cfr":
        value = feature["value"]
        if value is not None and "id" in value:
            return "cfr", value
        return "cfr", None

    if feature_id == "aboutwelcome":
        value = feature["value"]
        if value is None:
            return "onboarding-multistage", None
        if "cards" in value:
            return "onboarding", value["cards"]
        if "screens" in value:
            return "onboarding-multistage", value
        return "onboarding", None

    if feature_id == "moments-page":
        value = feature["value"]
        # A None value means an empty branch, same as for the other features.
        if value is not None and "id" in value:
            return "moments-page", value
        return "moments-page", None

    return None, None
def validate_experiment_message_id(exp_slug, branch):
    """This validation enforces certain naming convention for some message
    types such as CFR in order to support the automated analysis feature of
    Jetstream.

    Only branches whose message type is "cfr" are checked: both the message
    "id" and its content "bucket_id" must equal
    ``{experiment-slug}:{branch-slug}``.

    Args:
        exp_slug: slug of the experiment that owns the branch.
        branch: the experiment branch mapping (its "slug" is used).

    Raises:
        AssertionError: if the message ID or bucket ID breaks the convention.
    """
    message_type, branch_message = get_branch_message(branch)

    if branch_message is None or message_type != "cfr":
        return

    print(f"\tValidate experiment message ID for branch {branch['slug']}")
    expected_id = f"{exp_slug}:{branch['slug']}"

    # Raise explicitly instead of using ``assert`` so the checks still run
    # when Python is started with -O.
    if branch_message["id"] != expected_id:
        raise AssertionError(
            f"Invalid CFR message ID {branch_message['id']}, "
            f"it should be named as {{experiment-slug}}:{{branch-slug}}"
        )

    if branch_message["content"]["bucket_id"] != expected_id:
        raise AssertionError(
            f"Invalid CFR bucket_id {branch_message['content']['bucket_id']}, "
            f"it should be named as {{experiment-slug}}:{{branch-slug}}"
        )
def validate_experiment(item):
    """Validate the messages carried by every branch of an experiment.

    For each branch the message is extracted with ``get_branch_message``.
    Empty branches are skipped. Every message of the branch is validated
    against the schema named after its message type, then its targeting and
    its actions are validated, and finally the message ID naming convention
    is checked for the branch.

    A schema validation failure prints the best-matching error and exits the
    process with status 1.

    Args:
        item: the experiment mapping; its "branches" and "slug" are used.
    """
    # A missing/None "branches" entry is treated as "no branches".
    for branch in item.get("branches") or []:
        message_type, branch_message = get_branch_message(branch)

        if branch_message is None:
            print("\tSkip branch {} because it's empty".format(branch.get("slug")))
            continue

        # NOTE(review): get_branch_message returns types such as "cfr" or
        # "onboarding", which are not keys of SCHEMA_MAP as defined in this
        # file, so this lookup looks like it raises KeyError - confirm which
        # schema is intended here.
        schema = load_schema(message_type)

        # A branch may carry a single message or a list of them.
        branch_message_list = (
            branch_message if isinstance(branch_message, list) else [branch_message]
        )

        try:
            for message in branch_message_list:
                jsonschema.validate(instance=message, schema=schema)
                print(
                    "\tValidate {} with schema {}".format(
                        branch.get("slug"), message_type
                    )
                )
        except ValidationError as err:
            match = best_match([err])
            print("\tValidation error: {}".format(match.message))
            sys.exit(1)

        # Run the per-message checks on each message individually; passing a
        # whole list to them would skip or break the checks.
        for message in branch_message_list:
            # Validate the targeting JEXL if any
            validate_item_targeting(message, True)
            # Validate all the message actions
            validate_all_actions(message, True)

        # Validate the message_id naming
        validate_experiment_message_id(item["slug"], branch)
def validate_message(message):
    """Validate the components of a message.

    Runs the targeting check first, then the action checks, both with their
    default (non-experiment) output formatting.
    """
    for component_check in (validate_item_targeting, validate_all_actions):
        component_check(message)
def validate(schema_name, src_path):
    """Validate every item of a JSON file against the named schema.

    Each item is first validated against the schema. "message" items then
    get their targeting and actions validated, and "experiment" items get
    their branches validated. Other schema names only get the schema check.

    On the first schema validation error the best-matching message is printed
    and the process exits with status 1. "PASS" is printed at the end.

    Args:
        schema_name: name of the schema, as accepted by ``load_schema``.
        src_path: path of the JSON file to validate. Its top-level value is
            iterated item by item (presumably a JSON array - confirm); a JSON
            ``null`` is accepted and validates nothing.
    """
    schema = load_schema(schema_name)
    resolver = NestedRefResolver(schema)

    print(f"Validating {src_path} with {schema_name} schema...")

    # Decode as UTF-8 explicitly rather than relying on the locale default.
    with open(src_path, "r", encoding="utf-8") as f:
        items = json.load(f)

    if items is not None:
        for item in items:
            print(f"Validating schema for {item['id']}")
            try:
                jsonschema.validate(
                    instance=item,
                    schema=schema,
                    resolver=resolver,
                )
            except ValidationError as err:
                match = best_match([err])
                print(f"Validation error: {match.message}")
                sys.exit(1)

            # Only these two item types have checks beyond the schema; the
            # "message-groups" and "action" types need nothing more.
            if schema_name == "message":
                validate_message(item)
            elif schema_name == "experiment":
                validate_experiment(item)

    print("PASS")
if __name__ == "__main__":
    # Show the usage text for missing arguments and for a TYPE that has no
    # registered schema, instead of failing later with a bare KeyError.
    if len(sys.argv) < 3 or sys.argv[1] not in SCHEMA_MAP:
        print(USAGE)
        sys.exit(1)
    validate(sys.argv[1], sys.argv[2])