atr/tasks/checks/signature.py (100 lines of code) (raw):
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import asyncio
import logging
import tempfile
from typing import Any, Final
import gnupg
import sqlmodel
import atr.db as db
import atr.db.models as models
import atr.tasks.checks as checks
# Logger for this module, named after the module's import path.
_LOGGER: Final = logging.getLogger(__name__)
async def check(args: checks.FunctionArguments) -> str | None:
    """Check a detached signature file against the artifact it signs.

    The artifact path is derived by stripping the ".asc" suffix from the
    primary relative path. The signature is then verified using the public
    signing keys of the committee named in ``args.extra_args``, and the
    outcome is reported through the recorder obtained from ``args``.

    Args:
        args: Check arguments; this function uses ``recorder()``,
            ``primary_rel_path`` and ``extra_args["committee_name"]``.

    Returns:
        Always ``None``; results are recorded rather than returned.
    """
    recorder = await args.recorder()
    if not (primary_abs_path := await recorder.abs_path()):
        return None

    if not (primary_rel_path := args.primary_rel_path):
        await recorder.failure("Primary relative path is required", {"primary_rel_path": primary_rel_path})
        return None

    # Without the ".asc" suffix, removesuffix() would be a no-op and the
    # signature file would end up being verified against itself.
    if not primary_rel_path.endswith(".asc"):
        await recorder.failure("Signature file name must end with .asc", {"primary_rel_path": primary_rel_path})
        return None

    artifact_rel_path = primary_rel_path.removesuffix(".asc")
    if not (artifact_abs_path := await recorder.abs_path(artifact_rel_path)):
        return None

    committee_name = args.extra_args.get("committee_name")
    if not isinstance(committee_name, str):
        await recorder.failure("Committee name is required", {"committee_name": committee_name})
        return None

    _LOGGER.info(
        f"Checking signature {primary_abs_path} for {artifact_abs_path}"
        f" using {committee_name} keys (rel: {primary_rel_path})"
    )

    try:
        result_data = await _check_core_logic(
            committee_name=committee_name,
            artifact_path=str(artifact_abs_path),
            signature_path=str(primary_abs_path),
        )
        if result_data.get("error"):
            await recorder.failure(result_data["error"], result_data)
        elif result_data.get("verified"):
            await recorder.success("Signature verified successfully", result_data)
        else:
            # Shouldn't happen: the core logic sets "error" whenever it is not verified
            await recorder.failure("Signature verification failed for unknown reasons", result_data)
    except Exception as e:
        # Keep the traceback in the logs; the recorded failure only carries the message.
        _LOGGER.exception("Error during signature check execution")
        await recorder.failure("Error during signature check execution", {"error": str(e)})

    return None
async def _check_core_logic(committee_name: str, artifact_path: str, signature_path: str) -> dict[str, Any]:
    """Load the committee's public signing keys and verify the signature with them.

    The ASCII-armored keys linked to the named committee are read from the
    database, then the blocking verification is run in a worker thread via
    ``asyncio.to_thread`` and its result dict is returned unchanged.
    """
    _LOGGER.info(f"Attempting to fetch keys for committee_name: '{committee_name}'")

    async with db.session() as data:
        name_column = db.validate_instrumented_attribute(models.Committee.name)
        key_query = sqlmodel.select(models.PublicSigningKey).join(models.KeyLink).join(models.Committee)
        key_query = key_query.where(name_column == committee_name)

        rows = await data.execute(key_query)
        armored_keys = []
        for signing_key in rows.scalars().all():
            armored_keys.append(signing_key.ascii_armored_key)

    _LOGGER.info(f"Found {len(armored_keys)} public keys for committee_name: '{committee_name}'")

    # Verification is blocking work, so keep it off the event loop
    verification = await asyncio.to_thread(
        _check_core_logic_verify_signature,
        ascii_armored_keys=armored_keys,
        artifact_path=artifact_path,
        signature_path=signature_path,
    )
    return verification
def _check_core_logic_verify_signature(
    signature_path: str, artifact_path: str, ascii_armored_keys: list[str]
) -> dict[str, Any]:
    """Verify a detached GPG signature for a file.

    The given keys are imported into a temporary GnuPG home directory that
    is created for this call and removed afterwards.

    Args:
        signature_path: Path of the signature file.
        artifact_path: Path of the file the signature is expected to cover.
        ascii_armored_keys: ASCII-armored public keys to import before verifying.

    Returns:
        A dict that always has ``verified`` and ``debug_info``. On failure it
        also has ``error``; on success it has ``key_id``, ``timestamp``,
        ``username``, ``fingerprint`` and ``status``.

    Raises:
        OSError: If the signature file cannot be opened.
    """
    with tempfile.TemporaryDirectory(prefix="gpg-") as gpg_dir, open(signature_path, "rb") as sig_file:
        gpg: Final[gnupg.GPG] = gnupg.GPG(gnupghome=gpg_dir)

        # Import all PMC public signing keys
        total_keys = len(ascii_armored_keys)
        for position, key in enumerate(ascii_armored_keys, start=1):
            import_result = gpg.import_keys(key)
            if not import_result.fingerprints:
                # An unusable key must not abort the check: another key may still verify.
                _LOGGER.warning(f"Could not import committee key {position} of {total_keys}; skipping it")

        verified = gpg.verify_file(sig_file, str(artifact_path))

    # Collect all available information for debugging
    debug_info = {
        "key_id": verified.key_id or "Not available",
        "fingerprint": verified.fingerprint.lower() if verified.fingerprint else "Not available",
        "pubkey_fingerprint": verified.pubkey_fingerprint.lower() if verified.pubkey_fingerprint else "Not available",
        "creation_date": verified.creation_date or "Not available",
        "timestamp": verified.timestamp or "Not available",
        "username": verified.username or "Not available",
        "status": verified.status or "Not available",
        "valid": bool(verified),
        "trust_level": getattr(verified, "trust_level", "Not available"),
        "trust_text": getattr(verified, "trust_text", "Not available"),
        "stderr": getattr(verified, "stderr", "Not available"),
        "num_committee_keys": len(ascii_armored_keys),
    }

    if not verified:
        return {
            "verified": False,
            "error": "No valid signature found",
            "debug_info": debug_info,
        }

    return {
        "verified": True,
        "key_id": verified.key_id,
        "timestamp": verified.timestamp,
        "username": verified.username or "Unknown",
        # Guard before calling .lower(): a missing fingerprint must fall back
        # to "Unknown" instead of raising AttributeError on None.
        "fingerprint": verified.pubkey_fingerprint.lower() if verified.pubkey_fingerprint else "Unknown",
        "status": "Valid signature",
        "debug_info": debug_info,
    }