atr/tasks/checks/hashing.py (42 lines of code) (raw):
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import hashlib
import logging
import secrets
from typing import Final
import aiofiles
import atr.tasks.checks as checks
# Module-level logger, named after this module via ``__name__``.
_LOGGER: Final = logging.getLogger(__name__)
async def check(args: checks.FunctionArguments) -> str | None:
    """Check a checksum file against the digest of the artifact it describes.

    The path returned by the recorder's ``abs_path()`` is treated as the hash
    file. Its final suffix selects the algorithm (``sha256`` or ``sha512``),
    and the same path with that suffix removed is treated as the artifact.
    The artifact is hashed in chunks and the hex digest is compared with the
    first whitespace-separated token of the hash file.

    The outcome is reported through the recorder (``success`` / ``failure``)
    rather than through the return value or an exception: any exception raised
    while reading or comparing is recorded as an "Unable to verify hash"
    failure.

    Args:
        args: Check arguments; this function uses ``args.recorder()`` and
            ``args.primary_rel_path`` (the latter only for logging).

    Returns:
        Always ``None``.
    """
    recorder = await args.recorder()
    if not (hash_abs_path := await recorder.abs_path()):
        return None

    algorithm = hash_abs_path.suffix.lstrip(".")
    if algorithm not in {"sha256", "sha512"}:
        await recorder.failure("Unsupported hash algorithm", {"algorithm": algorithm})
        return None

    # Remove the hash file suffix to get the artifact path
    # This replaces the last suffix, which is what we want
    # >>> pathlib.Path("a/b/c.d.e.f.g").with_suffix(".x")
    # PosixPath('a/b/c.d.e.f.x')
    # >>> pathlib.Path("a/b/c.d.e.f.g").with_suffix("")
    # PosixPath('a/b/c.d.e.f')
    artifact_abs_path = hash_abs_path.with_suffix("")

    # Lazy %-style arguments: the message is only formatted if INFO is enabled
    _LOGGER.info(
        "Checking hash (%s) for %s against %s (rel: %s)",
        algorithm,
        artifact_abs_path,
        hash_abs_path,
        args.primary_rel_path,
    )

    hash_func = hashlib.sha256 if algorithm == "sha256" else hashlib.sha512
    hash_obj = hash_func()
    # Each read is a separate awaited call, so a larger chunk means far fewer
    # round trips for big artifacts; the resulting digest is identical.
    read_chunk_size = 64 * 1024

    try:
        async with aiofiles.open(artifact_abs_path, mode="rb") as f:
            while chunk := await f.read(read_chunk_size):
                hash_obj.update(chunk)
        computed_hash = hash_obj.hexdigest()

        async with aiofiles.open(hash_abs_path) as f:
            hash_file_tokens = (await f.read()).split()

        # An empty or whitespace-only hash file has no digest to compare with;
        # report that explicitly instead of surfacing an opaque IndexError.
        if not hash_file_tokens:
            await recorder.failure(
                "Hash file is empty",
                {"computed_hash": computed_hash, "expected_hash": ""},
            )
            return None

        # May be in the format "HASH FILENAME\n"
        # TODO: Check the FILENAME part
        # hexdigest() is lowercase, but hash files may carry uppercase hex
        # digits, so normalise the case before comparing.
        expected_hash = hash_file_tokens[0].lower()

        if secrets.compare_digest(computed_hash, expected_hash):
            await recorder.success(
                f"Hash ({algorithm}) matches expected value",
                {"computed_hash": computed_hash, "expected_hash": expected_hash},
            )
        else:
            await recorder.failure(
                f"Hash ({algorithm}) mismatch",
                {"computed_hash": computed_hash, "expected_hash": expected_hash},
            )
    except Exception as e:
        # Deliberately broad: any I/O or comparison error is recorded as a
        # check failure rather than propagated to the caller.
        await recorder.failure("Unable to verify hash", {"error": str(e)})
    return None