sync/bugcomponents.py (167 lines of code) (raw):
import re
import os
from ast import literal_eval
from collections import defaultdict
import newrelic
from . import log
from .env import Environment
from .projectutil import Mach
from typing import Any, Dict, List, Mapping, Optional, Set, Tuple, Union
from git.repo.base import Repo
# Module-level logger, obtained from the project's `log` helper and keyed by
# this module's name.
logger = log.get_logger(__name__)
# Shared Environment instance; the functions below read
# env.config["gecko"]["path"]["wpt"] from it.
env = Environment()
# Copied from mozpack.path
# Cache of compiled regular expressions, keyed by the glob-style pattern they
# were built from.
re_cache = {}

# re.escape() stopped escaping "/" in Python 3.7, so the escaped form of a
# "**" path component depends on the interpreter. Choose the substitution
# patterns that correspond to what this interpreter's re.escape() emits;
# otherwise the "**" rewrites below silently never fire.
if re.escape("/") == "/":
    _STAR_STAR_DIR_RE = re.compile(r'(^|/)\\\*\\\*/')
    _STAR_STAR_END_RE = re.compile(r'(^|/)\\\*\\\*$')
else:
    _STAR_STAR_DIR_RE = re.compile(r'(^|\\\/)\\\*\\\*\\\/')
    _STAR_STAR_END_RE = re.compile(r'(^|\\\/)\\\*\\\*$')


def match(path: str, pattern: str) -> bool:
    '''
    Return whether the given path matches the given pattern.

    An asterisk can be used to match any string, including the null string, in
    one part of the path:
        'foo' matches '*', 'f*' or 'fo*o'
    However, an asterisk matching a subdirectory may not match the null string:
        'foo/bar' does *not* match 'foo/*/bar'
    If the pattern matches one of the ancestor directories of the path, the
    path is considered matching:
        'foo/bar' matches 'foo'
    Two adjacent asterisks can be used to match files and zero or more
    directories and subdirectories.
        'foo/bar' matches 'foo/**/bar', or '**/bar'

    An empty pattern matches every path. Compiled patterns are memoised in
    the module-level `re_cache`.
    '''
    if not pattern:
        return True
    compiled = re_cache.get(pattern)
    if compiled is None:
        p = re.escape(pattern)
        # "**/" as a leading or inner component: zero or more directories.
        p = _STAR_STAR_DIR_RE.sub(r'\1(?:.+/)?', p)
        # Trailing "**": anything (or nothing) below the preceding directory.
        p = _STAR_STAR_END_RE.sub(r'(?:\1.+)?', p)
        # Any remaining "*" stays within a single path component; the suffix
        # lets the pattern match an ancestor directory of `path`.
        p = p.replace(r'\*', '[^/]*') + '(?:/.*)?$'
        compiled = re_cache[pattern] = re.compile(p)
    return compiled.match(path) is not None
def remove_obsolete(path: str, moves: Optional[Dict[str, str]] = None) -> str:
    """Return the source of the file at `path` with stale `Files` blocks fixed up.

    The file is parsed with lib2to3's Python grammar. Every top-level
    ``with Files(<pattern>, ...):`` statement is collected, and its pattern is
    checked (using `match`) against the files found by walking the directory
    that contains `path`. Patterns that match no file on disk are then either:

    * rewritten in place, when `moves` is given and `compute_moves` finds a
      new pattern for them, or
    * removed from the tree together with their ``with`` block.

    :param path: Path of the file to parse; the directory containing it is
                 the root against which patterns are matched.
    :param moves: Optional mapping of old path -> new path, with paths
                  relative to that directory.
    :returns: The updated source text. Nothing is written to disk here.
    """
    from lib2to3 import (pygram,  # type: ignore
                         pytree,
                         patcomp)
    from lib2to3.pgen2 import driver

    files_pattern = ("with_stmt< 'with' power< 'Files' "
                     "trailer< '(' arg=any any* ')' > any* > any* >")
    base_dir = os.path.dirname(path) or "."

    parser_driver = driver.Driver(pygram.python_grammar,
                                  convert=pytree.convert)
    # Read explicitly as UTF-8 instead of the locale default: update() writes
    # the result back with encoding="utf8", so reading must agree with it.
    tree = parser_driver.parse_file(path, encoding="utf8")

    pat = patcomp.PatternCompiler().compile_pattern(files_pattern)

    unmatched_patterns: Set[str] = set()
    node_patterns: Dict[str, Tuple[Any, Dict[Any, Any]]] = {}
    # Only direct children of the module are inspected, i.e. top-level
    # `with Files(...)` statements.
    for node in tree.children:
        match_values: Dict[Any, Any] = {}
        if pat.match(node, match_values):
            path_pat = literal_eval(match_values['arg'].value)
            unmatched_patterns.add(path_pat)
            node_patterns[path_pat] = (node, match_values)

    for base_path, _, files in os.walk(base_dir):
        for filename in files:
            full_path = os.path.join(base_path, filename)
            # Kept in its own local so the `path` parameter is not clobbered.
            rel_path = os.path.relpath(full_path, base_dir)
            try:
                # Raised explicitly rather than via `assert` so the check is
                # not stripped under `python -O`. It is still raised and
                # caught locally because record_exception() is called from
                # the handler (presumably it reports the active exception).
                if "../" in rel_path or rel_path.endswith("/.."):
                    raise AssertionError(
                        "Path {} is outside {}".format(full_path, base_dir))
            except AssertionError:
                newrelic.agent.record_exception(params={
                    "path": full_path
                })
                continue

            if rel_path.startswith("./"):
                rel_path = rel_path[2:]

            # Iterate over a copy: the set is mutated while scanning.
            for pattern in unmatched_patterns.copy():
                if match(rel_path, pattern):
                    unmatched_patterns.remove(pattern)

    if moves:
        moved_patterns = compute_moves(moves, unmatched_patterns)
        # Patterns that get rewritten must not also be deleted below.
        unmatched_patterns.difference_update(moved_patterns)
        for old_pattern, new_pattern in moved_patterns.items():
            arg = node_patterns[old_pattern][1]["arg"]
            # Swap the string-literal node for one holding the new pattern.
            arg.replace(arg.__class__(arg.type, '"%s"' % new_pattern))

    for pattern in unmatched_patterns:
        logger.debug("Removing %s" % pattern)
        node_patterns[pattern][0].remove()

    return str(tree)
def compute_moves(moves: Dict[str, str], unmatched_patterns: Set[str]) -> Dict[str, str]:
    """Work out replacement patterns for patterns whose files were moved.

    Only two kinds of pattern are considered: patterns without any ``*`` and
    subtree patterns ending in ``/**``. Every other wildcard pattern is
    ignored.

    For each considered pattern, the destinations of all `moves` whose source
    path matches it (per `match`) are collected, then:

    * a wildcard-free pattern with exactly one matching move maps to that
      move's destination;
    * a wildcard-free pattern with several matching moves (it matched as an
      ancestor directory) maps to the common directory of the destinations;
    * a ``/**`` pattern maps to ``<common destination directory>/**``.

    Patterns whose destinations share no common directory are left out.

    :param moves: Mapping of old path -> new path.
    :param unmatched_patterns: Patterns that no longer match any file.
    :returns: Mapping of old pattern -> replacement pattern.
    """
    dest_paths: Dict[str, List[str]] = defaultdict(list)
    for pattern in unmatched_patterns:
        # Make things simpler by only considering patterns matching subtrees
        # or single-file patterns
        if "*" in pattern and not pattern.endswith("/**"):
            continue
        for from_path, to_path in moves.items():
            if match(from_path, pattern):
                dest_paths[pattern].append(to_path)

    updated_patterns: Dict[str, str] = {}
    for pattern, paths in dest_paths.items():
        # After the filter above, a "*" in the pattern means it ends in "/**".
        is_subtree = "*" in pattern
        if not is_subtree and len(paths) == 1:
            updated_patterns[pattern] = paths[0]
            continue

        # commonprefix() compares character by character, so trim the result
        # back to a whole directory ("a/bc", "a/bd" -> "a/").
        prefix = os.path.commonprefix(paths)
        if not prefix.endswith("/"):
            prefix = os.path.dirname(prefix)
            if not prefix:
                continue
            prefix += "/"

        if is_subtree:
            updated_patterns[pattern] = prefix + "**"
        else:
            # A wildcard-free pattern can also match as an ancestor directory
            # (see `match`), so several moves may land here. Map it to the
            # shared destination directory instead of failing an assertion.
            directory = prefix.rstrip("/")
            if directory:
                updated_patterns[pattern] = directory
    return updated_patterns
def components_for_wpt_paths(git_gecko: Repo,
                             wpt_paths: Union[Set[str], Set[str]]) -> Mapping[str, List[str]]:
    """Group wpt paths by the component name reported for them.

    Each path is prefixed with the configured wpt directory and passed to
    ``Mach(...).file_info("bugzilla-component", ...)``. The byte output is
    read line by line: a line that does not start with a space sets the
    current component name, and each line that starts with a space is a path
    filed under that component.

    :param git_gecko: Repository whose working directory Mach is run from.
    :param wpt_paths: Paths relative to the configured wpt directory.
    :returns: Mapping of component name -> paths, again relative to the
              configured wpt directory.
    """
    wpt_root = env.config["gecko"]["path"]["wpt"]
    gecko_paths = [os.path.join(wpt_root, wpt_path) for wpt_path in wpt_paths]
    raw_output = Mach(git_gecko.working_dir).file_info("bugzilla-component",
                                                       *gecko_paths)

    components: Mapping[str, List[str]] = defaultdict(list)
    current = None
    for raw_line in raw_output.split(b"\n"):
        if not raw_line.startswith(b" "):
            # Unindented line: start of a new component section.
            current = raw_line.strip().decode("utf8")
            continue

        # Indented line: a path belonging to the current component.
        assert current is not None
        gecko_path = raw_line.strip().decode("utf8", "replace")
        assert gecko_path.startswith(wpt_root)
        components[current].append(os.path.relpath(gecko_path, wpt_root))
    return components
def get(git_gecko: Repo,
        files_changed: Union[Set[str], Set[str]],
        default: Tuple[str, str],
        ) -> Tuple[str, str]:
    """Pick the (product, component) pair covering most of the changed files.

    Components are ranked by how many of `files_changed` they own. The
    top-ranked one is used unless it is "UNKNOWN", in which case the
    runner-up is used when there is one.

    :param git_gecko: Repository passed through to `components_for_wpt_paths`.
    :param files_changed: Changed paths, relative to the configured wpt
                          directory.
    :param default: Value returned when there are no files, no components,
                    or only the "UNKNOWN" component.
    :returns: The chosen component name split once on " :: ".
    """
    if not files_changed:
        return default

    by_component = components_for_wpt_paths(git_gecko, files_changed)
    if not by_component:
        return default

    # Largest group first; the sort is stable, so ties keep mapping order.
    ranked = sorted(by_component.items(),
                    key=lambda entry: len(entry[1]),
                    reverse=True)
    # Keys are unique, so at most one of the top two can be "UNKNOWN".
    best = next((name for name, _ in ranked[:2] if name != "UNKNOWN"), None)
    if best is None:
        return default

    product, component = best.split(" :: ", 1)
    return product, component
def mozbuild_path(repo_work: Repo) -> str:
    """Build the path of the "moz.build" file next to the wpt directory.

    The result is ``<working dir>/<configured wpt path>/../moz.build``; the
    parent-directory component is left in the path, not normalised away.

    :param repo_work: Repository whose working directory is the base path.
    :returns: The joined path. No check is made that the file exists.
    """
    repo_root = repo_work.working_dir
    assert repo_root is not None
    wpt_dir = env.config["gecko"]["path"]["wpt"]
    return os.path.join(repo_root, wpt_dir, os.pardir, "moz.build")
def update(repo_work: Repo, renames: Dict[str, str]) -> None:
    """Rewrite the moz.build file so its patterns reflect `renames`.

    Both sides of every rename are prefixed with the last component of the
    configured wpt path, then passed to `remove_obsolete` as `moves`. The
    text it returns is written back over the moz.build file as UTF-8. If the
    file does not exist, a warning is logged and nothing is written.

    :param repo_work: Repository used to locate the moz.build file.
    :param renames: Mapping of old path -> new path, without that prefix.
    """
    mozbuild_file_path = mozbuild_path(repo_work)
    tests_base = os.path.basename(env.config["gecko"]["path"]["wpt"])
    mozbuild_rel_renames = {
        os.path.join(tests_base, old): os.path.join(tests_base, new)
        for old, new in renames.items()
    }

    if not os.path.exists(mozbuild_file_path):
        logger.warning("Can't find moz.build file to update")
        return

    new_data = remove_obsolete(mozbuild_file_path,
                               moves=mozbuild_rel_renames)
    with open(mozbuild_file_path, "w", encoding="utf8") as f:
        f.write(new_data)