atr/tasks/checks/license.py
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import asyncio
import hashlib
import logging
import os
import re
import tarfile
from typing import Any, Final
import atr.tasks.checks as checks
import atr.tasks.checks.targz as targz
_LOGGER: Final = logging.getLogger(__name__)
# The Apache License header text that must be present in checked source files
APACHE_LICENSE_HEADER: Final[bytes] = b"""\
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License."""
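# Note: _headers_validate strips comment markers and blank lines from both the
# checked file and this constant before comparing, so a source file's comment
# style does not matter, but the wording, line breaks, and internal spacing
# must match the text above exactly.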
# File type comment style definitions
# Ordered by their popularity in the Stack Overflow Developer Survey 2024
COMMENT_STYLES: Final[dict[str, dict[str, str]]] = {
# JavaScript and variants
"js": {"single": "//", "multi_start": "/*", "multi_end": "*/"},
"mjs": {"single": "//", "multi_start": "/*", "multi_end": "*/"},
"cjs": {"single": "//", "multi_start": "/*", "multi_end": "*/"},
"jsx": {"single": "//", "multi_start": "/*", "multi_end": "*/"},
# Python
"py": {"single": "#", "multi_start": '"""', "multi_end": '"""'},
# SQL
"sql": {"single": "--", "multi_start": "/*", "multi_end": "*/"},
"ddl": {"single": "--", "multi_start": "/*", "multi_end": "*/"},
"dml": {"single": "--", "multi_start": "/*", "multi_end": "*/"},
# TypeScript and variants
"ts": {"single": "//", "multi_start": "/*", "multi_end": "*/"},
"tsx": {"single": "//", "multi_start": "/*", "multi_end": "*/"},
"mts": {"single": "//", "multi_start": "/*", "multi_end": "*/"},
"cts": {"single": "//", "multi_start": "/*", "multi_end": "*/"},
# Shell scripts
"sh": {"single": "#"},
"bash": {"single": "#"},
"zsh": {"single": "#"},
"ksh": {"single": "#"},
# Java
"java": {"single": "//", "multi_start": "/*", "multi_end": "*/"},
"jav": {"single": "//", "multi_start": "/*", "multi_end": "*/"},
# C#
"cs": {"single": "//", "multi_start": "/*", "multi_end": "*/"},
"csx": {"single": "//", "multi_start": "/*", "multi_end": "*/"},
# C++
"cpp": {"single": "//", "multi_start": "/*", "multi_end": "*/"},
"cxx": {"single": "//", "multi_start": "/*", "multi_end": "*/"},
"cc": {"single": "//", "multi_start": "/*", "multi_end": "*/"},
"hpp": {"single": "//", "multi_start": "/*", "multi_end": "*/"},
# C
"c": {"single": "//", "multi_start": "/*", "multi_end": "*/"},
"h": {"single": "//", "multi_start": "/*", "multi_end": "*/"},
# PHP
"php": {"single": "//", "multi_start": "/*", "multi_end": "*/"},
"phtml": {"single": "//", "multi_start": "/*", "multi_end": "*/"},
# PowerShell
"ps1": {"single": "#", "multi_start": "<#", "multi_end": "#>"},
"psm1": {"single": "#", "multi_start": "<#", "multi_end": "#>"},
"psd1": {"single": "#", "multi_start": "<#", "multi_end": "#>"},
# Go
"go": {"single": "//", "multi_start": "/*", "multi_end": "*/"},
# Rust
"rs": {"single": "//", "multi_start": "/*", "multi_end": "*/"},
# Kotlin
"kt": {"single": "//", "multi_start": "/*", "multi_end": "*/"},
"kts": {"single": "//", "multi_start": "/*", "multi_end": "*/"},
# Lua
"lua": {"single": "--", "multi_start": "--[[", "multi_end": "]]"},
# Dart
"dart": {"single": "//", "multi_start": "/*", "multi_end": "*/"},
# Assembly
"asm": {"single": ";"},
"s": {"single": "#"},
"S": {"single": "#"},
# Ruby
"rb": {"single": "#", "multi_start": "=begin", "multi_end": "=end"},
"rbw": {"single": "#", "multi_start": "=begin", "multi_end": "=end"},
# Swift
"swift": {"single": "//", "multi_start": "/*", "multi_end": "*/"},
# R
"r": {"single": "#"},
"R": {"single": "#"},
    # Visual Basic (no block comment syntax)
    "vb": {"single": "'"},
    "vbs": {"single": "'"},
# MATLAB
"m": {"single": "%", "multi_start": "%{", "multi_end": "%}"},
# VBA
"vba": {"single": "'"},
# Groovy
"groovy": {"single": "//", "multi_start": "/*", "multi_end": "*/"},
"gvy": {"single": "//", "multi_start": "/*", "multi_end": "*/"},
"gy": {"single": "//", "multi_start": "/*", "multi_end": "*/"},
"gsh": {"single": "//", "multi_start": "/*", "multi_end": "*/"},
# Scala
"scala": {"single": "//", "multi_start": "/*", "multi_end": "*/"},
"sc": {"single": "//", "multi_start": "/*", "multi_end": "*/"},
# Perl
"pl": {"single": "#", "multi_start": "=pod", "multi_end": "=cut"},
"pm": {"single": "#", "multi_start": "=pod", "multi_end": "=cut"},
"t": {"single": "#", "multi_start": "=pod", "multi_end": "=cut"},
}
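# Keys are file extensions without the leading dot; both strip_comments and
# _headers_check_core_logic_should_check consult this table.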
# Patterns for files to include in license header checks
# Ordered by their popularity in the Stack Overflow Developer Survey 2024
INCLUDED_PATTERNS: Final[list[str]] = [
r"\.(js|mjs|cjs|jsx)$", # JavaScript
r"\.py$", # Python
r"\.(sql|ddl|dml)$", # SQL
r"\.(ts|tsx|mts|cts)$", # TypeScript
r"\.(sh|bash|zsh|ksh)$", # Shell
r"\.(java|jav)$", # Java
r"\.(cs|csx)$", # C#
r"\.(cpp|cxx|cc|c\+\+|hpp)$", # C++
r"\.(c|h)$", # C
r"\.(php|php[3-9]|phtml)$", # PHP
r"\.(ps1|psm1|psd1)$", # PowerShell
r"\.go$", # Go
r"\.rs$", # Rust
r"\.(kt|kts)$", # Kotlin
r"\.(lua)$", # Lua
r"\.dart$", # Dart
r"\.(asm|s|S)$", # Assembly
r"\.(rb|rbw)$", # Ruby
r"\.swift$", # Swift
r"\.(r|R)$", # R
r"\.(vb|vbs)$", # Visual Basic
r"\.m$", # MATLAB
r"\.vba$", # VBA
r"\.(groovy|gvy|gy|gsh)$", # Groovy
r"\.(scala|sc)$", # Scala
r"\.(pl|pm|t)$", # Perl
]
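# Note: _headers_check_core_logic_should_check also requires a COMMENT_STYLES
# entry, so extensions listed only in the patterns above (for example ".c++" or
# ".php3") are currently never checked.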
# Tasks
async def files(args: checks.FunctionArguments) -> str | None:
"""Check that the LICENSE and NOTICE files exist and are valid."""
recorder = await args.recorder()
if not (artifact_abs_path := await recorder.abs_path()):
return None
_LOGGER.info(f"Checking license files for {artifact_abs_path} (rel: {args.primary_rel_path})")
try:
result_data = await asyncio.to_thread(_files_check_core_logic, str(artifact_abs_path))
if result_data.get("warning"):
await recorder.warning(result_data["warning"], result_data)
elif result_data.get("error"):
await recorder.failure(result_data["error"], result_data)
elif result_data["license_valid"] and result_data["notice_valid"]:
await recorder.success("LICENSE and NOTICE files present and valid", result_data)
else:
# TODO: Be more specific about the issues
await recorder.failure("Issues found with LICENSE or NOTICE files", result_data)
except Exception as e:
await recorder.failure("Error checking license files", {"error": str(e)})
return None
async def headers(args: checks.FunctionArguments) -> str | None:
"""Check that all source files have valid license headers."""
recorder = await args.recorder()
if not (artifact_abs_path := await recorder.abs_path()):
return None
_LOGGER.info(f"Checking license headers for {artifact_abs_path} (rel: {args.primary_rel_path})")
try:
result_data = await asyncio.to_thread(_headers_check_core_logic, str(artifact_abs_path))
if result_data.get("warning_message"):
await recorder.warning(result_data["warning_message"], result_data)
elif result_data.get("error_message"):
# Handle errors during the check process itself
await recorder.failure(result_data["error_message"], result_data)
elif not result_data["valid"]:
# Handle validation failures
await recorder.failure(result_data["message"], result_data)
else:
# Handle success
await recorder.success(result_data["message"], result_data)
except Exception as e:
await recorder.failure("Error checking license headers", {"error": str(e)})
return None
def strip_comments(content: bytes, file_ext: str) -> bytes:
"""Strip comment prefixes from the content based on the file extension."""
if file_ext not in COMMENT_STYLES:
return content
comment_style = COMMENT_STYLES[file_ext]
lines = content.split(b"\n")
cleaned_lines = []
# Get comment markers as bytes
multi_start = comment_style.get("multi_start", "").encode()
multi_end = comment_style.get("multi_end", "").encode()
single = comment_style.get("single", "").encode()
# State tracking
in_multiline = False
is_c_style = (multi_start == b"/*") and (multi_end == b"*/")
for line in lines:
line = line.strip()
        # Handle start of multi-line comment
        if not in_multiline and multi_start and multi_start in line:
            # Get content after multi-start
            line = line[line.find(multi_start) + len(multi_start) :].strip()
            in_multiline = True
            # Handle a comment that also closes on the same line
            if multi_end and multi_end in line:
                line = line[: line.find(multi_end)].strip()
                in_multiline = False
        # Handle end of multi-line comment
        elif in_multiline and multi_end and multi_end in line:
            # Get content before multi-end
            line = line[: line.find(multi_end)].strip()
            in_multiline = False
# Handle single-line comments
elif not in_multiline and single and line.startswith(single):
line = line[len(single) :].strip()
# For C style comments, strip leading asterisk if present
elif is_c_style and in_multiline and line.startswith(b"*"):
line = line[1:].strip()
# Only add non-empty lines
if line:
cleaned_lines.append(line)
return b"\n".join(cleaned_lines)
# File helpers
def _files_check_core_logic(artifact_path: str) -> dict[str, Any]:
"""Verify that LICENSE and NOTICE files exist and are placed and formatted correctly."""
files_found = []
license_ok = False
notice_ok = False
notice_issues: list[str] = []
# First find and validate the root directory
try:
root_dir = targz.root_directory(artifact_path)
except ValueError as e:
return {
"files_checked": ["LICENSE", "NOTICE"],
"files_found": [],
"license_valid": False,
"notice_valid": False,
"warning": f"Could not determine root directory: {e!s}",
}
# Check for license files in the root directory
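    # "r|gz" opens the archive as a non-seekable stream, so members must be
    # processed in iteration order, which the loop below does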
with tarfile.open(artifact_path, mode="r|gz") as tf:
for member in tf:
if member.name and member.name.split("/")[-1].startswith("._"):
                # Skip macOS AppleDouble metadata files ("._" prefix)
continue
if member.name in [f"{root_dir}/LICENSE", f"{root_dir}/NOTICE"]:
filename = os.path.basename(member.name)
files_found.append(filename)
if filename == "LICENSE":
# TODO: Check length, should be 11,358 bytes
license_ok = _files_check_core_logic_license(tf, member)
elif filename == "NOTICE":
# TODO: Check length doesn't exceed some preset
notice_ok, notice_issues = _files_check_core_logic_notice(tf, member)
messages = _files_messages_build(root_dir, files_found, license_ok, notice_ok, notice_issues)
return {
"files_checked": ["LICENSE", "NOTICE"],
"files_found": files_found,
"license_valid": license_ok,
"notice_valid": notice_ok,
"notice_issues": notice_issues if notice_issues else None,
"message": "; ".join(messages) if messages else "All license files present and valid",
}
def _files_check_core_logic_license(tf: tarfile.TarFile, member: tarfile.TarInfo) -> bool:
"""Verify that the LICENSE file matches the Apache 2.0 license."""
f = tf.extractfile(member)
if not f:
return False
sha3 = hashlib.sha3_256()
content = f.read()
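    # The canonical Apache-2.0 text is 11,358 bytes; the fixed digest below is
    # assumed to be the SHA3-256 of that canonical text, so hashing only this
    # prefix tolerates content appended after it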
sha3.update(content[:11358])
return sha3.hexdigest() == "8a0a8fb6c73ef27e4322391c7b28e5b38639e64e58c40a2c7a51cec6e7915a6a"
def _files_check_core_logic_notice(tf: tarfile.TarFile, member: tarfile.TarInfo) -> tuple[bool, list[str]]:
"""Verify that the NOTICE file follows the required format."""
f = tf.extractfile(member)
if not f:
return False, ["Could not read NOTICE file"]
content = f.read().decode("utf-8")
issues = []
if not re.search(r"Apache\s+[\w\-\.]+", content, re.MULTILINE):
issues.append("Missing or invalid Apache product header")
if not re.search(r"Copyright\s+(?:\d{4}|\d{4}-\d{4})\s+The Apache Software Foundation", content, re.MULTILINE):
issues.append("Missing or invalid copyright statement")
if not re.search(
r"This product includes software developed at\s*\nThe Apache Software Foundation \(.*?\)", content, re.DOTALL
):
issues.append("Missing or invalid foundation attribution")
return len(issues) == 0, issues
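# Example (illustrative) of a minimal NOTICE body that satisfies the three
# checks above:
#
#   Apache Example
#   Copyright 2024 The Apache Software Foundation
#
#   This product includes software developed at
#   The Apache Software Foundation (https://www.apache.org/).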
def _files_messages_build(
root_dir: str,
files_found: list[str],
license_ok: bool,
notice_ok: bool,
notice_issues: list[str],
) -> list[str]:
"""Build status messages for license file verification."""
messages = []
if not files_found:
messages.append(f"No LICENSE or NOTICE files found in root directory '{root_dir}'")
else:
if "LICENSE" not in files_found:
messages.append(f"LICENSE file not found in root directory '{root_dir}'")
elif not license_ok:
messages.append("LICENSE file does not match Apache 2.0 license")
if "NOTICE" not in files_found:
messages.append(f"NOTICE file not found in root directory '{root_dir}'")
elif not notice_ok:
messages.append("NOTICE file format issues: " + "; ".join(notice_issues))
return messages
# Header helpers
def _get_file_extension(filename: str) -> str | None:
"""Get the file extension without the dot."""
_, ext = os.path.splitext(filename)
if not ext:
return None
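    # Lowercase so COMMENT_STYLES lookups are case-insensitive; uppercase
    # variants such as ".S" or ".R" resolve to their lowercase entries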
return ext[1:].lower()
def _headers_check_core_logic(artifact_path: str) -> dict[str, Any]:
"""Verify Apache License headers in source files within an archive."""
# We could modify @Lucas-C/pre-commit-hooks instead for this
# But hopefully this will be robust enough, at least for testing
files_checked = 0
files_with_valid_headers = 0
errors = []
# First find and validate the root directory
try:
root_dir = targz.root_directory(artifact_path)
except ValueError as e:
return {
"files_checked": 0,
"files_with_valid_headers": 0,
"errors": [],
"error_message": None,
"warning_message": f"Could not determine root directory: {e!s}",
"valid": False,
}
# Check files in the archive
with tarfile.open(artifact_path, mode="r|gz") as tf:
for member in tf:
if member.name and member.name.split("/")[-1].startswith("._"):
                # Skip macOS AppleDouble metadata files ("._" prefix)
continue
processed, result = _headers_check_core_logic_process_file(tf, member, root_dir)
if not processed:
continue
files_checked += 1
if result.get("error"):
errors.append(result["error"])
elif result.get("valid"):
files_with_valid_headers += 1
else:
# Should be impossible
raise RuntimeError("Logic error")
# Prepare result message
if files_checked == 0:
message = "No source files found to check for license headers"
# No files to check is not a failure
valid = True
else:
# Require all files to have valid headers
valid = files_checked == files_with_valid_headers
message = f"Checked {files_checked} files, found {files_with_valid_headers} with valid headers"
return {
"files_checked": files_checked,
"files_with_valid_headers": files_with_valid_headers,
"errors": errors,
"message": message,
"valid": valid,
}
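# Example (illustrative) of the dict returned for a clean archive:
#   {"files_checked": 42, "files_with_valid_headers": 42, "errors": [],
#    "message": "Checked 42 files, found 42 with valid headers", "valid": True}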
def _headers_check_core_logic_process_file(
tf: tarfile.TarFile,
member: tarfile.TarInfo,
root_dir: str,
) -> tuple[bool, dict[str, Any]]:
"""Process a single file in an archive for license header verification."""
if not member.isfile():
return False, {}
# Check if we should verify this file, based on extension
if not _headers_check_core_logic_should_check(member.name):
return False, {}
# Get relative path for display purposes only
display_path = member.name
if display_path.startswith(f"{root_dir}/"):
display_path = display_path[len(root_dir) + 1 :]
# Extract and check the file
try:
f = tf.extractfile(member)
if f is None:
return True, {"error": f"Could not read file: {display_path}"}
        # Read slightly more than the header length to allow for extra content
        # at the start of the file, such as shebangs or encoding declarations
content = f.read(len(APACHE_LICENSE_HEADER) + 512)
is_valid, error = _headers_validate(content, member.name)
if is_valid:
return True, {"valid": True}
else:
return True, {"valid": False, "error": f"{display_path}: {error}"}
except Exception as e:
return True, {"error": f"Error processing {display_path}: {e!s}"}
def _headers_check_core_logic_should_check(filepath: str) -> bool:
"""Determine if a file should be checked for license headers."""
ext = _get_file_extension(filepath)
if ext is None:
return False
# First check if we have comment style definitions for this extension
if ext not in COMMENT_STYLES:
return False
# Then check if the file matches any of our included patterns
for pattern in INCLUDED_PATTERNS:
if re.search(pattern, filepath, re.IGNORECASE):
return True
return False
def _headers_validate(content: bytes, filename: str) -> tuple[bool, str | None]:
"""Validate that the content contains the Apache License header after removing comments."""
# Get the file extension from the filename
file_ext = _get_file_extension(filename)
if not file_ext or file_ext not in COMMENT_STYLES:
return False, "Could not determine file type from extension"
# Strip comments, removing empty lines in the process
cleaned_header = strip_comments(content, file_ext)
# Normalise the expected header in the same way as directly above
expected_lines = [line.strip() for line in APACHE_LICENSE_HEADER.split(b"\n")]
expected_lines = [line for line in expected_lines if line]
expected_header = b"\n".join(expected_lines)
# Check if the cleaned header contains the expected text
if expected_header not in cleaned_header:
# # Find the first difference for debugging
# cleaned_lines = cleaned_header.split(b"\n")
# expected_lines = expected_header.split(b"\n")
# for i, (c, e) in enumerate(zip(cleaned_lines, expected_lines)):
# if c != e:
# _LOGGER.debug("\nFirst difference at line %d:", i + 1)
# _LOGGER.debug("Expected: '%s'", e.decode(errors="replace"))
# _LOGGER.debug("Got: '%s'", c.decode(errors="replace"))
# break
return False, "License header does not match the required Apache License header text"
return True, None
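# Example (illustrative): for a Python file that starts with a shebang followed
# by a "#"-commented copy of APACHE_LICENSE_HEADER, _headers_validate returns
# (True, None); for a file without the header it returns (False, "License header
# does not match the required Apache License header text").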