bot/tools/validator.py (111 lines of code) (raw):
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
import argparse
import collections
import json
import logging
import os.path
import sys
logging.basicConfig(level=logging.INFO, format="[%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)
Field = collections.namedtuple("Field", "name, validators, required")
def validate_path(value):
assert isinstance(value, str), "Path should be a string"
assert len(value) > 0, "Path should not be empty"
assert not os.path.isabs(value), "Path should not be absolute"
logger.debug(f"Path {value} is valid")
def validate_all_types(iterable, t):
return all(map(lambda k: isinstance(k, t), iterable))
def validate_positive_int(value):
if isinstance(value, int):
assert value >= 0, "must be a positive integer (>= 0)"
elif value is None:
logger.debug("Found null value instead of positive integer")
else:
raise Exception("must be either a positive integer or null")
def validate_string(value):
assert isinstance(value, str), "must be a string"
assert len(value) > 0, "must be a non-empty string"
def validate_levels(value, levels=("warning", "error")):
assert value in levels, "must be in {}".format(", ".join(levels))
FIELDS = (
# Required fields
Field("path", [validate_path], True),
Field("line", [validate_positive_int], True),
Field("column", [validate_positive_int], True),
Field("level", [validate_string, validate_levels], True),
Field("message", [validate_string], True),
# Optional fields
Field("nb_lines", [validate_positive_int], False),
Field("analyzer", [validate_string], False),
Field("check", [validate_string], False),
)
def validate_issue(payload):
"""Validate an issue payload"""
assert isinstance(payload, dict), "Issue must be a dict"
# Check all required keys are here
keys = set(payload.keys())
required = {field.name for field in FIELDS if field.required is True}
diff = required.difference(keys)
if diff:
raise Exception("Missing required keys {}".format(", ".join(sorted(diff))))
# Check no extra keys is set
diff = keys.difference(field.name for field in FIELDS)
if diff:
logger.warning(
"Extra fields will not be used: {}".format(", ".join(sorted(diff)))
)
# Validate all fields one by one
for field in FIELDS:
if field.name in payload:
logger.debug(f"Validating field {field.name}")
for validator in field.validators:
try:
validator(payload[field.name])
except Exception as e:
raise Exception(f"{field.name} {e}")
else:
if field.required is True:
raise Exception(f"Missing required field {field.name}")
logger.debug(f"Missing optional field {field.name}")
return True
def validate(payload):
"""Validate an issues file payload"""
# Top level must be a dict
assert isinstance(payload, dict), "Top structure is not a dict"
# All keys must be str
assert validate_all_types(payload.keys(), str), "All top keys must be strings"
# All values must be lists
assert validate_all_types(payload.values(), list), "All top values must be lists"
for path, issues in payload.items():
logger.debug(f"Validating section {path}")
validate_path(path)
# All issues must be dicts
assert validate_all_types(issues, dict), f"All issues for {path} must be dicts"
# Validate all issues
for i, issue in enumerate(issues):
logger.debug(f"Validating issue n°{i + 1} for {path}")
try:
validate_issue(issue)
# Check the top path is the same in the issue
assert (
path == issue["path"]
), "Top path and issue path must be identical ({} != {})".format(
path, issue["path"]
)
except Exception as e:
raise Exception(f"Invalid issue n°{i + 1} for {path} : {e}")
return True
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"issues_file",
help="Local path to a JSON payload containing issues detected",
type=open,
)
parser.add_argument(
"-v",
"--verbose",
action="store_true",
default=False,
help="Display detailed log lines",
)
args = parser.parse_args()
if args.verbose is True:
logger.setLevel(logging.DEBUG)
logger.debug("Enabled debug output")
try:
payload = json.load(args.issues_file)
validate(payload)
except json.decoder.JSONDecodeError as e:
logger.error(f"Invalid JSON payload: {e}", exc_info=True)
sys.exit(1)
except Exception as e:
logger.error(f"Invalid issues format: {e}", exc_info=True)
sys.exit(1)
logger.info("Your file is valid !")