atr/analysis.py (306 lines of code) (raw):

#!/usr/bin/env python3 # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. import dataclasses import os import pathlib import re import signal import sys from typing import Final ARTIFACT_SUFFIXES: Final[list[str]] = [ "bin", "crate", "deb", "dmg", "exe", "far", "gem", "jar.pack.gz", "jar", "msi", "nar", "nbm", "snupkg", "nupkg", "pkg", "pom", "rar", "rpm", "sh", "slingosgifeature", "taco", "tar.bz2", "tar.gz", "tar.xz", "tar", "tgz", "vsix", "war", "whl", "zip", ] # "mds" is used in ozone # "SHA256" and "SHA512" are used in ranger # "MD5" is used in samza # "asc.asc" is used in felix METADATA_SUFFIXES: Final[list[str]] = [ "asc.asc", "asc.md5", "asc.sha1", "asc.sha256", "asc.sha512", "sha512.asc", "sha512.md5", "sha512.sha1", "sha512.sha512", "asc", "MD5", "md5", "mds", "prov", "sh1", "sha1", "sha256", "SHA256", "SHA512", "sha512sum", "sha512", "sha", "sig", ] # .license is used in high volume in netbeans SKIPPABLE_SUFFIXES: Final[list[str]] = [ ".bak", ".css", ".gif", ".html", ".json", ".license", ".md", ".pdf", ".png", ".temp", ".tmp", ".txt", ".xml", ".yaml", ] # Should perhaps not include javadoc # app # doc # docs # example # markdown # nodeps # release # sdk # tests VARIANT_PATTERNS: Final[list[str]] = [ "binary-assembly", "binary", "bin", "dist", "install_[a-z][a-z](?:-[A-Z][A-Z])?", "javadoc", "langpack_[a-z][a-z](?:-[A-Z][A-Z])?", "lib-debug", "lib", "pkg", "source-release", "sources", "source", "src", ] @dataclasses.dataclass class Analysis: versions: dict[str, set[str]] subs: dict[str, set[str]] templates: dict[str, dict[str, int]] def architecture_pattern() -> str: architectures = [ "cp[0-9]+-cp[0-9]+m?-[a-z0-9_]+(?:[.]manylinux[a-z0-9_]+)*", "pp[0-9]+-pypy[0-9]+_pp[0-9]+-[a-z0-9_]+(?:[.]manylinux[a-z0-9_]+)?", "darwin(?:-unknown)?-(?:aarch64|amd64|arm64|64bit|arm64bit|x64)", "Linux-CentOS[0-9]+", "Linux-Ubuntu[0-9]+", "linux(?:-glibc|musl|unknown)?-(?:aarch64|amd64|arm64|64bit|arm64bit|x64)", "linux.gtk.x86_64", "mac(?:os|OS)?(?:-unknown)?-(?:aarch64|amd64|arm64|64bit|arm64bit|x64)", "macos.cocoa.x86_64", "osx(?:-unknown)?-(?:aarch64|amd64|arm64|64bit|arm64bit|x64)", "py2.py3-none-any", "py3-none-any", "win32.win32.x86_64", "windows(?:-unknown)?-(?:aarch64|amd64|arm64|64bit|arm64bit|x64)", "x86_64(?:-noavx2)?", "(?:x64|x86)-windows-staticaarch64", "amd64", "arm", "Darwin", "Linux_x86", "linux", "MacOS_x86-64", "macosx?", "noarch", "Win_x86", "win(?:dows)?", ] return "(" + "|".join(architectures) + ")(?=[_.-])" def component_parse(i: int, component: str, size: int, elements: dict[str, str | None]) -> None: if i == 0: # CORE # Never starts with "apache-" elements["core"] = component elif (i == 1) and (size == 2): elements["template"] = filename_parse(component, elements)[0] elif i == 1: # SUB or VERSION # TODO: Check total depth to give an indication of SUB? if is_version(component): elements["version"] = version_parse(component, elements) else: elements["sub"] = component elif (i == 2) and (size == 3): # CORE/VERSION/FILENAME elements["template"] = filename_parse(component, elements)[0] elif (i == 2) and (size == 4): # VERSION elements["version"] = version_parse(component, elements) elif (i == 3) and (size == 4): # CORE/VERSION/SUB/FILENAME elements["template"] = filename_parse(component, elements)[0] elif i == (size - 1): # FILENAME, but more deeply nested elements["template"] = filename_parse(component, elements)[0] # elements["missing"] += 1 def elements_update(elements: dict[str, str | None], core: str, analysis: Analysis) -> None: if core not in analysis.versions: analysis.versions[core] = set() if core not in analysis.subs: analysis.subs[core] = set() if core not in analysis.templates: analysis.templates[core] = {} if elements["version"] is not None: analysis.versions[core].add(elements["version"]) if elements["sub"] is not None: analysis.subs[core].add(elements["sub"]) if elements["template"] is not None: if elements["template"] not in analysis.templates[core]: analysis.templates[core][elements["template"]] = 0 analysis.templates[core][elements["template"]] += 1 def extension_pattern() -> str: # https://tableau.github.io/connector-plugin-sdk/docs/ # https://en.wikipedia.org/wiki/WAR_(file_format) # https://learn.microsoft.com/en-us/visualstudio/extensibility/anatomy-of-a-vsix-package?view=vs-2022 # What's the status of "pom"? # We've included "sh", so perhaps we should include "patch" metadata_artifact_pattern = ( r"(?P<metadata_artifact>" + "|".join(["[.]" + re.escape(a) for a in ARTIFACT_SUFFIXES]) + r")" ) metadata_pattern = r"(?P<metadata>" + "|".join(["[.]" + re.escape(m) for m in METADATA_SUFFIXES]) + r")" artifact_pattern = r"(?P<artifact>" + "|".join(["[.]" + re.escape(a) for a in ARTIFACT_SUFFIXES]) + r")" pattern = rf"((?:{metadata_artifact_pattern}{metadata_pattern})|{artifact_pattern})$" return pattern def filename_parse(filename: str, elements: dict[str, str | None]) -> tuple[str, dict[str, list[str]]]: substitutions: dict[str, list[str]] = { "sub": [], "core": [], "version": [], "variant": [], "tag": [], "arch": [], "ext": [], "label": [], } def sub(pattern: str, name: str, replacement: str, filename: str) -> str: matches = re.findall(pattern, filename) if matches: substitutions[name] = matches if isinstance(matches[0], str) else [m[0] for m in matches] else: substitutions[name] = [] return re.sub(pattern, replacement, filename) filename = sub(r"apache(?=[_.-])", "core", "α", filename) # TODO: -incubating # There is no standard position for -incubating if elements["sub"]: # Replace SUB before CORE because CORE may contain SUB filename = sub(elements["sub"] + r"(?=[_.-])", "sub", "σ", filename) if elements["core"]: filename = sub(elements["core"] + r"(?=[_.-])", "core", "κ", filename) if elements["version"]: filename = sub(elements["version"] + r"(?=[_.-])", "version", "β", filename) filename = sub(variant_pattern(), "variant", "ρ", filename) filename = sub(r"[0-9]+[.][0-9]+(?:[.][0-9]+(?:[.][0-9]+)?)?(?=[_.-])", "tag", "τ", filename) filename = sub(architecture_pattern(), "arch", "ι", filename) filename = sub(extension_pattern(), "ext", ".ε", filename) if "LABEL_MODE" in os.environ: filename = sub(r"(?<=-)[a-z]+[0-9]*(?:-[a-z]+[0-9]*)*(?=-)", "label", "λ", filename) filename = filename.replace("α", "ASF") filename = filename.replace("σ", "SUB") filename = filename.replace("β", "VERSION") filename = filename.replace("κ", "CORE") filename = filename.replace("ρ", "VARIANT") filename = filename.replace("τ", "TAG") filename = filename.replace("ι", "ARCH") filename = filename.replace("ε", "EXT") if "LABEL_MODE" in os.environ: filename = filename.replace("λ", "LABEL") return filename, substitutions def is_artifact(file_path: str | pathlib.Path) -> bool: """Check whether a file path represents a release artifact based on its extension.""" filename = str(file_path) search = re.search(extension_pattern(), filename) return bool(search and search.group("artifact")) def is_skippable(path: pathlib.Path) -> bool: if len(path.parts) < 2: return True if path.parts[0] == "META": return True # "KEYS", "LICENSE", "NOTICE", "README"... if "." not in path.name: return True if path.name in {".htaccess"}: return True for suffix in SKIPPABLE_SUFFIXES: if suffix in path.suffixes: return True return False def is_version(component: str) -> bool: return component[:1].isdigit() or ("." in component) def main() -> None: if len(sys.argv) != 2: print(f"Usage: {sys.argv[0]} FILENAME") sys.exit(1) filename = sys.argv[1] path_lines = [] with open(filename) as f: for line in f: path_lines.append(line.strip()) return perform_and_print(path_lines) def perform(path_lines: list[str]) -> Analysis: """Perform the analysis.""" paths = [] for line in path_lines: path = pathlib.Path(line.strip()) if is_skippable(path): continue paths.append(path) analysis = Analysis( versions={}, subs={}, templates={}, ) for path in paths: size = len(path.parts) elements: dict[str, str | None] = { "core": None, "version": None, "sub": None, "template": None, "substitutions": None, } for i, component in enumerate(path.parts): component_parse(i, component, size, elements) if elements["core"] is not None: elements_update(elements, elements["core"], analysis) return analysis def perform_and_print(path_lines: list[str]) -> None: """Perform the analysis and print the results.""" analysis = perform(path_lines) # Prevent BrokenPipeError when piping output to other commands signal.signal(signal.SIGPIPE, signal.SIG_DFL) try: print_data(analysis) except BrokenPipeError: ... def print_data(analysis: Analysis) -> None: # Print the templates of all projects for core, version_set in sorted(analysis.versions.items()): print("---", core, "---") print() if version_set: print(" VERSIONS:", ", ".join(sorted(version_set))) if analysis.subs[core]: print(" SUBS:", ", ".join(sorted(analysis.subs[core]))) print() for template, count in sorted(analysis.templates[core].items()): print(f" {count:3d} {template}") print() print() sys.stdout.flush() def substitutions_format(substitutions: dict[str, list[str]]) -> str: subs = [] for key, values in substitutions.items(): if values: subs.append(f"{key.upper()}: {', '.join(values)}") return ", ".join(subs) def variant_pattern() -> str: # .bin can also be an EXT # For example in opennlp # Which is why we do (?<=[_-]) return "(?<=[_-])(" + "|".join(VARIANT_PATTERNS) + ")(?=[_.-])" def version_parse(version: str, elements: dict[str, str | None]) -> str: if elements["core"] is None: return version if version.startswith(elements["core"] + "-"): return version[len(elements["core"]) + 1 :] return version if __name__ == "__main__": main()